From b508ea3b72067337bf7ac93f9ee52cb29202cafa Mon Sep 17 00:00:00 2001 From: ashmrtn Date: Tue, 9 May 2023 18:50:38 -0700 Subject: [PATCH] Update kopia restore logic to take a (RepoRef, Collection Restore Path) pair (#3337) Begin expanding the restore logic to take a pair of paths, one denoting the precise location of the item in kopia and the other denoting the "restore location" or path the item should be placed at during restore This PR is not expected to change system functionality at all This is the first of 2 PRs to setup all the logic for this. This PR does not handle properly merging together multiple collections that have the same restore location but different RepoRefs due to recent updates to the kopia wrapper restore logic --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * #3197 #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/connector/onedrive/restore.go | 79 ++++- .../connector/onedrive/restore_test.go | 126 +++++++- src/internal/kopia/merge_collection.go | 112 +++++++ src/internal/kopia/merge_collection_test.go | 297 ++++++++++++++++++ src/internal/kopia/wrapper.go | 123 +++++--- src/internal/kopia/wrapper_test.go | 225 ++++++++++++- .../operations/backup_integration_test.go | 17 +- src/internal/operations/backup_test.go | 24 +- src/internal/operations/inject/inject.go | 2 +- src/internal/operations/manifests.go | 11 +- src/internal/operations/restore.go | 17 +- src/internal/streamstore/streamstore.go | 12 +- src/pkg/path/path.go | 7 + 13 files changed, 958 insertions(+), 94 deletions(-) create mode 100644 src/internal/kopia/merge_collection.go create mode 100644 src/internal/kopia/merge_collection_test.go diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go index 0cff8b465..3f34cc9c4 100644 --- a/src/internal/connector/onedrive/restore.go +++ b/src/internal/connector/onedrive/restore.go @@ -781,17 +781,29 @@ func getMetadata(metar io.ReadCloser) (metadata.Metadata, error) { // Augment restore path to add extra files(meta) needed for restore as // well as do any other ordering operations on the paths -func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, error) { - colPaths := map[string]path.Path{} +// +// Only accepts StoragePath/RestorePath pairs where the RestorePath is +// at least as long as the StoragePath. If the RestorePath is longer than the +// StoragePath then the first few (closest to the root) directories will use +// default permissions during restore. +func AugmentRestorePaths( + backupVersion int, + paths []path.RestorePaths, +) ([]path.RestorePaths, error) { + // Keyed by each value's StoragePath.String() which corresponds to the RepoRef + // of the directory. + colPaths := map[string]path.RestorePaths{} for _, p := range paths { + first := true + for { - np, err := p.Dir() + sp, err := p.StoragePath.Dir() if err != nil { return nil, err } - drivePath, err := path.ToDrivePath(np) + drivePath, err := path.ToDrivePath(sp) if err != nil { return nil, err } @@ -800,8 +812,31 @@ func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, err break } - colPaths[np.String()] = np - p = np + if len(p.RestorePath.Elements()) < len(sp.Elements()) { + return nil, clues.New("restorePath shorter than storagePath"). + With("restore_path", p.RestorePath, "storage_path", sp) + } + + rp := p.RestorePath + + // Make sure the RestorePath always points to the level of the current + // collection. We need to track if it's the first iteration because the + // RestorePath starts out at the collection level to begin with. + if !first { + rp, err = p.RestorePath.Dir() + if err != nil { + return nil, err + } + } + + paths := path.RestorePaths{ + StoragePath: sp, + RestorePath: rp, + } + + colPaths[sp.String()] = paths + p = paths + first = false } } @@ -814,32 +849,45 @@ func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, err // As of now look up metadata for parent directories from a // collection. for _, p := range colPaths { - el := p.Elements() + el := p.StoragePath.Elements() if backupVersion >= version.OneDrive6NameInMeta { - mPath, err := p.Append(".dirmeta", true) + mPath, err := p.StoragePath.Append(".dirmeta", true) if err != nil { return nil, err } - paths = append(paths, mPath) + paths = append( + paths, + path.RestorePaths{StoragePath: mPath, RestorePath: p.RestorePath}) } else if backupVersion >= version.OneDrive4DirIncludesPermissions { - mPath, err := p.Append(el[len(el)-1]+".dirmeta", true) + mPath, err := p.StoragePath.Append(el[len(el)-1]+".dirmeta", true) if err != nil { return nil, err } - paths = append(paths, mPath) + paths = append( + paths, + path.RestorePaths{StoragePath: mPath, RestorePath: p.RestorePath}) } else if backupVersion >= version.OneDrive1DataAndMetaFiles { - pp, err := p.Dir() + pp, err := p.StoragePath.Dir() if err != nil { return nil, err } + mPath, err := pp.Append(el[len(el)-1]+".dirmeta", true) if err != nil { return nil, err } - paths = append(paths, mPath) + + prp, err := p.RestorePath.Dir() + if err != nil { + return nil, err + } + + paths = append( + paths, + path.RestorePaths{StoragePath: mPath, RestorePath: prp}) } } @@ -847,8 +895,11 @@ func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, err // files. This is only a necessity for OneDrive as we are storing // metadata for files/folders in separate meta files and we the // data to be restored before we can restore the metadata. + // + // This sorting assumes stuff in the same StoragePath directory end up in the + // same RestorePath collection. sort.Slice(paths, func(i, j int) bool { - return paths[i].String() < paths[j].String() + return paths[i].StoragePath.String() < paths[j].StoragePath.String() }) return paths, nil diff --git a/src/internal/connector/onedrive/restore_test.go b/src/internal/connector/onedrive/restore_test.go index 56e5d467b..c085d689f 100644 --- a/src/internal/connector/onedrive/restore_test.go +++ b/src/internal/connector/onedrive/restore_test.go @@ -172,20 +172,30 @@ func (suite *RestoreUnitSuite) TestAugmentRestorePaths() { base := "id/onedrive/user/files/drives/driveID/root:/" - inPaths := []path.Path{} + inPaths := []path.RestorePaths{} for _, ps := range test.input { p, err := path.FromDataLayerPath(base+ps, true) require.NoError(t, err, "creating path", clues.ToCore(err)) - inPaths = append(inPaths, p) + pd, err := p.Dir() + require.NoError(t, err, "creating collection path", clues.ToCore(err)) + + inPaths = append( + inPaths, + path.RestorePaths{StoragePath: p, RestorePath: pd}) } - outPaths := []path.Path{} + outPaths := []path.RestorePaths{} for _, ps := range test.output { p, err := path.FromDataLayerPath(base+ps, true) require.NoError(t, err, "creating path", clues.ToCore(err)) - outPaths = append(outPaths, p) + pd, err := p.Dir() + require.NoError(t, err, "creating collection path", clues.ToCore(err)) + + outPaths = append( + outPaths, + path.RestorePaths{StoragePath: p, RestorePath: pd}) } actual, err := AugmentRestorePaths(test.version, inPaths) @@ -197,3 +207,111 @@ func (suite *RestoreUnitSuite) TestAugmentRestorePaths() { }) } } + +// TestAugmentRestorePaths_DifferentRestorePath tests that RestorePath +// substitution works properly. Since it's only possible for future backup +// versions to need restore path substitution (i.e. due to storing folders by +// ID instead of name) this is only tested against the most recent backup +// version at the moment. +func (suite *RestoreUnitSuite) TestAugmentRestorePaths_DifferentRestorePath() { + // Adding a simple test here so that we can be sure that this + // function gets updated whenever we add a new version. + require.LessOrEqual(suite.T(), version.Backup, version.All8MigrateUserPNToID, "unsupported backup version") + + type pathPair struct { + storage string + restore string + } + + table := []struct { + name string + version int + input []pathPair + output []pathPair + errCheck assert.ErrorAssertionFunc + }{ + { + name: "nested folders", + version: version.Backup, + input: []pathPair{ + {storage: "folder-id/file.txt.data", restore: "folder"}, + {storage: "folder-id/folder2-id/file.txt.data", restore: "folder/folder2"}, + }, + output: []pathPair{ + {storage: "folder-id/.dirmeta", restore: "folder"}, + {storage: "folder-id/file.txt.data", restore: "folder"}, + {storage: "folder-id/folder2-id/.dirmeta", restore: "folder/folder2"}, + {storage: "folder-id/folder2-id/file.txt.data", restore: "folder/folder2"}, + }, + errCheck: assert.NoError, + }, + { + name: "restore path longer one folder", + version: version.Backup, + input: []pathPair{ + {storage: "folder-id/file.txt.data", restore: "corso_restore/folder"}, + }, + output: []pathPair{ + {storage: "folder-id/.dirmeta", restore: "corso_restore/folder"}, + {storage: "folder-id/file.txt.data", restore: "corso_restore/folder"}, + }, + errCheck: assert.NoError, + }, + { + name: "restore path shorter one folder", + version: version.Backup, + input: []pathPair{ + {storage: "folder-id/file.txt.data", restore: ""}, + }, + errCheck: assert.Error, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + _, flush := tester.NewContext() + defer flush() + + base := "id/onedrive/user/files/drives/driveID/root:/" + + inPaths := []path.RestorePaths{} + for _, ps := range test.input { + p, err := path.FromDataLayerPath(base+ps.storage, true) + require.NoError(t, err, "creating path", clues.ToCore(err)) + + r, err := path.FromDataLayerPath(base+ps.restore, false) + require.NoError(t, err, "creating path", clues.ToCore(err)) + + inPaths = append( + inPaths, + path.RestorePaths{StoragePath: p, RestorePath: r}) + } + + outPaths := []path.RestorePaths{} + for _, ps := range test.output { + p, err := path.FromDataLayerPath(base+ps.storage, true) + require.NoError(t, err, "creating path", clues.ToCore(err)) + + r, err := path.FromDataLayerPath(base+ps.restore, false) + require.NoError(t, err, "creating path", clues.ToCore(err)) + + outPaths = append( + outPaths, + path.RestorePaths{StoragePath: p, RestorePath: r}) + } + + actual, err := AugmentRestorePaths(test.version, inPaths) + test.errCheck(t, err, "augmenting paths", clues.ToCore(err)) + + if err != nil { + return + } + + // Ordering of paths matter here as we need dirmeta files + // to show up before file in dir + assert.Equal(t, outPaths, actual, "augmented paths") + }) + } +} diff --git a/src/internal/kopia/merge_collection.go b/src/internal/kopia/merge_collection.go new file mode 100644 index 000000000..ab95dead8 --- /dev/null +++ b/src/internal/kopia/merge_collection.go @@ -0,0 +1,112 @@ +package kopia + +import ( + "context" + "errors" + + "github.com/alcionai/clues" + "golang.org/x/exp/slices" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +var _ data.RestoreCollection = &mergeCollection{} + +type col struct { + storagePath string + data.RestoreCollection +} + +type mergeCollection struct { + cols []col + // Technically don't need to track this but it can help detect errors. + fullPath path.Path +} + +func (mc *mergeCollection) addCollection( + storagePath string, + c data.RestoreCollection, +) error { + if c == nil { + return clues.New("adding nil collection"). + With("current_path", mc.FullPath()) + } else if mc.FullPath().String() != c.FullPath().String() { + return clues.New("attempting to merge collection with different path"). + With("current_path", mc.FullPath(), "new_path", c.FullPath()) + } + + mc.cols = append(mc.cols, col{storagePath: storagePath, RestoreCollection: c}) + + // Keep a stable sorting of this merged collection set so we can say there's + // some deterministic behavior when Fetch is called. We don't expect to have + // to merge many collections. + slices.SortStableFunc(mc.cols, func(a, b col) bool { + return a.storagePath < b.storagePath + }) + + return nil +} + +func (mc mergeCollection) FullPath() path.Path { + return mc.fullPath +} + +func (mc *mergeCollection) Items( + ctx context.Context, + errs *fault.Bus, +) <-chan data.Stream { + res := make(chan data.Stream) + + go func() { + defer close(res) + + logger.Ctx(ctx).Infow( + "getting items for merged collection", + "merged_collection_count", len(mc.cols)) + + for _, c := range mc.cols { + // Unfortunately doesn't seem to be a way right now to see if the + // iteration failed and we should be exiting early. + ictx := clues.Add(ctx, "merged_collection_storage_path", c.storagePath) + logger.Ctx(ictx).Debug("sending items from merged collection") + + for item := range c.Items(ictx, errs) { + res <- item + } + } + }() + + return res +} + +// Fetch goes through all the collections in this one and returns the first +// match found or the first error that is not data.ErrNotFound. If multiple +// collections have the requested item, the instance in the collection with the +// lexicographically smallest storage path is returned. +func (mc *mergeCollection) Fetch( + ctx context.Context, + name string, +) (data.Stream, error) { + logger.Ctx(ctx).Infow( + "fetching item in merged collection", + "merged_collection_count", len(mc.cols)) + + for _, c := range mc.cols { + ictx := clues.Add(ctx, "merged_collection_storage_path", c.storagePath) + + logger.Ctx(ictx).Debug("looking for item in merged collection") + + s, err := c.Fetch(ictx, name) + if err == nil { + return s, nil + } else if err != nil && !errors.Is(err, data.ErrNotFound) { + return nil, clues.Wrap(err, "fetching from merged collection"). + WithClues(ictx) + } + } + + return nil, clues.Wrap(data.ErrNotFound, "merged collection fetch") +} diff --git a/src/internal/kopia/merge_collection_test.go b/src/internal/kopia/merge_collection_test.go new file mode 100644 index 000000000..e287452dc --- /dev/null +++ b/src/internal/kopia/merge_collection_test.go @@ -0,0 +1,297 @@ +package kopia + +import ( + "bytes" + "io" + "testing" + + "github.com/alcionai/clues" + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/connector/exchange/mock" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" +) + +type MergeCollectionUnitSuite struct { + tester.Suite +} + +func TestMergeCollectionUnitSuite(t *testing.T) { + suite.Run(t, &MergeCollectionUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *MergeCollectionUnitSuite) TestReturnsPath() { + t := suite.T() + + pth, err := path.Build( + "a-tenant", + "a-user", + path.ExchangeService, + path.EmailCategory, + false, + "some", "path", "for", "data") + require.NoError(t, err, clues.ToCore(err)) + + c := mergeCollection{ + fullPath: pth, + } + + assert.Equal(t, pth, c.FullPath()) +} + +func (suite *MergeCollectionUnitSuite) TestItems() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + storagePaths := []string{ + "tenant-id/exchange/user-id/mail/some/folder/path1", + "tenant-id/exchange/user-id/mail/some/folder/path2", + } + + expectedItemNames := []string{"1", "2"} + + pth, err := path.Build( + "a-tenant", + "a-user", + path.ExchangeService, + path.EmailCategory, + false, + "some", "path", "for", "data") + require.NoError(t, err, clues.ToCore(err)) + + c1 := mock.NewCollection(pth, nil, 1) + c1.Names[0] = expectedItemNames[0] + + c2 := mock.NewCollection(pth, nil, 1) + c2.Names[0] = expectedItemNames[1] + + // Not testing fetch here so safe to use this wrapper. + cols := []data.RestoreCollection{ + data.NotFoundRestoreCollection{Collection: c1}, + data.NotFoundRestoreCollection{Collection: c2}, + } + + dc := &mergeCollection{fullPath: pth} + + for i, c := range cols { + err := dc.addCollection(storagePaths[i], c) + require.NoError(t, err, "adding collection", clues.ToCore(err)) + } + + gotItemNames := []string{} + + for item := range dc.Items(ctx, fault.New(true)) { + gotItemNames = append(gotItemNames, item.UUID()) + } + + assert.ElementsMatch(t, expectedItemNames, gotItemNames) +} + +func (suite *MergeCollectionUnitSuite) TestAddCollection_DifferentPathFails() { + t := suite.T() + + pth1, err := path.Build( + "a-tenant", + "a-user", + path.ExchangeService, + path.EmailCategory, + false, + "some", "path", "for", "data") + require.NoError(t, err, clues.ToCore(err)) + + pth2, err := path.Build( + "a-tenant", + "a-user", + path.ExchangeService, + path.EmailCategory, + false, + "some", "path", "for", "data2") + require.NoError(t, err, clues.ToCore(err)) + + dc := mergeCollection{fullPath: pth1} + + err = dc.addCollection("some/path", &kopiaDataCollection{path: pth2}) + assert.Error(t, err, clues.ToCore(err)) +} + +func (suite *MergeCollectionUnitSuite) TestFetch() { + var ( + fileData1 = []byte("abcdefghijklmnopqrstuvwxyz") + fileData2 = []byte("zyxwvutsrqponmlkjihgfedcba") + fileData3 = []byte("foo bar baz") + + fileName1 = "file1" + fileName2 = "file2" + fileLookupErrName = "errLookup" + fileOpenErrName = "errOpen" + + colPaths = []string{ + "tenant-id/exchange/user-id/mail/some/data/directory1", + "tenant-id/exchange/user-id/mail/some/data/directory2", + } + ) + + pth, err := path.Build( + "a-tenant", + "a-user", + path.ExchangeService, + path.EmailCategory, + false, + "some", "path", "for", "data") + require.NoError(suite.T(), err, clues.ToCore(err)) + + // Needs to be a function so the readers get refreshed each time. + layouts := []func() fs.Directory{ + // Has the following; + // - file1: data[0] + // - errOpen: (error opening file) + func() fs.Directory { + return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[0]), []fs.Entry{ + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(fileName1), + nil, + ), + r: newBackupStreamReader( + serializationVersion, + io.NopCloser(bytes.NewReader(fileData1)), + ), + size: int64(len(fileData1) + versionSize), + }, + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(fileOpenErrName), + nil, + ), + openErr: assert.AnError, + }, + }) + }, + + // Has the following; + // - file1: data[1] + // - file2: data[0] + // - errOpen: data[2] + func() fs.Directory { + return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[1]), []fs.Entry{ + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(fileName1), + nil, + ), + r: newBackupStreamReader( + serializationVersion, + io.NopCloser(bytes.NewReader(fileData2)), + ), + size: int64(len(fileData2) + versionSize), + }, + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(fileName2), + nil, + ), + r: newBackupStreamReader( + serializationVersion, + io.NopCloser(bytes.NewReader(fileData1)), + ), + size: int64(len(fileData1) + versionSize), + }, + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(fileOpenErrName), + nil, + ), + r: newBackupStreamReader( + serializationVersion, + io.NopCloser(bytes.NewReader(fileData3)), + ), + size: int64(len(fileData3) + versionSize), + }, + }) + }, + } + + table := []struct { + name string + fileName string + expectError assert.ErrorAssertionFunc + expectData []byte + notFoundErr bool + }{ + { + name: "Duplicate File, first collection", + fileName: fileName1, + expectError: assert.NoError, + expectData: fileData1, + }, + { + name: "Distinct File, second collection", + fileName: fileName2, + expectError: assert.NoError, + expectData: fileData1, + }, + { + name: "Error opening file", + fileName: fileOpenErrName, + expectError: assert.Error, + }, + { + name: "File not found", + fileName: fileLookupErrName, + expectError: assert.Error, + notFoundErr: true, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + c := &i64counter{} + + dc := mergeCollection{fullPath: pth} + + for i, layout := range layouts { + col := &kopiaDataCollection{ + path: pth, + dir: layout(), + counter: c, + expectedVersion: serializationVersion, + } + + err := dc.addCollection(colPaths[i], col) + require.NoError(t, err, "adding collection", clues.ToCore(err)) + } + + s, err := dc.Fetch(ctx, test.fileName) + test.expectError(t, err, clues.ToCore(err)) + + if err != nil { + if test.notFoundErr { + assert.ErrorIs(t, err, data.ErrNotFound, clues.ToCore(err)) + } + + return + } + + fileData, err := io.ReadAll(s.ToReader()) + require.NoError(t, err, "reading file data", clues.ToCore(err)) + + if err != nil { + return + } + + assert.Equal(t, test.expectData, fileData) + }) + } +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index e35d61cb6..e4d73bb4c 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -365,6 +365,11 @@ type ByteCounter interface { Count(numBytes int64) } +type restoreCollection struct { + restorePath path.Path + storageDirs map[string]*dirAndItems +} + type dirAndItems struct { dir path.Path items []string @@ -380,7 +385,7 @@ func loadDirsAndItems( ctx context.Context, snapshotRoot fs.Entry, bcounter ByteCounter, - toLoad map[string]*dirAndItems, + toLoad map[string]*restoreCollection, bus *fault.Bus, ) ([]data.RestoreCollection, error) { var ( @@ -389,50 +394,67 @@ func loadDirsAndItems( loadCount = 0 ) - for _, dirItems := range toLoad { + for _, col := range toLoad { if el.Failure() != nil { return nil, el.Failure() } - ictx := clues.Add(ctx, "directory_path", dirItems.dir) + ictx := clues.Add(ctx, "restore_path", col.restorePath) - dir, err := getDir(ictx, dirItems.dir, snapshotRoot) - if err != nil { - el.AddRecoverable(clues.Wrap(err, "loading directory"). - WithClues(ictx). - Label(fault.LabelForceNoBackupCreation)) + mergeCol := &mergeCollection{fullPath: col.restorePath} + res = append(res, mergeCol) - continue - } - - dc := &kopiaDataCollection{ - path: dirItems.dir, - dir: dir, - counter: bcounter, - expectedVersion: serializationVersion, - } - - res = append(res, dc) - - for _, item := range dirItems.items { + for _, dirItems := range col.storageDirs { if el.Failure() != nil { return nil, el.Failure() } - err := dc.addStream(ictx, item) + ictx = clues.Add(ictx, "storage_directory_path", dirItems.dir) + + dir, err := getDir(ictx, dirItems.dir, snapshotRoot) if err != nil { - el.AddRecoverable(clues.Wrap(err, "loading item"). + el.AddRecoverable(clues.Wrap(err, "loading storage directory"). WithClues(ictx). Label(fault.LabelForceNoBackupCreation)) continue } - loadCount++ - if loadCount%1000 == 0 { - logger.Ctx(ctx).Infow( - "loading items from kopia", - "loaded_items", loadCount) + dc := &kopiaDataCollection{ + path: col.restorePath, + dir: dir, + counter: bcounter, + expectedVersion: serializationVersion, + } + + if err := mergeCol.addCollection(dirItems.dir.String(), dc); err != nil { + el.AddRecoverable(clues.Wrap(err, "adding collection to merge collection"). + WithClues(ctx). + Label(fault.LabelForceNoBackupCreation)) + + continue + } + + for _, item := range dirItems.items { + if el.Failure() != nil { + return nil, el.Failure() + } + + err := dc.addStream(ictx, item) + if err != nil { + el.AddRecoverable(clues.Wrap(err, "loading item"). + WithClues(ictx). + Label(fault.LabelForceNoBackupCreation)) + + continue + } + + loadCount++ + if loadCount%1000 == 0 { + logger.Ctx(ctx).Infow( + "loading items from kopia", + "loaded_items", loadCount) + } } } } @@ -454,7 +476,7 @@ func loadDirsAndItems( func (w Wrapper) ProduceRestoreCollections( ctx context.Context, snapshotID string, - paths []path.Path, + paths []path.RestorePaths, bcounter ByteCounter, errs *fault.Bus, ) ([]data.RestoreCollection, error) { @@ -474,36 +496,53 @@ func (w Wrapper) ProduceRestoreCollections( var ( loadCount int - // Directory path -> set of items to load from the directory. - dirsToItems = map[string]*dirAndItems{} + // RestorePath -> []StoragePath directory -> set of items to load from the + // directory. + dirsToItems = map[string]*restoreCollection{} el = errs.Local() ) - for _, itemPath := range paths { + for _, itemPaths := range paths { if el.Failure() != nil { return nil, el.Failure() } - // Group things by directory so we can load all items from a single - // directory instance lower down. - ictx := clues.Add(ctx, "item_path", itemPath.String()) + // Group things by RestorePath and then StoragePath so we can load multiple + // items from a single directory instance lower down. + ictx := clues.Add( + ctx, + "item_path", itemPaths.StoragePath.String(), + "restore_path", itemPaths.RestorePath.String()) - parentPath, err := itemPath.Dir() + parentStoragePath, err := itemPaths.StoragePath.Dir() if err != nil { - el.AddRecoverable(clues.Wrap(err, "making directory collection"). + el.AddRecoverable(clues.Wrap(err, "getting storage directory path"). WithClues(ictx). Label(fault.LabelForceNoBackupCreation)) continue } - di := dirsToItems[parentPath.ShortRef()] - if di == nil { - dirsToItems[parentPath.ShortRef()] = &dirAndItems{dir: parentPath} - di = dirsToItems[parentPath.ShortRef()] + // Find the location this item is restored to. + rc := dirsToItems[itemPaths.RestorePath.ShortRef()] + if rc == nil { + dirsToItems[itemPaths.RestorePath.ShortRef()] = &restoreCollection{ + restorePath: itemPaths.RestorePath, + storageDirs: map[string]*dirAndItems{}, + } + rc = dirsToItems[itemPaths.RestorePath.ShortRef()] } - di.items = append(di.items, itemPath.Item()) + // Find the collection this item is sourced from. + di := rc.storageDirs[parentStoragePath.ShortRef()] + if di == nil { + rc.storageDirs[parentStoragePath.ShortRef()] = &dirAndItems{ + dir: parentStoragePath, + } + di = rc.storageDirs[parentStoragePath.ShortRef()] + } + + di.items = append(di.items, itemPaths.StoragePath.Item()) loadCount++ if loadCount%1000 == 0 { diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 67540aec7..abe96fdc2 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -59,14 +59,12 @@ var ( testFileData6 = testFileData ) -//revive:disable:context-as-argument func testForFiles( t *testing.T, - ctx context.Context, + ctx context.Context, //revive:disable-line:context-as-argument expected map[string][]byte, collections []data.RestoreCollection, ) { - //revive:enable:context-as-argument t.Helper() count := 0 @@ -107,6 +105,19 @@ func checkSnapshotTags( assert.Equal(t, expectedTags, man.Tags) } +func toRestorePaths(t *testing.T, paths ...path.Path) []path.RestorePaths { + res := make([]path.RestorePaths, 0, len(paths)) + + for _, p := range paths { + dir, err := p.Dir() + require.NoError(t, err, clues.ToCore(err)) + + res = append(res, path.RestorePaths{StoragePath: p, RestorePath: dir}) + } + + return res +} + // --------------- // unit tests // --------------- @@ -705,10 +716,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { result, err := w.ProduceRestoreCollections( ctx, string(stats.SnapshotID), - []path.Path{ - fp1, - fp2, - }, + toRestorePaths(t, fp1, fp2), nil, fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -838,7 +846,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { _, err = suite.w.ProduceRestoreCollections( suite.ctx, string(stats.SnapshotID), - []path.Path{failedPath}, + toRestorePaths(t, failedPath), &ic, fault.New(true)) // Files that had an error shouldn't make a dir entry in kopia. If they do we @@ -1219,9 +1227,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { _, err = suite.w.ProduceRestoreCollections( suite.ctx, string(stats.SnapshotID), - []path.Path{ - suite.files[suite.testPath1.String()][0].itemPath, - }, + toRestorePaths(t, suite.files[suite.testPath1.String()][0].itemPath), &ic, fault.New(true)) test.restoreCheck(t, err, clues.ToCore(err)) @@ -1322,7 +1328,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() { result, err := suite.w.ProduceRestoreCollections( suite.ctx, string(suite.snapshotID), - test.inputPaths, + toRestorePaths(t, test.inputPaths...), &ic, fault.New(true)) test.expectedErr(t, err, clues.ToCore(err)) @@ -1338,6 +1344,193 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() { } } +// TestProduceRestoreCollections_PathChanges tests that having different +// Restore and Storage paths works properly. Having the same Restore and Storage +// paths is tested by TestProduceRestoreCollections. +func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_PathChanges() { + rp1, err := path.Build( + testTenant, + testUser, + path.ExchangeService, + path.EmailCategory, + false, + "corso_restore", "Inbox") + require.NoError(suite.T(), err) + + rp2, err := path.Build( + testTenant, + testUser, + path.ExchangeService, + path.EmailCategory, + false, + "corso_restore", "Archive") + require.NoError(suite.T(), err) + + // Expected items is generated during the test by looking up paths in the + // suite's map of files. + table := []struct { + name string + inputPaths []path.RestorePaths + expectedCollections int + }{ + { + name: "SingleItem", + inputPaths: []path.RestorePaths{ + { + StoragePath: suite.files[suite.testPath1.String()][0].itemPath, + RestorePath: rp1, + }, + }, + expectedCollections: 1, + }, + { + name: "MultipleItemsSameCollection", + inputPaths: []path.RestorePaths{ + { + StoragePath: suite.files[suite.testPath1.String()][0].itemPath, + RestorePath: rp1, + }, + { + StoragePath: suite.files[suite.testPath1.String()][1].itemPath, + RestorePath: rp1, + }, + }, + expectedCollections: 1, + }, + { + name: "MultipleItemsDifferentCollections", + inputPaths: []path.RestorePaths{ + { + StoragePath: suite.files[suite.testPath1.String()][0].itemPath, + RestorePath: rp1, + }, + { + StoragePath: suite.files[suite.testPath2.String()][0].itemPath, + RestorePath: rp2, + }, + }, + expectedCollections: 2, + }, + { + name: "Multiple Items From Different Collections To Same Collection", + inputPaths: []path.RestorePaths{ + { + StoragePath: suite.files[suite.testPath1.String()][0].itemPath, + RestorePath: rp1, + }, + { + StoragePath: suite.files[suite.testPath2.String()][0].itemPath, + RestorePath: rp1, + }, + }, + expectedCollections: 1, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + expected := make(map[string][]byte, len(test.inputPaths)) + + for _, pth := range test.inputPaths { + item, ok := suite.filesByPath[pth.StoragePath.String()] + require.True(t, ok, "getting expected file data") + + itemPath, err := pth.RestorePath.Append(pth.StoragePath.Item(), true) + require.NoError(t, err, "getting expected item path") + + expected[itemPath.String()] = item.data + } + + ic := i64counter{} + + result, err := suite.w.ProduceRestoreCollections( + suite.ctx, + string(suite.snapshotID), + test.inputPaths, + &ic, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + + assert.Len(t, result, test.expectedCollections) + assert.Less(t, int64(0), ic.i) + testForFiles(t, ctx, expected, result) + }) + } +} + +// TestProduceRestoreCollections_Fetch tests that the Fetch function still works +// properly even with different Restore and Storage paths and items from +// different kopia directories. +func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Fetch() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + + rp1, err := path.Build( + testTenant, + testUser, + path.ExchangeService, + path.EmailCategory, + false, + "corso_restore", "Inbox") + require.NoError(suite.T(), err) + + inputPaths := []path.RestorePaths{ + { + StoragePath: suite.files[suite.testPath1.String()][0].itemPath, + RestorePath: rp1, + }, + { + StoragePath: suite.files[suite.testPath2.String()][0].itemPath, + RestorePath: rp1, + }, + } + + // Really only interested in getting the collection so we can call fetch on + // it. + ic := i64counter{} + + result, err := suite.w.ProduceRestoreCollections( + suite.ctx, + string(suite.snapshotID), + inputPaths, + &ic, + fault.New(true)) + require.NoError(t, err, "getting collection", clues.ToCore(err)) + require.Len(t, result, 1) + + // Item from first kopia directory. + f := suite.files[suite.testPath1.String()][0] + + item, err := result[0].Fetch(ctx, f.itemPath.Item()) + require.NoError(t, err, "fetching file", clues.ToCore(err)) + + r := item.ToReader() + + buf, err := io.ReadAll(r) + require.NoError(t, err, "reading file data", clues.ToCore(err)) + + assert.Equal(t, f.data, buf) + + // Item from second kopia directory. + f = suite.files[suite.testPath2.String()][0] + + item, err = result[0].Fetch(ctx, f.itemPath.Item()) + require.NoError(t, err, "fetching file", clues.ToCore(err)) + + r = item.ToReader() + + buf, err = io.ReadAll(r) + require.NoError(t, err, "reading file data", clues.ToCore(err)) + + assert.Equal(t, f.data, buf) +} + func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Errors() { itemPath, err := suite.testPath1.Append(testFileName, true) require.NoError(suite.T(), err, clues.ToCore(err)) @@ -1345,7 +1538,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Erro table := []struct { name string snapshotID string - paths []path.Path + paths []path.RestorePaths }{ { "NilPaths", @@ -1355,12 +1548,12 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Erro { "EmptyPaths", string(suite.snapshotID), - []path.Path{}, + []path.RestorePaths{}, }, { "NoSnapshot", "foo", - []path.Path{itemPath}, + toRestorePaths(suite.T(), itemPath), }, } @@ -1393,7 +1586,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestDeleteSnapshot() { c, err := suite.w.ProduceRestoreCollections( suite.ctx, string(suite.snapshotID), - []path.Path{itemPath}, + toRestorePaths(t, itemPath), &ic, fault.New(true)) assert.Error(t, err, "snapshot should be deleted", clues.ToCore(err)) diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 38a28ac86..fefbb5dde 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -248,10 +248,9 @@ func checkBackupIsInManifests( } } -//revive:disable:context-as-argument func checkMetadataFilesExist( t *testing.T, - ctx context.Context, + ctx context.Context, //revive:disable-line:context-as-argument backupID model.StableID, kw *kopia.Wrapper, ms *kopia.ModelStore, @@ -259,7 +258,6 @@ func checkMetadataFilesExist( service path.ServiceType, filesByCat map[path.CategoryType][]string, ) { - //revive:enable:context-as-argument for category, files := range filesByCat { t.Run(category.String(), func(t *testing.T) { bup := &backup.Backup{} @@ -269,7 +267,7 @@ func checkMetadataFilesExist( return } - paths := []path.Path{} + paths := []path.RestorePaths{} pathsByRef := map[string][]string{} for _, fName := range files { @@ -285,11 +283,18 @@ func checkMetadataFilesExist( continue } - paths = append(paths, p) + paths = append( + paths, + path.RestorePaths{StoragePath: p, RestorePath: dir}) pathsByRef[dir.ShortRef()] = append(pathsByRef[dir.ShortRef()], fName) } - cols, err := kw.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, nil, fault.New(true)) + cols, err := kw.ProduceRestoreCollections( + ctx, + bup.SnapshotID, + paths, + nil, + fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) for _, col := range cols { diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index ea710fcf3..1928dfc66 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -46,16 +46,28 @@ type mockRestoreProducer struct { onRestore restoreFunc } -type restoreFunc func(id string, ps []path.Path) ([]data.RestoreCollection, error) +type restoreFunc func( + id string, + ps []path.RestorePaths, +) ([]data.RestoreCollection, error) func (mr *mockRestoreProducer) buildRestoreFunc( t *testing.T, oid string, ops []path.Path, ) { - mr.onRestore = func(id string, ps []path.Path) ([]data.RestoreCollection, error) { + mr.onRestore = func( + id string, + ps []path.RestorePaths, + ) ([]data.RestoreCollection, error) { + gotPaths := make([]path.Path, 0, len(ps)) + + for _, rp := range ps { + gotPaths = append(gotPaths, rp.StoragePath) + } + assert.Equal(t, oid, id, "manifest id") - checkPaths(t, ops, ps) + checkPaths(t, ops, gotPaths) return mr.colls, mr.err } @@ -64,11 +76,13 @@ func (mr *mockRestoreProducer) buildRestoreFunc( func (mr *mockRestoreProducer) ProduceRestoreCollections( ctx context.Context, snapshotID string, - paths []path.Path, + paths []path.RestorePaths, bc kopia.ByteCounter, errs *fault.Bus, ) ([]data.RestoreCollection, error) { - mr.gotPaths = append(mr.gotPaths, paths...) + for _, ps := range paths { + mr.gotPaths = append(mr.gotPaths, ps.StoragePath) + } if mr.onRestore != nil { return mr.onRestore(snapshotID, paths) diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go index 41f934692..55c472f7c 100644 --- a/src/internal/operations/inject/inject.go +++ b/src/internal/operations/inject/inject.go @@ -47,7 +47,7 @@ type ( ProduceRestoreCollections( ctx context.Context, snapshotID string, - paths []path.Path, + paths []path.RestorePaths, bc kopia.ByteCounter, errs *fault.Bus, ) ([]data.RestoreCollection, error) diff --git a/src/internal/operations/manifests.go b/src/internal/operations/manifests.go index a402808f2..16e2029f9 100644 --- a/src/internal/operations/manifests.go +++ b/src/internal/operations/manifests.go @@ -308,7 +308,7 @@ func collectMetadata( tenantID string, errs *fault.Bus, ) ([]data.RestoreCollection, error) { - paths := []path.Path{} + paths := []path.RestorePaths{} for _, fn := range fileNames { for _, reason := range man.Reasons { @@ -326,7 +326,14 @@ func collectMetadata( With("metadata_file", fn, "category", reason.Category) } - paths = append(paths, p) + dir, err := p.Dir() + if err != nil { + return nil, clues. + Wrap(err, "building metadata collection path"). + With("metadata_file", fn, "category", reason.Category) + } + + paths = append(paths, path.RestorePaths{StoragePath: p, RestorePath: dir}) } } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 370869801..2dd5cd40c 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -349,7 +349,7 @@ func formatDetailsForRestoration( sel selectors.Selector, deets *details.Details, errs *fault.Bus, -) ([]path.Path, error) { +) ([]path.RestorePaths, error) { fds, err := sel.Reduce(ctx, deets, errs) if err != nil { return nil, err @@ -357,7 +357,7 @@ func formatDetailsForRestoration( var ( fdsPaths = fds.Paths() - paths = make([]path.Path, len(fdsPaths)) + paths = make([]path.RestorePaths, len(fdsPaths)) shortRefs = make([]string, len(fdsPaths)) el = errs.Local() ) @@ -377,7 +377,18 @@ func formatDetailsForRestoration( continue } - paths[i] = p + dir, err := p.Dir() + if err != nil { + el.AddRecoverable(clues. + Wrap(err, "getting restore directory after reduction"). + WithClues(ctx). + With("path", fdsPaths[i])) + + continue + } + + paths[i].StoragePath = p + paths[i].RestorePath = dir shortRefs[i] = p.ShortRef() } diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index bc86687ef..146f0d1c7 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -262,12 +262,22 @@ func read( return clues.Stack(err).WithClues(ctx) } + pd, err := p.Dir() + if err != nil { + return clues.Stack(err).WithClues(ctx) + } + ctx = clues.Add(ctx, "snapshot_id", snapshotID) cs, err := rer.ProduceRestoreCollections( ctx, snapshotID, - []path.Path{p}, + []path.RestorePaths{ + { + StoragePath: p, + RestorePath: pd, + }, + }, &stats.ByteCounter{}, errs) if err != nil { diff --git a/src/pkg/path/path.go b/src/pkg/path/path.go index 52daa1e87..79a14ea95 100644 --- a/src/pkg/path/path.go +++ b/src/pkg/path/path.go @@ -130,6 +130,13 @@ var ( _ fmt.Stringer = &Builder{} ) +// RestorePaths denotes the location to find an item in kopia and the path of +// the collection to place the item in for restore. +type RestorePaths struct { + StoragePath Path + RestorePath Path +} + // Builder is a simple path representation that only tracks path elements. It // can join, escape, and unescape elements. Higher-level packages are expected // to wrap this struct to build resource-specific contexts (e.x. an