diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 9390fa543..87d83e3e7 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -4,6 +4,7 @@ import ( "context" "path" + "github.com/hashicorp/go-multierror" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/repo" @@ -15,6 +16,7 @@ import ( "github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/pkg/backup" + "github.com/alcionai/corso/pkg/logger" ) const ( @@ -340,14 +342,23 @@ func (w Wrapper) collectItems( // The paths passed below is the path up to (but not including) the // file/directory passed. if isDirectory { - return nil, errors.New("directory restore not implemented") + dir, ok := e.(fs.Directory) + if !ok { + return nil, errors.New("requested object is not a directory") + } + + c, err := restoreSubtree(ctx, dir, itemPath[:len(itemPath)-1]) + // For some reason tests error out if the multierror is nil but we don't + // call ErrorOrNil. + return c, err.ErrorOrNil() } f, ok := e.(fs.File) if !ok { return nil, errors.New("requested object is not a file") } - c, err := w.restoreSingleItem(ctx, f, itemPath[:len(itemPath)-1]) + + c, err := restoreSingleItem(ctx, f, itemPath[:len(itemPath)-1]) if err != nil { return nil, err } @@ -382,7 +393,7 @@ func (w Wrapper) RestoreSingleItem( // does not exist in kopia or is not a file an error is returned. The UUID of // the returned DataStreams will be the name of the kopia file the data is // sourced from. -func (w Wrapper) restoreSingleItem( +func restoreSingleItem( ctx context.Context, f fs.File, itemPath []string, @@ -402,3 +413,120 @@ func (w Wrapper) restoreSingleItem( path: itemPath, }, nil } + +func walkDirectory( + ctx context.Context, + dir fs.Directory, +) ([]fs.File, []fs.Directory, *multierror.Error) { + files := []fs.File{} + dirs := []fs.Directory{} + var errs *multierror.Error + + err := dir.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error { + // Early exit on context cancel. + if err := innerCtx.Err(); err != nil { + return err + } + + switch e.(type) { + case fs.Directory: + d := e.(fs.Directory) + dirs = append(dirs, d) + case fs.File: + f := e.(fs.File) + files = append(files, f) + default: + errs = multierror.Append(errs, errors.Errorf("unexpected item type %T", e)) + logger.Ctx(ctx).Warnf("unexpected item of type %T; skipping", e) + } + + return nil + }) + + if err != nil { + // If the iterator itself had an error add it to the list. + errs = multierror.Append(errs, errors.Wrap(err, "getting directory data")) + } + + return files, dirs, errs +} + +// restoreSubtree returns DataCollections for each subdirectory (or the +// directory itself) that contains files. The FullPath of each returned +// DataCollection is the path from the root of the kopia directory structure to +// the directory. The UUID of each DataStream in each DataCollection is the name +// of the kopia file the data is sourced from. +func restoreSubtree( + ctx context.Context, + dir fs.Directory, + relativePath []string, +) ([]connector.DataCollection, *multierror.Error) { + collections := []connector.DataCollection{} + // Want a local copy of relativePath with our new element. + fullPath := append(append([]string{}, relativePath...), dir.Name()) + var errs *multierror.Error + + files, dirs, err := walkDirectory(ctx, dir) + if err != nil { + errs = multierror.Append( + errs, errors.Wrapf(err, "walking directory %q", path.Join(fullPath...))) + } + + if len(files) > 0 { + if ctxErr := ctx.Err(); ctxErr != nil { + errs = multierror.Append(errs, errors.WithStack(ctxErr)) + return nil, errs + } + + streams := make([]connector.DataStream, 0, len(files)) + + for _, f := range files { + r, err := f.Open(ctx) + if err != nil { + fileFullPath := path.Join(append(append([]string{}, fullPath...), f.Name())...) + errs = multierror.Append( + errs, errors.Wrapf(err, "getting reader for file %q", fileFullPath)) + logger.Ctx(ctx).Warnf("skipping file %q", fileFullPath) + continue + } + + streams = append(streams, &kopiaDataStream{ + reader: r, + uuid: f.Name(), + }) + } + + collections = append(collections, &kopiaDataCollection{ + streams: streams, + path: fullPath, + }) + } + + for _, d := range dirs { + if ctxErr := ctx.Err(); ctxErr != nil { + errs = multierror.Append(errs, errors.WithStack(ctxErr)) + return nil, errs + } + + c, err := restoreSubtree(ctx, d, fullPath) + if err != nil { + errs = multierror.Append(errs, errors.Wrapf( + err, + "traversing subdirectory %q", + path.Join(append(append([]string{}, fullPath...), d.Name())...), + )) + } + + collections = append(collections, c...) + } + + return collections, errs +} + +func (w Wrapper) RestoreDirectory( + ctx context.Context, + snapshotID string, + basePath []string, +) ([]connector.DataCollection, error) { + return w.collectItems(ctx, snapshotID, basePath, true) +} diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 329470847..22d5f4a15 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -5,9 +5,11 @@ import ( "context" "io" "io/ioutil" + "path" "testing" "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/repo/manifest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,20 +17,36 @@ import ( "github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector/mockconnector" + "github.com/alcionai/corso/internal/kopia/mockkopia" ctesting "github.com/alcionai/corso/internal/testing" "github.com/alcionai/corso/pkg/backup" ) const ( - testTenant = "a-tenant" - testUser = "user1" - testEmailDir = "mail" - testFileUUID = "a-file" + testTenant = "a-tenant" + testUser = "user1" + testEmailDir = "mail" + testInboxDir = "inbox" + testArchiveDir = "archive" + testFileName = "file1" + testFileName2 = "file2" + testFileName3 = "file3" + testFileName4 = "file4" + testFileName5 = "file5" + testFileName6 = "file6" ) var ( - testPath = []string{testTenant, testUser, testEmailDir} - testFileData = []byte("abcdefghijklmnopqrstuvwxyz") + testPath = []string{testTenant, testUser, testEmailDir, testInboxDir} + testPath2 = []string{testTenant, testUser, testEmailDir, testArchiveDir} + testFileData = []byte("abcdefghijklmnopqrstuvwxyz") + testFileData2 = []byte("zyxwvutsrqponmlkjihgfedcba") + testFileData3 = []byte("foo") + testFileData4 = []byte("bar") + testFileData5 = []byte("baz") + // Intentional duplicate to make sure all files are scanned during recovery + // (contrast to behavior of snapshotfs.TreeWalker). + testFileData6 = testFileData ) func entriesToNames(entries []fs.Entry) []string { @@ -41,6 +59,36 @@ func entriesToNames(entries []fs.Entry) []string { return res } +func testForFiles( + t *testing.T, + expected map[string][]byte, + collections []connector.DataCollection, +) { + count := 0 + for _, c := range collections { + for s := range c.Items() { + count++ + + fullPath := path.Join(append(c.FullPath(), s.UUID())...) + + expected, ok := expected[fullPath] + require.True( + t, + ok, + "unexpected file with path %q", + path.Join(append(c.FullPath(), fullPath)...), + ) + + buf, err := ioutil.ReadAll(s.ToReader()) + require.NoError(t, err) + + assert.Equal(t, expected, buf) + } + } + + assert.Equal(t, len(expected), count) +} + // --------------- // unit tests // --------------- @@ -222,6 +270,102 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { } } +func (suite *KopiaUnitSuite) TestRestoreItem() { + ctx := context.Background() + + file := &mockkopia.MockFile{ + Entry: &mockkopia.MockEntry{ + EntryName: testFileName2, + EntryMode: mockkopia.DefaultPermissions, + }, + OpenErr: assert.AnError, + } + + _, err := restoreSingleItem(ctx, file, nil) + assert.Error(suite.T(), err) +} + +func (suite *KopiaUnitSuite) TestRestoreDirectory_FailGettingReader() { + ctx := context.Background() + t := suite.T() + + expectedStreamData := map[string][]byte{ + path.Join(testInboxDir, testFileName): testFileData, + path.Join(testInboxDir, testFileName3): testFileData3, + } + + dirs := virtualfs.NewStaticDirectory(testInboxDir, []fs.Entry{ + &mockkopia.MockFile{ + Entry: &mockkopia.MockEntry{ + EntryName: testFileName, + EntryMode: mockkopia.DefaultPermissions, + }, + Data: testFileData, + }, + &mockkopia.MockFile{ + Entry: &mockkopia.MockEntry{ + EntryName: testFileName2, + EntryMode: mockkopia.DefaultPermissions, + }, + OpenErr: assert.AnError, + }, + &mockkopia.MockFile{ + Entry: &mockkopia.MockEntry{ + EntryName: testFileName3, + EntryMode: mockkopia.DefaultPermissions, + }, + Data: testFileData3, + }, + }) + + collections, err := restoreSubtree(ctx, dirs, nil) + assert.Error(t, err) + + assert.Len(t, collections, 1) + testForFiles(t, expectedStreamData, collections) +} + +func (suite *KopiaUnitSuite) TestRestoreDirectory_FailWrongItemType() { + ctx := context.Background() + t := suite.T() + + expectedStreamData := map[string][]byte{ + path.Join(testEmailDir, testInboxDir, testFileName): testFileData, + path.Join(testEmailDir, testArchiveDir, testFileName3): testFileData3, + } + + dirs := virtualfs.NewStaticDirectory(testEmailDir, []fs.Entry{ + virtualfs.NewStaticDirectory(testInboxDir, []fs.Entry{ + &mockkopia.MockFile{ + Entry: &mockkopia.MockEntry{ + EntryName: testFileName, + EntryMode: mockkopia.DefaultPermissions, + }, + Data: testFileData, + }, + }), + virtualfs.NewStaticDirectory("foo", []fs.Entry{ + virtualfs.StreamingFileFromReader( + testFileName2, bytes.NewReader(testFileData2)), + }), + virtualfs.NewStaticDirectory(testArchiveDir, []fs.Entry{ + &mockkopia.MockFile{ + Entry: &mockkopia.MockEntry{ + EntryName: testFileName3, + EntryMode: mockkopia.DefaultPermissions, + }, + Data: testFileData3, + }, + }), + }) + + collections, err := restoreSubtree(ctx, dirs, nil) + assert.Error(t, err) + + assert.Len(t, collections, 2) + testForFiles(t, expectedStreamData, collections) +} + // --------------- // integration tests that use kopia // --------------- @@ -284,9 +428,12 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { type KopiaSimpleRepoIntegrationSuite struct { suite.Suite - w *Wrapper - ctx context.Context - snapshotID manifest.ID + w *Wrapper + ctx context.Context + snapshotID manifest.ID + inboxExpectedFiles map[string][]byte + archiveExpectedFiles map[string][]byte + allExpectedFiles map[string][]byte } func TestKopiaSimpleRepoIntegrationSuite(t *testing.T) { @@ -318,9 +465,34 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { path: testPath, streams: []connector.DataStream{ &mockconnector.MockExchangeData{ - ID: testFileUUID, + ID: testFileName, Reader: io.NopCloser(bytes.NewReader(testFileData)), }, + &mockconnector.MockExchangeData{ + ID: testFileName2, + Reader: io.NopCloser(bytes.NewReader(testFileData2)), + }, + }, + }, + &kopiaDataCollection{ + path: testPath2, + streams: []connector.DataStream{ + &mockconnector.MockExchangeData{ + ID: testFileName3, + Reader: io.NopCloser(bytes.NewReader(testFileData3)), + }, + &mockconnector.MockExchangeData{ + ID: testFileName4, + Reader: io.NopCloser(bytes.NewReader(testFileData4)), + }, + &mockconnector.MockExchangeData{ + ID: testFileName5, + Reader: io.NopCloser(bytes.NewReader(testFileData5)), + }, + &mockconnector.MockExchangeData{ + ID: testFileName6, + Reader: io.NopCloser(bytes.NewReader(testFileData6)), + }, }, }, } @@ -328,13 +500,33 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { stats, rp, err := suite.w.BackupCollections(suite.ctx, collections) require.NoError(t, err) require.Equal(t, stats.ErrorCount, 0) - require.Equal(t, stats.TotalFileCount, 1) - require.Equal(t, stats.TotalDirectoryCount, 3) + require.Equal(t, stats.TotalFileCount, 6) + require.Equal(t, stats.TotalDirectoryCount, 5) require.Equal(t, stats.IgnoredErrorCount, 0) require.False(t, stats.Incomplete) - assert.Len(t, rp.Entries, 1) + assert.Len(t, rp.Entries, 6) suite.snapshotID = manifest.ID(stats.SnapshotID) + + // path.Join doesn't like (testPath..., testFileName). + suite.inboxExpectedFiles = map[string][]byte{ + path.Join(append(testPath, testFileName)...): testFileData, + path.Join(append(testPath, testFileName2)...): testFileData2, + } + suite.archiveExpectedFiles = map[string][]byte{ + path.Join(append(testPath2, testFileName3)...): testFileData3, + path.Join(append(testPath2, testFileName4)...): testFileData4, + path.Join(append(testPath2, testFileName5)...): testFileData5, + path.Join(append(testPath2, testFileName6)...): testFileData6, + } + + suite.allExpectedFiles = map[string][]byte{} + for k, v := range suite.inboxExpectedFiles { + suite.allExpectedFiles[k] = v + } + for k, v := range suite.archiveExpectedFiles { + suite.allExpectedFiles[k] = v + } } func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() { @@ -347,7 +539,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem() { c, err := suite.w.RestoreSingleItem( suite.ctx, string(suite.snapshotID), - append(testPath, testFileUUID), + append(testPath, testFileName), ) require.NoError(t, err) @@ -381,7 +573,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err { "NoSnapshot", "foo", - append(testPath, testFileUUID), + append(testPath, testFileName), }, { "TargetNotAFile", @@ -406,3 +598,79 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err }) } } + +func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory() { + table := []struct { + name string + dirPath []string + expectedFiles map[string][]byte + }{ + { + "RecoverUser", + []string{testTenant, testUser}, + suite.allExpectedFiles, + }, + { + "RecoverMail", + []string{testTenant, testUser, testEmailDir}, + suite.allExpectedFiles, + }, + { + "RecoverInbox", + []string{testTenant, testUser, testEmailDir, testInboxDir}, + suite.inboxExpectedFiles, + }, + { + "RecoverArchive", + []string{testTenant, testUser, testEmailDir, testArchiveDir}, + suite.archiveExpectedFiles, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + collections, err := suite.w.RestoreDirectory( + suite.ctx, string(suite.snapshotID), test.dirPath) + require.NoError(t, err) + + testForFiles(t, test.expectedFiles, collections) + }) + } +} + +func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory_Errors() { + table := []struct { + name string + snapshotID string + dirPath []string + }{ + { + "EmptyPath", + string(suite.snapshotID), + []string{}, + }, + { + "BadSnapshotID", + "foo", + []string{testTenant, testUser, testEmailDir}, + }, + { + "NotADirectory", + string(suite.snapshotID), + append(testPath, testFileName), + }, + { + "NonExistantDirectory", + string(suite.snapshotID), + []string{testTenant, testUser, testEmailDir, "subdir"}, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + _, err := suite.w.RestoreDirectory( + suite.ctx, test.snapshotID, test.dirPath) + assert.Error(t, err) + }) + } +}