diff --git a/src/go.mod b/src/go.mod
index 6159acdad..3edcb7b36 100644
--- a/src/go.mod
+++ b/src/go.mod
@@ -2,7 +2,7 @@ module github.com/alcionai/corso
 
 go 1.18
 
-replace github.com/kopia/kopia => github.com/kopia/kopia v0.11.4-0.20220819163352-5ad8f1cf38a3
+replace github.com/kopia/kopia => github.com/kopia/kopia v0.11.4-0.20220822194227-5c88bcf1a6e7
 
 require (
     github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0
diff --git a/src/go.sum b/src/go.sum
index 97738b071..ee6317600 100644
--- a/src/go.sum
+++ b/src/go.sum
@@ -52,7 +52,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/aws/aws-sdk-go v1.44.80 h1:jEXGecSgPdvM5KnyDsSgFhZSm7WwaTp4h544Im4SfhI=
+github.com/aws/aws-sdk-go v1.44.81 h1:C8oBZ+a+ka0qk3Q24MohQIFq0tkbO8IAu5tfpAMKVWE=
 github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
 github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -225,8 +225,8 @@ github.com/klauspost/reedsolomon v1.10.0 h1:MonMtg979rxSHjwtsla5dZLhreS0Lu42AyQ2
 github.com/klauspost/reedsolomon v1.10.0/go.mod h1:qHMIzMkuZUWqIh8mS/GruPdo3u0qwX2jk/LH440ON7Y=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/kopia/kopia v0.11.4-0.20220819163352-5ad8f1cf38a3 h1:10M0vnKBCqQqKhbUowEXrD0minuv4Gbh1aCF2GotV60=
-github.com/kopia/kopia v0.11.4-0.20220819163352-5ad8f1cf38a3/go.mod h1:YO48laHllfGEHM1PtLcY66PYLFB9XIqru4bPmtZn8i0=
+github.com/kopia/kopia v0.11.4-0.20220822194227-5c88bcf1a6e7 h1:CJaI4frTo1+ayoCa/imv8F3VPQbkWyr7U3KBI5PPjaI=
+github.com/kopia/kopia v0.11.4-0.20220822194227-5c88bcf1a6e7/go.mod h1:ckJEq1c7KJcK1ZgqMRy+r+VpE/Z6iUzioZb/0KSBhWw=
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go
index 4dcef2cf1..852c08601 100644
--- a/src/internal/connector/mockconnector/mock_data_collection.go
+++ b/src/internal/connector/mockconnector/mock_data_collection.go
@@ -57,8 +57,8 @@ func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {
         defer close(res)
         for i := 0; i < medc.messageCount; i++ {
             res <- &MockExchangeData{
-                medc.Names[i],
-                io.NopCloser(bytes.NewReader(medc.Data[i])),
+                ID:     medc.Names[i],
+                Reader: io.NopCloser(bytes.NewReader(medc.Data[i])),
             }
         }
     }()
@@ -68,8 +68,9 @@
 
 // ExchangeData represents a single item retrieved from exchange
 type MockExchangeData struct {
-    ID     string
-    Reader io.ReadCloser
+    ID      string
+    Reader  io.ReadCloser
+    ReadErr error
 }
 
 func (med *MockExchangeData) UUID() string {
@@ -77,6 +78,10 @@ func (med *MockExchangeData) UUID() string {
 }
 
 func (med *MockExchangeData) ToReader() io.ReadCloser {
+    if med.ReadErr != nil {
+        return io.NopCloser(errReader{med.ReadErr})
+    }
+
     return med.Reader
 }
 
@@ -127,3 +132,11 @@ func GetMockEventBytes(subject string) []byte {
         "Review + Lunch\",\"type\":\"singleInstance\",\"webLink\":\"https://outlook.office365.com/owa/?itemid=AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAENAADSEBNbUIB9RL6ePDeF3FIYAAAAAG76AAA%3D&exvsurl=1&path=/calendar/item\"}"
     return []byte(event)
 }
+
+type errReader struct {
+    readErr error
+}
+
+func (er errReader) Read([]byte) (int, error) {
+    return 0, er.readErr
+}
diff --git a/src/internal/connector/mockconnector/mock_data_collection_test.go b/src/internal/connector/mockconnector/mock_data_collection_test.go
index e3a14f1c2..4370e3354 100644
--- a/src/internal/connector/mockconnector/mock_data_collection_test.go
+++ b/src/internal/connector/mockconnector/mock_data_collection_test.go
@@ -2,11 +2,13 @@ package mockconnector_test
 
 import (
     "bytes"
+    "io"
     "io/ioutil"
     "testing"
 
     "github.com/microsoftgraph/msgraph-sdk-go/models"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"
 
     "github.com/alcionai/corso/internal/connector/mockconnector"
 )
@@ -50,3 +52,53 @@ func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection_NewExchange
         assert.NotNil(t, something)
     }
 }
+
+type MockExchangeDataSuite struct {
+    suite.Suite
+}
+
+func TestMockExchangeDataSuite(t *testing.T) {
+    suite.Run(t, new(MockExchangeDataSuite))
+}
+
+func (suite *MockExchangeDataSuite) TestMockExchangeData() {
+    data := []byte("foo")
+    id := "bar"
+
+    table := []struct {
+        name   string
+        reader *mockconnector.MockExchangeData
+        check  require.ErrorAssertionFunc
+    }{
+        {
+            name: "NoError",
+            reader: &mockconnector.MockExchangeData{
+                ID:     id,
+                Reader: io.NopCloser(bytes.NewReader(data)),
+            },
+            check: require.NoError,
+        },
+        {
+            name: "Error",
+            reader: &mockconnector.MockExchangeData{
+                ID:      id,
+                ReadErr: assert.AnError,
+            },
+            check: require.Error,
+        },
+    }
+
+    for _, test := range table {
+        suite.T().Run(test.name, func(t *testing.T) {
+            assert.Equal(t, id, test.reader.UUID())
+            buf, err := ioutil.ReadAll(test.reader.ToReader())
+
+            test.check(t, err)
+            if err != nil {
+                return
+            }
+
+            assert.Equal(t, data, buf)
+        })
+    }
+}
diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go
index d03c2a1e0..4a29fbf16 100644
--- a/src/internal/kopia/wrapper.go
+++ b/src/internal/kopia/wrapper.go
@@ -3,6 +3,7 @@ package kopia
 import (
     "context"
     "path"
+    "sync"
 
     "github.com/hashicorp/go-multierror"
     "github.com/kopia/kopia/fs"
@@ -50,6 +51,58 @@ func manifestToStats(man *snapshot.Manifest) BackupStats {
     }
 }
 
+type itemDetails struct {
+    info    details.ItemInfo
+    repoRef string
+}
+
+type corsoProgress struct {
+    snapshotfs.UploadProgress
+    pending map[string]*itemDetails
+    deets   *details.Details
+    mu      sync.RWMutex
+}
+
+// Kopia interface function used as a callback when kopia finishes processing a
+// file.
+func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
+    // Pass the call through as well so we don't break expected functionality.
+    defer cp.UploadProgress.FinishedFile(relativePath, err)
+    // Whether it succeeded or failed, remove the entry from our pending set so we
+    // don't leak references.
+    defer func() {
+        cp.mu.Lock()
+        defer cp.mu.Unlock()
+
+        delete(cp.pending, relativePath)
+    }()
+
+    if err != nil {
+        return
+    }
+
+    d := cp.get(relativePath)
+    if d == nil {
+        return
+    }
+
+    cp.deets.Add(d.repoRef, d.info)
+}
+
+func (cp *corsoProgress) put(k string, v *itemDetails) {
+    cp.mu.Lock()
+    defer cp.mu.Unlock()
+
+    cp.pending[k] = v
+}
+
+func (cp *corsoProgress) get(k string) *itemDetails {
+    cp.mu.RLock()
+    defer cp.mu.RUnlock()
+
+    return cp.pending[k]
+}
+
 func NewWrapper(c *conn) (*Wrapper, error) {
     if err := c.wrap(); err != nil {
         return nil, errors.Wrap(err, "creating Wrapper")
@@ -79,9 +132,13 @@ func (w *Wrapper) Close(ctx context.Context) error {
 func getStreamItemFunc(
     staticEnts []fs.Entry,
     streamedEnts data.Collection,
-    snapshotDetails *details.Details,
+    progress *corsoProgress,
 ) func(context.Context, func(context.Context, fs.Entry) error) error {
     return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
+        // Collect all errors and return them at the end so that iteration for this
+        // directory doesn't end early.
+        var errs *multierror.Error
+
         // Return static entries in this directory first.
         for _, d := range staticEnts {
             if err := cb(ctx, d); err != nil {
@@ -101,22 +158,37 @@ func getStreamItemFunc(
             case e, ok := <-items:
                 if !ok {
-                    return nil
+                    return errs.ErrorOrNil()
                 }
 
+                itemPath := path.Join(append(streamedEnts.FullPath(), e.UUID())...)
+
                 ei, ok := e.(data.StreamInfo)
                 if !ok {
-                    return errors.New("item does not implement DataStreamInfo")
+                    errs = multierror.Append(
+                        errs, errors.Errorf("item %q does not implement DataStreamInfo", itemPath))
+
+                    logger.Ctx(ctx).Errorw(
+                        "item does not implement DataStreamInfo; skipping", "path", itemPath)
+
+                    continue
                 }
 
+                // Relative path given to us in the callback is missing the root
+                // element. Add to pending set before calling the callback to avoid race
+                // conditions when the item is completed.
+                p := path.Join(append(streamedEnts.FullPath()[1:], e.UUID())...)
+                d := &itemDetails{info: ei.Info(), repoRef: itemPath}
+
+                progress.put(p, d)
+
                 entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader())
                 if err := cb(ctx, entry); err != nil {
-                    return errors.Wrap(err, "executing callback")
+                    // Kopia's uploader swallows errors in most cases, so if we see
+                    // something here it's probably a big issue and we should return.
+                    errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath))
+                    return errs.ErrorOrNil()
                 }
-
-                // Populate BackupDetails
-                ep := append(streamedEnts.FullPath(), e.UUID())
-                snapshotDetails.Add(path.Join(ep...), ei.Info())
             }
         }
     }
@@ -124,13 +196,13 @@
 
 // buildKopiaDirs recursively builds a directory hierarchy from the roots up.
 // Returned directories are virtualfs.StreamingDirectory.
-func buildKopiaDirs(dirName string, dir *treeMap, snapshotDetails *details.Details) (fs.Directory, error) {
+func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) {
     // Need to build the directory tree from the leaves up because intermediate
     // directories need to have all their entries at creation time.
     var childDirs []fs.Entry
 
     for childName, childDir := range dir.childDirs {
-        child, err := buildKopiaDirs(childName, childDir, snapshotDetails)
+        child, err := buildKopiaDirs(childName, childDir, progress)
         if err != nil {
             return nil, err
         }
@@ -140,7 +212,7 @@ func buildKopiaDirs(dirName string, dir *treeMap, snapshotDetails *details.Detai
 
     return virtualfs.NewStreamingDirectory(
         dirName,
-        getStreamItemFunc(childDirs, dir.collection, snapshotDetails),
+        getStreamItemFunc(childDirs, dir.collection, progress),
     ), nil
 }
 
@@ -162,7 +234,7 @@ func newTreeMap() *treeMap {
 func inflateDirTree(
     ctx context.Context,
     collections []data.Collection,
-    snapshotDetails *details.Details,
+    progress *corsoProgress,
 ) (fs.Directory, error) {
     roots := make(map[string]*treeMap)
 
@@ -222,7 +294,7 @@ func inflateDirTree(
     var res fs.Directory
 
     for dirName, dir := range roots {
-        tmp, err := buildKopiaDirs(dirName, dir, snapshotDetails)
+        tmp, err := buildKopiaDirs(dirName, dir, progress)
         if err != nil {
             return nil, err
         }
@@ -241,25 +313,28 @@ func (w Wrapper) BackupCollections(
         return nil, nil, errNotConnected
     }
 
-    snapshotDetails := &details.Details{}
+    progress := &corsoProgress{
+        pending: map[string]*itemDetails{},
+        deets:   &details.Details{},
+    }
 
-    dirTree, err := inflateDirTree(ctx, collections, snapshotDetails)
+    dirTree, err := inflateDirTree(ctx, collections, progress)
     if err != nil {
         return nil, nil, errors.Wrap(err, "building kopia directories")
     }
 
-    stats, err := w.makeSnapshotWithRoot(ctx, dirTree, snapshotDetails)
+    stats, err := w.makeSnapshotWithRoot(ctx, dirTree, progress)
     if err != nil {
         return nil, nil, err
     }
 
-    return stats, snapshotDetails, nil
+    return stats, progress.deets, nil
 }
 
 func (w Wrapper) makeSnapshotWithRoot(
     ctx context.Context,
     root fs.Directory,
-    snapshotDetails *details.Details,
+    progress *corsoProgress,
 ) (*BackupStats, error) {
     var man *snapshot.Manifest
 
@@ -280,14 +355,25 @@ func (w Wrapper) makeSnapshotWithRoot(
             Path: root.Name(),
         }
 
-        policyTree, err := policy.TreeForSource(innerCtx, w.c, si)
+        trueVal := policy.OptionalBool(true)
+        errPolicy := &policy.Policy{
+            ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
+                IgnoreFileErrors:      &trueVal,
+                IgnoreDirectoryErrors: &trueVal,
+            },
+        }
+        policyTree, err := policy.TreeForSourceWithOverride(innerCtx, w.c, si, errPolicy)
         if err != nil {
             err = errors.Wrap(err, "get policy tree")
             logger.Ctx(innerCtx).Errorw("kopia backup", err)
             return err
         }
 
+        // By default Uploader is best-attempt.
         u := snapshotfs.NewUploader(rw)
+        progress.UploadProgress = u.Progress
+        u.Progress = progress
+
         man, err = u.Upload(innerCtx, root, policyTree, si)
         if err != nil {
             err = errors.Wrap(err, "uploading data")
@@ -459,7 +545,8 @@ func walkDirectory(
             files = append(files, e)
         default:
             errs = multierror.Append(errs, errors.Errorf("unexpected item type %T", e))
-            logger.Ctx(ctx).Warnf("unexpected item of type %T; skipping", e)
+            logger.Ctx(ctx).Errorw(
+                "unexpected item type; skipping", "type", e)
         }
 
         return nil
@@ -507,7 +594,8 @@ func restoreSubtree(
             fileFullPath := path.Join(append(append([]string{}, fullPath...), f.Name())...)
             errs = multierror.Append(
                 errs, errors.Wrapf(err, "getting reader for file %q", fileFullPath))
-            logger.Ctx(ctx).Warnf("skipping file %q", fileFullPath)
+            logger.Ctx(ctx).Errorw(
+                "unable to get file reader; skipping", "path", fileFullPath)
 
             continue
         }
diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go
index 5266034ef..95d9e0f45 100644
--- a/src/internal/kopia/wrapper_test.go
+++ b/src/internal/kopia/wrapper_test.go
@@ -12,6 +12,7 @@ import (
     "github.com/kopia/kopia/fs"
     "github.com/kopia/kopia/fs/virtualfs"
     "github.com/kopia/kopia/repo/manifest"
+    "github.com/kopia/kopia/snapshot/snapshotfs"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"
@@ -88,6 +89,89 @@ func testForFiles(
 // ---------------
 // unit tests
 // ---------------
+type CorsoProgressUnitSuite struct {
+    suite.Suite
+}
+
+func TestCorsoProgressUnitSuite(t *testing.T) {
+    suite.Run(t, new(CorsoProgressUnitSuite))
+}
+
+func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
+    type testInfo struct {
+        info *itemDetails
+        err  error
+    }
+
+    targetFileName := "testFile"
+    deets := &itemDetails{details.ItemInfo{}, targetFileName}
+
+    table := []struct {
+        name        string
+        cachedItems map[string]testInfo
+        expectedLen int
+        err         error
+    }{
+        {
+            name: "DetailsExist",
+            cachedItems: map[string]testInfo{
+                targetFileName: {
+                    info: deets,
+                    err:  nil,
+                },
+            },
+            expectedLen: 1,
+        },
+        {
+            name: "PendingNoDetails",
+            cachedItems: map[string]testInfo{
+                targetFileName: {
+                    info: nil,
+                    err:  nil,
+                },
+            },
+            expectedLen: 0,
+        },
+        {
+            name: "HadError",
+            cachedItems: map[string]testInfo{
+                targetFileName: {
+                    info: deets,
+                    err:  assert.AnError,
+                },
+            },
+            expectedLen: 0,
+        },
+        {
+            name:        "NotPending",
+            expectedLen: 0,
+        },
+    }
+    for _, test := range table {
+        suite.T().Run(test.name, func(t *testing.T) {
+            bd := &details.Details{}
+            cp := corsoProgress{
+                UploadProgress: &snapshotfs.NullUploadProgress{},
+                deets:          bd,
+                pending:        map[string]*itemDetails{},
+            }
+
+            for k, v := range test.cachedItems {
+                cp.put(k, v.info)
+            }
+
+            require.Len(t, cp.pending, len(test.cachedItems))
+
+            for k, v := range test.cachedItems {
+                cp.FinishedFile(k, v.err)
+            }
+
+            assert.Empty(t, cp.pending)
+            assert.Len(t, bd.Entries, test.expectedLen)
+        })
+    }
+}
+
 type KopiaUnitSuite struct {
     suite.Suite
 }
@@ -117,7 +201,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
         user2: 42,
     }
 
-    snapshotDetails := &details.Details{}
+    progress := &corsoProgress{pending: map[string]*itemDetails{}}
 
     collections := []data.Collection{
         mockconnector.NewMockExchangeCollection(
@@ -138,7 +222,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
     // - user2
    //   - emails
    //     - 42 separate files
-    dirTree, err := inflateDirTree(ctx, collections, snapshotDetails)
+    dirTree, err := inflateDirTree(ctx, collections, progress)
     require.NoError(suite.T(), err)
 
     assert.Equal(suite.T(), dirTree.Name(), tenant)
@@ -169,7 +253,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
         totalFileCount += c
     }
 
-    assert.Len(suite.T(), snapshotDetails.Entries, totalFileCount)
+    assert.Len(suite.T(), progress.pending, totalFileCount)
 }
 
 func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
@@ -180,7 +264,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
     expectedFileCount := 42
 
-    snapshotDetails := &details.Details{}
+    progress := &corsoProgress{pending: map[string]*itemDetails{}}
 
     collections := []data.Collection{
         mockconnector.NewMockExchangeCollection(
             []string{emails},
@@ -191,7 +275,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
     // Returned directory structure should look like:
     // - emails
     //   - 42 separate files
-    dirTree, err := inflateDirTree(ctx, collections, snapshotDetails)
+    dirTree, err := inflateDirTree(ctx, collections, progress)
     require.NoError(suite.T(), err)
 
     assert.Equal(suite.T(), dirTree.Name(), emails)
@@ -243,9 +327,9 @@
 
     for _, test := range table {
         suite.T().Run(test.name, func(t *testing.T) {
-            snapshotDetails := &details.Details{}
+            progress := &corsoProgress{pending: map[string]*itemDetails{}}
 
-            dirTree, err := inflateDirTree(ctx, test.layout, snapshotDetails)
+            dirTree, err := inflateDirTree(ctx, test.layout, progress)
             require.NoError(t, err)
 
             assert.Equal(t, testTenant, dirTree.Name())
@@ -323,8 +407,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
         ctx := context.Background()
 
         suite.T().Run(test.name, func(t *testing.T) {
-            snapshotDetails := &details.Details{}
-            _, err := inflateDirTree(ctx, test.layout, snapshotDetails)
+            _, err := inflateDirTree(ctx, test.layout, nil)
             assert.Error(t, err)
         })
     }
@@ -528,6 +611,57 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
     testForFiles(t, expected, result)
 }
 
+func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
+    t := suite.T()
+
+    collections := []data.Collection{
+        &kopiaDataCollection{
+            path: testPath,
+            streams: []data.Stream{
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData)),
+                },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName2,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData2)),
+                },
+            },
+        },
+        &kopiaDataCollection{
+            path: testPath2,
+            streams: []data.Stream{
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName3,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData3)),
+                },
+                &mockconnector.MockExchangeData{
+                    ID:      testFileName4,
+                    ReadErr: assert.AnError,
+                },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName5,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData5)),
+                },
+                &mockconnector.MockExchangeData{
+                    ID:     testFileName6,
+                    Reader: io.NopCloser(bytes.NewReader(testFileData6)),
+                },
+            },
+        },
+    }
+
+    stats, rp, err := suite.w.BackupCollections(suite.ctx, collections)
+    require.NoError(t, err)
+
+    assert.Equal(t, 0, stats.ErrorCount)
+    assert.Equal(t, 5, stats.TotalFileCount)
+    assert.Equal(t, 5, stats.TotalDirectoryCount)
+    assert.Equal(t, 1, stats.IgnoredErrorCount)
+    assert.False(t, stats.Incomplete)
+    assert.Len(t, rp.Entries, 5)
+}
+
 type KopiaSimpleRepoIntegrationSuite struct {
     suite.Suite
     w *Wrapper
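Illustrative sketch, not part of the diff above: the intended flow of the new corsoProgress type, as exercised by getStreamItemFunc and kopia's uploader callback. The function name, item path, and "tenant/" prefix are hypothetical, and the snippet assumes it sits inside package kopia, since corsoProgress and itemDetails are unexported.

func sketchProgressFlow() *details.Details {
    cp := &corsoProgress{
        UploadProgress: &snapshotfs.NullUploadProgress{},
        pending:        map[string]*itemDetails{},
        deets:          &details.Details{},
    }

    // getStreamItemFunc registers each streamed item under its root-relative
    // path before handing the entry to kopia.
    relPath := "user1/emails/item1" // hypothetical item path
    cp.put(relPath, &itemDetails{info: details.ItemInfo{}, repoRef: "tenant/" + relPath})

    // Kopia later reports completion via FinishedFile: a nil error moves the
    // entry from the pending set into the backup details, a non-nil error
    // simply drops it from pending.
    cp.FinishedFile(relPath, nil)

    return cp.deets // now holds one entry for the full repo reference
}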