From 2415addd05576f96f85c97c30de48b5b58e4a0b6 Mon Sep 17 00:00:00 2001
From: ashmrtn <3891298+ashmrtn@users.noreply.github.com>
Date: Mon, 11 Jul 2022 11:22:28 -0700
Subject: [PATCH] Allow getting data from kopia for a directory subtree (#299)

* Implement getting data for directory subtree

Return a slice of collections with data for a given directory subtree in
kopia. Traverse the full directory before creating a DataCollection instead
of sending items as they are found, because future implementations may block
on send. That would reduce parallelism, since the code could not discover
other directories to traverse until the files already found were consumed.
Kopia also currently loads the entire directory at once, so streaming offers
little benefit.

The system now continues pulling data until completion and reports all
errors at the end of the run.

* Tests for getting persisted subtree data, including some error cases
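
An illustrative caller sketch for the new entry point (not part of this
patch; it assumes an initialized Wrapper `w`, a context, and a known
snapshot ID, with imports and error handling abbreviated):

    // Hypothetical usage: restore everything under a mailbox directory.
    collections, err := w.RestoreDirectory(
        ctx, snapshotID, []string{"a-tenant", "user1", "mail"})
    if err != nil {
        // Collections gathered before a failure may still be returned
        // alongside the aggregated error.
        fmt.Printf("restore finished with errors: %v\n", err)
    }

    for _, c := range collections {
        for item := range c.Items() {
            data, _ := ioutil.ReadAll(item.ToReader())
            fmt.Printf("%s: %d bytes\n",
                path.Join(append(c.FullPath(), item.UUID())...), len(data))
        }
    }

RestoreDirectory takes the same path elements used at backup time; passing a
file path or an unknown directory returns an error (see the NotADirectory and
NonExistentDirectory test cases below).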
---
 src/internal/kopia/wrapper.go      | 134 ++++++++++++-
 src/internal/kopia/wrapper_test.go | 298 +++++++++++++++++++++++++++--
 2 files changed, 414 insertions(+), 18 deletions(-)

diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go
index 9390fa543..87d83e3e7 100644
--- a/src/internal/kopia/wrapper.go
+++ b/src/internal/kopia/wrapper.go
@@ -4,6 +4,7 @@ import (
     "context"
     "path"
 
+    "github.com/hashicorp/go-multierror"
     "github.com/kopia/kopia/fs"
     "github.com/kopia/kopia/fs/virtualfs"
     "github.com/kopia/kopia/repo"
@@ -15,6 +16,7 @@ import (
 
     "github.com/alcionai/corso/internal/connector"
     "github.com/alcionai/corso/pkg/backup"
+    "github.com/alcionai/corso/pkg/logger"
 )
 
 const (
@@ -340,14 +342,23 @@ func (w Wrapper) collectItems(
     // The paths passed below is the path up to (but not including) the
     // file/directory passed.
     if isDirectory {
-        return nil, errors.New("directory restore not implemented")
+        dir, ok := e.(fs.Directory)
+        if !ok {
+            return nil, errors.New("requested object is not a directory")
+        }
+
+        c, err := restoreSubtree(ctx, dir, itemPath[:len(itemPath)-1])
+        // A nil *multierror.Error is non-nil once stored in the error
+        // interface, so convert with ErrorOrNil to return a true nil.
+        return c, err.ErrorOrNil()
     }
 
     f, ok := e.(fs.File)
     if !ok {
         return nil, errors.New("requested object is not a file")
     }
-    c, err := w.restoreSingleItem(ctx, f, itemPath[:len(itemPath)-1])
+
+    c, err := restoreSingleItem(ctx, f, itemPath[:len(itemPath)-1])
     if err != nil {
         return nil, err
     }
@@ -382,7 +393,7 @@ func (w Wrapper) RestoreSingleItem(
 // does not exist in kopia or is not a file an error is returned. The UUID of
 // the returned DataStreams will be the name of the kopia file the data is
 // sourced from.
-func (w Wrapper) restoreSingleItem(
+func restoreSingleItem(
     ctx context.Context,
     f fs.File,
     itemPath []string,
@@ -402,3 +413,120 @@ func (w Wrapper) restoreSingleItem(
         path: itemPath,
     }, nil
 }
+
+func walkDirectory(
+    ctx context.Context,
+    dir fs.Directory,
+) ([]fs.File, []fs.Directory, *multierror.Error) {
+    files := []fs.File{}
+    dirs := []fs.Directory{}
+    var errs *multierror.Error
+
+    err := dir.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error {
+        // Early exit on context cancel.
+        if err := innerCtx.Err(); err != nil {
+            return err
+        }
+
+        switch e.(type) {
+        case fs.Directory:
+            d := e.(fs.Directory)
+            dirs = append(dirs, d)
+        case fs.File:
+            f := e.(fs.File)
+            files = append(files, f)
+        default:
+            errs = multierror.Append(errs, errors.Errorf("unexpected item type %T", e))
+            logger.Ctx(ctx).Warnf("unexpected item of type %T; skipping", e)
+        }
+
+        return nil
+    })
+
+    if err != nil {
+        // If the iterator itself had an error, add it to the list.
+        errs = multierror.Append(errs, errors.Wrap(err, "getting directory data"))
+    }
+
+    return files, dirs, errs
+}
+
+// restoreSubtree returns DataCollections for each subdirectory (or the
+// directory itself) that contains files. The FullPath of each returned
+// DataCollection is the path from the root of the kopia directory structure to
+// the directory. The UUID of each DataStream in each DataCollection is the name
+// of the kopia file the data is sourced from.
+func restoreSubtree(
+    ctx context.Context,
+    dir fs.Directory,
+    relativePath []string,
+) ([]connector.DataCollection, *multierror.Error) {
+    collections := []connector.DataCollection{}
+    // Want a local copy of relativePath with our new element.
+    fullPath := append(append([]string{}, relativePath...), dir.Name())
+    var errs *multierror.Error
+
+    files, dirs, err := walkDirectory(ctx, dir)
+    if err != nil {
+        errs = multierror.Append(
+            errs, errors.Wrapf(err, "walking directory %q", path.Join(fullPath...)))
+    }
+
+    if len(files) > 0 {
+        if ctxErr := ctx.Err(); ctxErr != nil {
+            errs = multierror.Append(errs, errors.WithStack(ctxErr))
+            return nil, errs
+        }
+
+        streams := make([]connector.DataStream, 0, len(files))
+
+        for _, f := range files {
+            r, err := f.Open(ctx)
+            if err != nil {
+                fileFullPath := path.Join(append(append([]string{}, fullPath...), f.Name())...)
+                errs = multierror.Append(
+                    errs, errors.Wrapf(err, "getting reader for file %q", fileFullPath))
+                logger.Ctx(ctx).Warnf("skipping file %q", fileFullPath)
+                continue
+            }
+
+            streams = append(streams, &kopiaDataStream{
+                reader: r,
+                uuid:   f.Name(),
+            })
+        }
+
+        collections = append(collections, &kopiaDataCollection{
+            streams: streams,
+            path:    fullPath,
+        })
+    }
+
+    for _, d := range dirs {
+        if ctxErr := ctx.Err(); ctxErr != nil {
+            errs = multierror.Append(errs, errors.WithStack(ctxErr))
+            return nil, errs
+        }
+
+        c, err := restoreSubtree(ctx, d, fullPath)
+        if err != nil {
+            errs = multierror.Append(errs, errors.Wrapf(
+                err,
+                "traversing subdirectory %q",
+                path.Join(append(append([]string{}, fullPath...), d.Name())...),
+            ))
+        }
+
+        collections = append(collections, c...)
+    }
+
+    return collections, errs
+}
+
+func (w Wrapper) RestoreDirectory(
+    ctx context.Context,
+    snapshotID string,
+    basePath []string,
+) ([]connector.DataCollection, error) {
+    return w.collectItems(ctx, snapshotID, basePath, true)
+}
diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go
index 329470847..22d5f4a15 100644
--- a/src/internal/kopia/wrapper_test.go
+++ b/src/internal/kopia/wrapper_test.go
@@ -5,9 +5,11 @@ import (
     "context"
     "io"
     "io/ioutil"
+    "path"
     "testing"
 
     "github.com/kopia/kopia/fs"
+    "github.com/kopia/kopia/fs/virtualfs"
     "github.com/kopia/kopia/repo/manifest"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -15,20 +17,36 @@ import (
 
     "github.com/alcionai/corso/internal/connector"
     "github.com/alcionai/corso/internal/connector/mockconnector"
+    "github.com/alcionai/corso/internal/kopia/mockkopia"
     ctesting "github.com/alcionai/corso/internal/testing"
     "github.com/alcionai/corso/pkg/backup"
 )
 
 const (
-    testTenant   = "a-tenant"
-    testUser     = "user1"
-    testEmailDir = "mail"
-    testFileUUID = "a-file"
+    testTenant     = "a-tenant"
+    testUser       = "user1"
+    testEmailDir   = "mail"
+    testInboxDir   = "inbox"
+    testArchiveDir = "archive"
+    testFileName   = "file1"
+    testFileName2  = "file2"
+    testFileName3  = "file3"
+    testFileName4  = "file4"
+    testFileName5  = "file5"
+    testFileName6  = "file6"
 )
 
 var (
-    testPath     = []string{testTenant, testUser, testEmailDir}
-    testFileData = []byte("abcdefghijklmnopqrstuvwxyz")
+    testPath      = []string{testTenant, testUser, testEmailDir, testInboxDir}
+    testPath2     = []string{testTenant, testUser, testEmailDir, testArchiveDir}
+    testFileData  = []byte("abcdefghijklmnopqrstuvwxyz")
+    testFileData2 = []byte("zyxwvutsrqponmlkjihgfedcba")
+    testFileData3 = []byte("foo")
+    testFileData4 = []byte("bar")
+    testFileData5 = []byte("baz")
+    // Intentional duplicate to make sure all files are scanned during recovery
+    // (contrast to behavior of snapshotfs.TreeWalker).
+    testFileData6 = testFileData
 )
 
 func entriesToNames(entries []fs.Entry) []string {
@@ -41,6 +59,36 @@ func entriesToNames(entries []fs.Entry) []string {
     return res
 }
 
+func testForFiles(
+    t *testing.T,
+    expected map[string][]byte,
+    collections []connector.DataCollection,
+) {
+    count := 0
+    for _, c := range collections {
+        for s := range c.Items() {
+            count++
+
+            fullPath := path.Join(append(c.FullPath(), s.UUID())...)
+
+            expectedData, ok := expected[fullPath]
+            require.True(
+                t,
+                ok,
+                "unexpected file with path %q",
+                fullPath,
+            )
+
+            buf, err := ioutil.ReadAll(s.ToReader())
+            require.NoError(t, err)
+
+            assert.Equal(t, expectedData, buf)
+        }
+    }
+
+    assert.Equal(t, len(expected), count)
+}
+
 // ---------------
 // unit tests
 // ---------------
@@ -222,6 +270,102 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
     }
 }
 
+func (suite *KopiaUnitSuite) TestRestoreItem() {
+    ctx := context.Background()
+
+    file := &mockkopia.MockFile{
+        Entry: &mockkopia.MockEntry{
+            EntryName: testFileName2,
+            EntryMode: mockkopia.DefaultPermissions,
+        },
+        OpenErr: assert.AnError,
+    }
+
+    _, err := restoreSingleItem(ctx, file, nil)
+    assert.Error(suite.T(), err)
+}
+
+func (suite *KopiaUnitSuite) TestRestoreDirectory_FailGettingReader() {
+    ctx := context.Background()
+    t := suite.T()
+
+    expectedStreamData := map[string][]byte{
+        path.Join(testInboxDir, testFileName):  testFileData,
+        path.Join(testInboxDir, testFileName3): testFileData3,
+    }
+
+    dirs := virtualfs.NewStaticDirectory(testInboxDir, []fs.Entry{
+        &mockkopia.MockFile{
+            Entry: &mockkopia.MockEntry{
+                EntryName: testFileName,
+                EntryMode: mockkopia.DefaultPermissions,
+            },
+            Data: testFileData,
+        },
+        &mockkopia.MockFile{
+            Entry: &mockkopia.MockEntry{
+                EntryName: testFileName2,
+                EntryMode: mockkopia.DefaultPermissions,
+            },
+            OpenErr: assert.AnError,
+        },
+        &mockkopia.MockFile{
+            Entry: &mockkopia.MockEntry{
+                EntryName: testFileName3,
+                EntryMode: mockkopia.DefaultPermissions,
+            },
+            Data: testFileData3,
+        },
+    })
+
+    collections, err := restoreSubtree(ctx, dirs, nil)
+    assert.Error(t, err)
+
+    assert.Len(t, collections, 1)
+    testForFiles(t, expectedStreamData, collections)
+}
+
+func (suite *KopiaUnitSuite) TestRestoreDirectory_FailWrongItemType() {
+    ctx := context.Background()
+    t := suite.T()
+
+    expectedStreamData := map[string][]byte{
+        path.Join(testEmailDir, testInboxDir, testFileName):    testFileData,
+        path.Join(testEmailDir, testArchiveDir, testFileName3): testFileData3,
+    }
+
+    dirs := virtualfs.NewStaticDirectory(testEmailDir, []fs.Entry{
+        virtualfs.NewStaticDirectory(testInboxDir, []fs.Entry{
+            &mockkopia.MockFile{
+                Entry: &mockkopia.MockEntry{
+                    EntryName: testFileName,
+                    EntryMode: mockkopia.DefaultPermissions,
+                },
+                Data: testFileData,
+            },
+        }),
+        virtualfs.NewStaticDirectory("foo", []fs.Entry{
+            virtualfs.StreamingFileFromReader(
+                testFileName2, bytes.NewReader(testFileData2)),
+        }),
+        virtualfs.NewStaticDirectory(testArchiveDir, []fs.Entry{
+            &mockkopia.MockFile{
+                Entry: &mockkopia.MockEntry{
+                    EntryName: testFileName3,
+                    EntryMode: mockkopia.DefaultPermissions,
+                },
+                Data: testFileData3,
+            },
+        }),
+    })
+
+    collections, err := restoreSubtree(ctx, dirs, nil)
+    assert.Error(t, err)
+
+    assert.Len(t, collections, 2)
+    testForFiles(t, expectedStreamData, collections)
+}
+
 // ---------------
 // integration tests that use kopia
 // ---------------
@@ -284,9 +428,12 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
 
 type KopiaSimpleRepoIntegrationSuite struct {
     suite.Suite
-    w          *Wrapper
-    ctx        context.Context
-    snapshotID manifest.ID
+    w                    *Wrapper
+    ctx                  context.Context
+    snapshotID           manifest.ID
+    inboxExpectedFiles   map[string][]byte
+    archiveExpectedFiles map[string][]byte
+    allExpectedFiles     map[string][]byte
 }
 
 func TestKopiaSimpleRepoIntegrationSuite(t *testing.T) {
@@ -318,9 +465,34 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
             path: testPath,
             streams: []connector.DataStream{
                 &mockconnector.MockExchangeData{
-                    ID:     testFileUUID,
+                    ID:     testFileName,
                     Reader: io.NopCloser(bytes.NewReader(testFileData)),
                 },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName2,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData2)),
+                },
+            },
+        },
+        &kopiaDataCollection{
+            path: testPath2,
+            streams: []connector.DataStream{
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName3,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData3)),
+                },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName4,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData4)),
+                },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName5,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData5)),
+                },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName6,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData6)),
+                },
             },
         },
     }
@@ -328,13 +500,33 @@
     stats, rp, err := suite.w.BackupCollections(suite.ctx, collections)
     require.NoError(t, err)
     require.Equal(t, stats.ErrorCount, 0)
-    require.Equal(t, stats.TotalFileCount, 1)
-    require.Equal(t, stats.TotalDirectoryCount, 3)
+    require.Equal(t, stats.TotalFileCount, 6)
+    require.Equal(t, stats.TotalDirectoryCount, 5)
     require.Equal(t, stats.IgnoredErrorCount, 0)
     require.False(t, stats.Incomplete)
-    assert.Len(t, rp.Entries, 1)
+    assert.Len(t, rp.Entries, 6)
 
     suite.snapshotID = manifest.ID(stats.SnapshotID)
+
+    // path.Join can't take (testPath..., testFileName), so append first.
+    suite.inboxExpectedFiles = map[string][]byte{
+        path.Join(append(testPath, testFileName)...):  testFileData,
+        path.Join(append(testPath, testFileName2)...): testFileData2,
+    }
+    suite.archiveExpectedFiles = map[string][]byte{
+        path.Join(append(testPath2, testFileName3)...): testFileData3,
+        path.Join(append(testPath2, testFileName4)...): testFileData4,
+        path.Join(append(testPath2, testFileName5)...): testFileData5,
+        path.Join(append(testPath2, testFileName6)...): testFileData6,
+    }
+
+    suite.allExpectedFiles = map[string][]byte{}
+    for k, v := range suite.inboxExpectedFiles {
+        suite.allExpectedFiles[k] = v
+    }
+    for k, v := range suite.archiveExpectedFiles {
+        suite.allExpectedFiles[k] = v
+    }
 }
 
 func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() {
@@ -347,7 +539,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem() {
     c, err := suite.w.RestoreSingleItem(
         suite.ctx,
         string(suite.snapshotID),
-        append(testPath, testFileUUID),
+        append(testPath, testFileName),
     )
 
     require.NoError(t, err)
@@ -381,7 +573,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
         {
             "NoSnapshot",
             "foo",
-            append(testPath, testFileUUID),
+            append(testPath, testFileName),
         },
         {
             "TargetNotAFile",
@@ -406,3 +598,79 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
         })
     }
 }
+
+func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory() {
+    table := []struct {
+        name          string
+        dirPath       []string
+        expectedFiles map[string][]byte
+    }{
+        {
+            "RecoverUser",
+            []string{testTenant, testUser},
+            suite.allExpectedFiles,
+        },
+        {
+            "RecoverMail",
+            []string{testTenant, testUser, testEmailDir},
+            suite.allExpectedFiles,
+        },
+        {
+            "RecoverInbox",
+            []string{testTenant, testUser, testEmailDir, testInboxDir},
+            suite.inboxExpectedFiles,
+        },
+        {
+            "RecoverArchive",
+            []string{testTenant, testUser, testEmailDir, testArchiveDir},
+            suite.archiveExpectedFiles,
+        },
+    }
+
+    for _, test := range table {
+        suite.T().Run(test.name, func(t *testing.T) {
+            collections, err := suite.w.RestoreDirectory(
+                suite.ctx, string(suite.snapshotID), test.dirPath)
+            require.NoError(t, err)
+
+            testForFiles(t, test.expectedFiles, collections)
+        })
+    }
+}
+
+func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory_Errors() {
+    table := []struct {
+        name       string
+        snapshotID string
+        dirPath    []string
+    }{
+        {
+            "EmptyPath",
+            string(suite.snapshotID),
+            []string{},
+        },
+        {
+            "BadSnapshotID",
+            "foo",
+            []string{testTenant, testUser, testEmailDir},
+        },
+        {
+            "NotADirectory",
+            string(suite.snapshotID),
+            append(testPath, testFileName),
+        },
+        {
+            "NonExistentDirectory",
+            string(suite.snapshotID),
+            []string{testTenant, testUser, testEmailDir, "subdir"},
+        },
+    }
+
+    for _, test := range table {
+        suite.T().Run(test.name, func(t *testing.T) {
+            _, err := suite.w.RestoreDirectory(
+                suite.ctx, test.snapshotID, test.dirPath)
+            assert.Error(t, err)
+        })
+    }
+}