From 6b689b76c1e02b29f60ee29d140dfcde2abcc414 Mon Sep 17 00:00:00 2001 From: ashmrtn Date: Thu, 22 Dec 2022 14:29:15 -0800 Subject: [PATCH] Produce map of items that need backup details entries (#1892) ## Description Generate a `map[old ShortRef]new path.Path` for every item sourced from a base snapshot during backup. Return this information at the end of BackupCollections so that callers can use it to merge backup details if desired. This PR does a few auxiliary things as well: * thread current and previous path through hierarchy merging * add extra field to info tracked by corsoProgress to hold previous path * provide current and previous paths to streamBaseItems function ## Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No ## Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :hamster: Trivial/Minor ## Issue(s) * #1800 ## Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/kopia/upload.go | 127 ++++++++++++++++++++---- src/internal/kopia/upload_test.go | 71 +++++++++++-- src/internal/kopia/wrapper.go | 13 +-- src/internal/kopia/wrapper_test.go | 10 +- src/internal/operations/backup.go | 8 +- src/internal/operations/backup_test.go | 4 +- src/internal/streamstore/streamstore.go | 9 +- 7 files changed, 196 insertions(+), 46 deletions(-) diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index e4f14e58d..1b7267cbc 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -118,14 +118,18 @@ func (rw *restoreStreamReader) Read(p []byte) (n int, err error) { } type itemDetails struct { - info details.ItemInfo + info *details.ItemInfo repoPath path.Path + prevPath path.Path } type corsoProgress struct { snapshotfs.UploadProgress - pending map[string]*itemDetails - deets *details.Builder + 
pending map[string]*itemDetails + deets *details.Builder + // toMerge represents items that we don't have in-memory item info for. The + // item info for these items should be sourced from a base snapshot later on. + toMerge map[string]path.Path mu sync.RWMutex totalBytes int64 } @@ -153,6 +157,22 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { return } + // These items were sourced from a base snapshot or were cached in kopia so we + // never had to materialize their details in-memory. + if d.info == nil { + // TODO(ashmrtn): We should probably be returning an error here? + if d.prevPath == nil { + return + } + + cp.mu.Lock() + defer cp.mu.Unlock() + + cp.toMerge[d.prevPath.ShortRef()] = d.repoPath + + return + } + parent := d.repoPath.ToBuilder().Dir() cp.deets.Add( @@ -160,7 +180,7 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { d.repoPath.ShortRef(), parent.ShortRef(), true, - d.info, + *d.info, ) folders := []details.FolderEntry{} @@ -182,7 +202,7 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { parent = nextParent } - cp.deets.AddFoldersForItem(folders, d.info) + cp.deets.AddFoldersForItem(folders, *d.info) } // Kopia interface function used as a callback when kopia finishes hashing a file. @@ -282,7 +302,12 @@ func collectionEntries( // Relative path given to us in the callback is missing the root // element. Add to pending set before calling the callback to avoid race // conditions when the item is completed. - d := &itemDetails{info: ei.Info(), repoPath: itemPath} + // + // TODO(ashmrtn): If we want to pull item info for cached items from a + // previous snapshot then we should populate prevPath here and leave + // info nil. 
+ itemInfo := ei.Info() + d := &itemDetails{info: &itemInfo, repoPath: itemPath} progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) } @@ -309,6 +334,8 @@ func collectionEntries( func streamBaseEntries( ctx context.Context, cb func(context.Context, fs.Entry) error, + curPath path.Path, + prevPath path.Path, dir fs.Directory, encodedSeen map[string]struct{}, progress *corsoProgress, @@ -334,27 +361,43 @@ func streamBaseEntries( return nil } - if err := cb(ctx, entry); err != nil { - entName, err := decodeElement(entry.Name()) - if err != nil { - entName = entry.Name() - } + entName, err := decodeElement(entry.Name()) + if err != nil { + return errors.Wrapf(err, "unable to decode entry name %s", entry.Name()) + } - return errors.Wrapf(err, "executing callback on item %q", entName) + // For now assuming that item IDs don't need escaping. + itemPath, err := curPath.Append(entName, true) + if err != nil { + return errors.Wrap(err, "getting full item path for base entry") + } + + // We need the previous path so we can find this item in the base snapshot's + // backup details. If the item moved and we had only the new path, we'd be + // unable to find it in the old backup details because we wouldn't know what + // to look for. + prevItemPath, err := prevPath.Append(entName, true) + if err != nil { + return errors.Wrap(err, "getting previous full item path for base entry") + } + + // All items have item info in the base backup. However, we need to make + // sure we have enough metadata to find those entries. To do that we add the + // item to progress and have progress aggregate everything for later. 
+ d := &itemDetails{info: nil, repoPath: itemPath, prevPath: prevItemPath} + progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) + + if err := cb(ctx, entry); err != nil { + return errors.Wrapf(err, "executing callback on item %q", itemPath) } return nil }) if err != nil { - name, err := decodeElement(dir.Name()) - if err != nil { - name = dir.Name() - } - return errors.Wrapf( err, "traversing items in base snapshot directory %q", - name, + curPath, ) } @@ -366,6 +409,8 @@ func streamBaseEntries( // kopia callbacks on directory entries. It binds the directory to the given // DataCollection. func getStreamItemFunc( + curPath path.Path, + prevPath path.Path, staticEnts []fs.Entry, streamedEnts data.Collection, baseDir fs.Directory, @@ -384,7 +429,15 @@ func getStreamItemFunc( seen, errs := collectionEntries(ctx, cb, streamedEnts, progress) - if err := streamBaseEntries(ctx, cb, baseDir, seen, progress); err != nil { + if err := streamBaseEntries( + ctx, + cb, + curPath, + prevPath, + baseDir, + seen, + progress, + ); err != nil { errs = multierror.Append( errs, errors.Wrap(err, "streaming base snapshot entries"), @@ -430,11 +483,25 @@ func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.D return virtualfs.NewStreamingDirectory( encodeAsPath(dirName), - getStreamItemFunc(childDirs, dir.collection, dir.baseDir, progress), + getStreamItemFunc( + dir.currentPath, + dir.prevPath, + childDirs, + dir.collection, + dir.baseDir, + progress, + ), ), nil } type treeMap struct { + // path.Path representing the node's path. This is passed as a parameter to + // the stream item function so that even baseDir directories can properly + // generate the full path of items. + currentPath path.Path + // Previous path this directory may have resided at if it is sourced from a + // base snapshot. + prevPath path.Path // Child directories of this directory. childDirs map[string]*treeMap // Reference to data pulled from the external service. 
Contains only items in @@ -583,6 +650,8 @@ func inflateCollectionTree( } node.collection = s + node.currentPath = s.FullPath() + node.prevPath = s.PreviousPath() } // Check that each previous path has only one of the states of deleted, moved, @@ -708,7 +777,25 @@ func traverseBaseDir( return errors.Errorf("unable to get tree node for path %s", currentPath) } + curP, err := path.FromDataLayerPath(currentPath.String(), false) + if err != nil { + return errors.Errorf( + "unable to convert current path %s to path.Path", + currentPath, + ) + } + + oldP, err := path.FromDataLayerPath(oldDirPath.String(), false) + if err != nil { + return errors.Errorf( + "unable to convert old path %s to path.Path", + oldDirPath, + ) + } + node.baseDir = dir + node.currentPath = curP + node.prevPath = oldP } return nil diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index 3e84dddf9..fb70c557d 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -25,8 +25,8 @@ import ( "github.com/alcionai/corso/src/pkg/path" ) -func makePath(t *testing.T, elements []string) path.Path { - p, err := path.FromDataLayerPath(stdpath.Join(elements...), false) +func makePath(t *testing.T, elements []string, isItem bool) path.Path { + p, err := path.FromDataLayerPath(stdpath.Join(elements...), isItem) require.NoError(t, err) return p @@ -389,7 +389,7 @@ var finishedFileTable = []struct { cachedItems: func(fname string, fpath path.Path) map[string]testInfo { return map[string]testInfo{ fname: { - info: &itemDetails{details.ItemInfo{}, fpath}, + info: &itemDetails{info: &details.ItemInfo{}, repoPath: fpath}, err: nil, totalBytes: 100, }, @@ -416,7 +416,7 @@ var finishedFileTable = []struct { cachedItems: func(fname string, fpath path.Path) map[string]testInfo { return map[string]testInfo{ fname: { - info: &itemDetails{details.ItemInfo{}, fpath}, + info: &itemDetails{info: &details.ItemInfo{}, repoPath: fpath}, err: assert.AnError, }, } @@ 
-460,7 +460,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() { } } -func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() { +func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchyNewItem() { t := suite.T() // Order of folders in hierarchy from root to leaf (excluding the item). expectedFolderOrder := suite.targetFilePath.ToBuilder().Dir().Elements() @@ -471,14 +471,17 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() { UploadProgress: &snapshotfs.NullUploadProgress{}, deets: bd, pending: map[string]*itemDetails{}, + toMerge: map[string]path.Path{}, } - deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath} + deets := &itemDetails{info: &details.ItemInfo{}, repoPath: suite.targetFilePath} cp.put(suite.targetFileName, deets) require.Len(t, cp.pending, 1) cp.FinishedFile(suite.targetFileName, nil) + assert.Empty(t, cp.toMerge) + // Gather information about the current state. var ( curRef *details.DetailsEntry @@ -521,6 +524,42 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() { assert.Empty(t, rootRef.ParentRef) } +func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarchy() { + t := suite.T() + + prevPath := makePath( + suite.T(), + []string{testTenant, service, testUser, category, testInboxDir, testFileName2}, + true, + ) + + expectedToMerge := map[string]path.Path{ + prevPath.ShortRef(): suite.targetFilePath, + } + + // Setup stuff. 
+ bd := &details.Builder{} + cp := corsoProgress{ + UploadProgress: &snapshotfs.NullUploadProgress{}, + deets: bd, + pending: map[string]*itemDetails{}, + toMerge: map[string]path.Path{}, + } + + deets := &itemDetails{ + info: nil, + repoPath: suite.targetFilePath, + prevPath: prevPath, + } + cp.put(suite.targetFileName, deets) + require.Len(t, cp.pending, 1) + + cp.FinishedFile(suite.targetFileName, nil) + + assert.Equal(t, expectedToMerge, cp.toMerge) + assert.Empty(t, cp.deets) +} + func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() { for _, test := range finishedFileTable { suite.T().Run(test.name, func(t *testing.T) { @@ -552,6 +591,7 @@ func (suite *HierarchyBuilderUnitSuite) SetupSuite() { suite.testPath = makePath( suite.T(), []string{testTenant, service, testUser, category, testInboxDir}, + false, ) } @@ -572,7 +612,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree() { user2 := "user2" user2Encoded := encodeAsPath(user2) - p2 := makePath(t, []string{tenant, service, user2, category, testInboxDir}) + p2 := makePath(t, []string{tenant, service, user2, category, testInboxDir}, false) // Encode user names here so we don't have to decode things later. expectedFileCount := map[string]int{ @@ -644,7 +684,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_MixedDirectory() subdir := "subfolder" - p2 := makePath(suite.T(), append(suite.testPath.Elements(), subdir)) + p2 := makePath(suite.T(), append(suite.testPath.Elements(), subdir), false) // Test multiple orders of items because right now order can matter. 
Both // orders result in a directory structure like: @@ -739,6 +779,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_Fails() { p2 := makePath( suite.T(), []string{"tenant2", service, "user2", category, testInboxDir}, + false, ) table := []struct { @@ -820,10 +861,12 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeErrors() { dirPath := makePath( suite.T(), []string{testTenant, service, testUser, category, testInboxDir}, + false, ) dirPath2 := makePath( suite.T(), []string{testTenant, service, testUser, category, testArchiveDir}, + false, ) table := []struct { @@ -898,10 +941,12 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSingleSubtree() { dirPath := makePath( suite.T(), []string{testTenant, service, testUser, category, testInboxDir}, + false, ) dirPath2 := makePath( suite.T(), []string{testTenant, service, testUser, category, testArchiveDir}, + false, ) // Must be a function that returns a new instance each time as StreamingFile @@ -1142,11 +1187,13 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto inboxPath := makePath( suite.T(), []string{testTenant, service, testUser, category, testInboxDir}, + false, ) personalPath := makePath( suite.T(), append(inboxPath.Elements(), personalDir), + false, ) personalFileName1 := testFileName personalFileName2 := testFileName2 @@ -1154,6 +1201,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto workPath := makePath( suite.T(), append(inboxPath.Elements(), workDir), + false, ) workFileName := testFileName3 @@ -1224,6 +1272,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto newPath := makePath( t, []string{testTenant, service, testUser, category, testInboxDir + "2"}, + false, ) mc := mockconnector.NewMockExchangeCollection(newPath, 0) @@ -1276,10 +1325,12 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto newInboxPath := makePath( t, 
[]string{testTenant, service, testUser, category, testInboxDir + "2"}, + false, ) newWorkPath := makePath( t, []string{testTenant, service, testUser, category, workDir}, + false, ) inbox := mockconnector.NewMockExchangeCollection(newInboxPath, 0) @@ -1336,6 +1387,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto newWorkPath := makePath( t, []string{testTenant, service, testUser, category, workDir}, + false, ) inbox := mockconnector.NewMockExchangeCollection(inboxPath, 0) @@ -1411,6 +1463,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto newPersonalPath := makePath( t, []string{testTenant, service, testUser, category, personalDir}, + false, ) personal := mockconnector.NewMockExchangeCollection(newPersonalPath, 0) @@ -1464,6 +1517,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto newPersonalPath := makePath( t, []string{testTenant, service, testUser, category, workDir}, + false, ) personal := mockconnector.NewMockExchangeCollection(newPersonalPath, 2) @@ -1729,6 +1783,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsCorrectSubt inboxPath := makePath( suite.T(), []string{testTenant, service, testUser, category, testInboxDir}, + false, ) inboxFileName1 := testFileName diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 95c46f7a2..51659b9e3 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -122,28 +122,29 @@ func (w Wrapper) BackupCollections( service path.ServiceType, oc *OwnersCats, tags map[string]string, -) (*BackupStats, *details.Details, error) { +) (*BackupStats, *details.Details, map[string]path.Path, error) { if w.c == nil { - return nil, nil, errNotConnected + return nil, nil, nil, errNotConnected } ctx, end := D.Span(ctx, "kopia:backupCollections") defer end() if len(collections) == 0 { - return &BackupStats{}, (&details.Builder{}).Details(), nil + return 
&BackupStats{}, (&details.Builder{}).Details(), nil, nil } progress := &corsoProgress{ pending: map[string]*itemDetails{}, deets: &details.Builder{}, + toMerge: map[string]path.Path{}, } // TODO(ashmrtn): Pass previousSnapshots here to enable building the directory // hierarchy with them. dirTree, err := inflateDirTree(ctx, w.c, nil, collections, progress) if err != nil { - return nil, nil, errors.Wrap(err, "building kopia directories") + return nil, nil, nil, errors.Wrap(err, "building kopia directories") } s, err := w.makeSnapshotWithRoot( @@ -155,10 +156,10 @@ func (w Wrapper) BackupCollections( progress, ) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return s, progress.deets.Details(), nil + return s, progress.deets.Details(), progress.toMerge, nil } func (w Wrapper) makeSnapshotWithRoot( diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 9221e97ab..e0ee17c8a 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -265,7 +265,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - stats, deets, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.BackupCollections( suite.ctx, prevSnaps, collections, @@ -346,7 +346,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { fp2, err := suite.testPath2.Append(dc2.Names[0], true) require.NoError(t, err) - stats, _, err := w.BackupCollections( + stats, _, _, err := w.BackupCollections( ctx, nil, []data.Collection{dc1, dc2}, @@ -428,7 +428,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { }, } - stats, deets, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.BackupCollections( suite.ctx, nil, collections, @@ -473,7 +473,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() ctx, flush := tester.NewContext() defer flush() 
- s, d, err := suite.w.BackupCollections( + s, d, _, err := suite.w.BackupCollections( ctx, nil, test.collections, @@ -634,7 +634,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { }, } - stats, deets, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.BackupCollections( suite.ctx, nil, collections, diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 6edb25f38..ca657c171 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -149,7 +149,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { return opStats.readErr } - opStats.k, backupDetails, err = consumeBackupDataCollections( + opStats.k, backupDetails, _, err = consumeBackupDataCollections( ctx, op.kopia, tenantID, @@ -363,7 +363,7 @@ type backuper interface { service path.ServiceType, oc *kopia.OwnersCats, tags map[string]string, - ) (*kopia.BackupStats, *details.Details, error) + ) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) } // calls kopia to backup the collections of data @@ -376,7 +376,7 @@ func consumeBackupDataCollections( mans []*kopia.ManifestEntry, cs []data.Collection, backupID model.StableID, -) (*kopia.BackupStats, *details.Details, error) { +) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) { complete, closer := observe.MessageWithCompletion("Backing up data:") defer func() { complete <- struct{}{} @@ -397,7 +397,7 @@ func consumeBackupDataCollections( for _, reason := range m.Reasons { pb, err := builderFromReason(tenantID, reason) if err != nil { - return nil, nil, errors.Wrap(err, "getting subtree paths for bases") + return nil, nil, nil, errors.Wrap(err, "getting subtree paths for bases") } paths = append(paths, pb) diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 74630c0b3..12995e00d 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ 
-296,12 +296,12 @@ func (mbu mockBackuper) BackupCollections( service path.ServiceType, oc *kopia.OwnersCats, tags map[string]string, -) (*kopia.BackupStats, *details.Details, error) { +) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) { if mbu.checkFunc != nil { mbu.checkFunc(bases, cs, service, oc, tags) } - return &kopia.BackupStats{}, &details.Details{}, nil + return &kopia.BackupStats{}, &details.Details{}, nil, nil } func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Paths() { diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 9938e0627..20e4262e9 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -73,7 +73,14 @@ func (ss *streamStore) WriteBackupDetails( }, } - backupStats, _, err := ss.kw.BackupCollections(ctx, nil, []data.Collection{dc}, ss.service, nil, nil) + backupStats, _, _, err := ss.kw.BackupCollections( + ctx, + nil, + []data.Collection{dc}, + ss.service, + nil, + nil, + ) if err != nil { return "", nil }