From a7fd90b2f8b28991cbc20c5e18daa9adc689411f Mon Sep 17 00:00:00 2001 From: Keepers Date: Wed, 8 Feb 2023 13:50:55 -0700 Subject: [PATCH] add fault/clues to kopia, pt 1 (#2365) ## Description Begins adding fault and clues to kopia. Part 1 just covers the surface kopia/Wrapper, and all the upstream packages that call it. This also replaces the progress multierr with a fault errs. RestoreMultipleItems changes from always handling errors in failFast mode to checking for failFast configuraton, and handling bestEffort otherwise. ## Does this PR need a docs update or release note? - [x] :no_entry: No ## Type of change - [x] :broom: Tech Debt/Cleanup ## Issue(s) * #1970 ## Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- src/internal/kopia/upload.go | 14 +-- src/internal/kopia/upload_test.go | 43 +++++++-- src/internal/kopia/wrapper.go | 88 +++++++++++-------- src/internal/kopia/wrapper_test.go | 38 ++++---- src/internal/operations/backup.go | 20 +++-- .../operations/backup_integration_test.go | 3 +- src/internal/operations/backup_test.go | 14 +-- src/internal/operations/common.go | 6 +- src/internal/operations/manifests.go | 7 +- src/internal/operations/manifests_test.go | 9 +- src/internal/operations/restore.go | 5 +- src/internal/streamstore/streamstore.go | 8 +- src/internal/streamstore/streamstore_test.go | 6 +- src/pkg/repository/repository.go | 3 +- 14 files changed, 164 insertions(+), 100 deletions(-) diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index 7ada05013..a4ae1fbcc 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -15,6 +15,7 @@ import ( "time" "unsafe" + "github.com/alcionai/clues" "github.com/hashicorp/go-multierror" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" @@ -25,6 +26,7 @@ import ( "github.com/alcionai/corso/src/internal/data" D "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -137,7 +139,7 @@ type corsoProgress struct { toMerge map[string]path.Path mu sync.RWMutex totalBytes int64 - errs *multierror.Error + errs *fault.Errors } // Kopia interface function used as a callback when kopia finishes processing a @@ -167,11 +169,11 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { // never had to materialize their details in-memory. if d.info == nil { if d.prevPath == nil { - cp.errs = multierror.Append(cp.errs, errors.Errorf( - "item sourced from previous backup with no previous path. Service: %s, Category: %s", - d.repoPath.Service().String(), - d.repoPath.Category().String(), - )) + cp.errs.Add(clues.New("item sourced from previous backup with no previous path"). + WithAll( + "service", d.repoPath.Service().String(), + "category", d.repoPath.Category().String(), + )) return } diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index 55877db17..e284d4d67 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -22,6 +22,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -456,6 +457,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() { UploadProgress: &snapshotfs.NullUploadProgress{}, deets: bd, pending: map[string]*itemDetails{}, + errs: fault.New(true), } ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) @@ -503,6 +505,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileCachedNoPrevPathErrors() { UploadProgress: &snapshotfs.NullUploadProgress{}, deets: bd, pending: map[string]*itemDetails{}, + errs: fault.New(true), } for k, v := range cachedItems { @@ -518,7 +521,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileCachedNoPrevPathErrors() { assert.Empty(t, cp.pending) assert.Empty(t, bd.Details().Entries) - assert.Error(t, cp.errs.ErrorOrNil()) + assert.Error(t, cp.errs.Err()) } func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchyNewItem() { @@ -533,6 +536,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchyNewItem() { deets: bd, pending: map[string]*itemDetails{}, toMerge: map[string]path.Path{}, + errs: fault.New(true), } deets := &itemDetails{info: &details.ItemInfo{}, repoPath: suite.targetFilePath} @@ -605,6 +609,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarch deets: bd, pending: map[string]*itemDetails{}, toMerge: map[string]path.Path{}, + errs: fault.New(true), } deets := &itemDetails{ @@ -629,6 +634,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() { UploadProgress: &snapshotfs.NullUploadProgress{}, deets: bd, pending: map[string]*itemDetails{}, + errs: fault.New(true), } ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) @@ -681,7 +687,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree() { user2Encoded: 42, } - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } collections := []data.BackupCollection{ mockconnector.NewMockExchangeCollection( @@ -791,7 +800,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_MixedDirectory() for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } dirTree, err := inflateDirTree(ctx, nil, nil, test.layout, nil, progress) require.NoError(t, err) @@ -971,7 +983,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeErrors() { ctx, flush := tester.NewContext() defer flush() - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } cols := []data.BackupCollection{} for _, s := range test.states { @@ -1249,7 +1264,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSingleSubtree() { ctx, flush := tester.NewContext() defer flush() - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } msw := &mockSnapshotWalker{ snapshotRoot: getBaseSnapshot(), } @@ -1951,7 +1969,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto ctx, flush := tester.NewContext() defer flush() - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } msw := &mockSnapshotWalker{ snapshotRoot: getBaseSnapshot(), } @@ -2097,7 +2118,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSkipsDeletedSubtre }, ) - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } mc := mockconnector.NewMockExchangeCollection(suite.testPath, 1) mc.PrevPath = mc.FullPath() mc.ColState = data.DeletedState @@ -2346,7 +2370,10 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsCorrectSubt }, ) - progress := &corsoProgress{pending: map[string]*itemDetails{}} + progress := &corsoProgress{ + pending: map[string]*itemDetails{}, + errs: fault.New(true), + } mc := mockconnector.NewMockExchangeCollection(inboxPath, 1) mc.PrevPath = mc.FullPath() diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index db2f8e645..d49241d36 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -4,7 +4,7 @@ import ( "context" "strings" - "github.com/hashicorp/go-multierror" + "github.com/alcionai/clues" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" @@ -17,6 +17,7 @@ import ( D "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -101,7 +102,11 @@ func (w *Wrapper) Close(ctx context.Context) error { err := w.c.Close(ctx) w.c = nil - return errors.Wrap(err, "closing Wrapper") + if err != nil { + return clues.Wrap(err, "closing Wrapper").WithClues(ctx) + } + + return nil } type IncrementalBase struct { @@ -122,9 +127,10 @@ func (w Wrapper) BackupCollections( globalExcludeSet map[string]struct{}, tags map[string]string, buildTreeWithBase bool, + errs *fault.Errors, ) (*BackupStats, *details.Builder, map[string]path.Path, error) { if w.c == nil { - return nil, nil, nil, errNotConnected + return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx) } ctx, end := D.Span(ctx, "kopia:backupCollections") @@ -138,6 +144,7 @@ func (w Wrapper) BackupCollections( pending: map[string]*itemDetails{}, deets: &details.Builder{}, toMerge: map[string]path.Path{}, + errs: errs, } // When running an incremental backup, we need to pass the prior @@ -165,14 +172,12 @@ func (w Wrapper) BackupCollections( previousSnapshots, dirTree, tags, - progress, - ) + progress) if err != nil { - combinedErrs := multierror.Append(nil, err, progress.errs) - return nil, nil, nil, combinedErrs.ErrorOrNil() + return nil, nil, nil, err } - return s, progress.deets, progress.toMerge, progress.errs.ErrorOrNil() + return s, progress.deets, progress.toMerge, progress.errs.Err() } func (w Wrapper) makeSnapshotWithRoot( @@ -197,9 +202,7 @@ func (w Wrapper) makeSnapshotWithRoot( logger.Ctx(ctx).Infow( "using snapshots for kopia-assisted incrementals", - "snapshot_ids", - snapIDs, - ) + "snapshot_ids", snapIDs) tags := map[string]string{} @@ -224,6 +227,8 @@ func (w Wrapper) makeSnapshotWithRoot( OnUpload: bc.Count, }, func(innerCtx context.Context, rw repo.RepositoryWriter) error { + log := logger.Ctx(innerCtx) + si := snapshot.SourceInfo{ Host: corsoHost, UserName: corsoUser, @@ -240,8 +245,8 @@ func (w Wrapper) makeSnapshotWithRoot( } policyTree, err := policy.TreeForSourceWithOverride(innerCtx, w.c, si, errPolicy) if err != nil { - err = errors.Wrap(err, "get policy tree") - logger.Ctx(innerCtx).Errorw("kopia backup", err) + err = clues.Wrap(err, "get policy tree").WithClues(ctx) + log.With("err", err).Errorw("building kopia backup", clues.InErr(err).Slice()...) return err } @@ -253,16 +258,16 @@ func (w Wrapper) makeSnapshotWithRoot( man, err = u.Upload(innerCtx, root, policyTree, si, prevSnaps...) if err != nil { - err = errors.Wrap(err, "uploading data") - logger.Ctx(innerCtx).Errorw("kopia backup", err) + err = clues.Wrap(err, "uploading data").WithClues(ctx) + log.With("err", err).Errorw("uploading kopia backup", clues.InErr(err).Slice()...) return err } man.Tags = tags if _, err := snapshot.SaveSnapshot(innerCtx, rw, man); err != nil { - err = errors.Wrap(err, "saving snapshot") - logger.Ctx(innerCtx).Errorw("kopia backup", err) + err = clues.Wrap(err, "saving snapshot").WithClues(ctx) + log.With("err", err).Errorw("persisting kopia backup snapshot", clues.InErr(err).Slice()...) return err } @@ -272,7 +277,7 @@ func (w Wrapper) makeSnapshotWithRoot( // Telling kopia to always flush may hide other errors if it fails while // flushing the write session (hence logging above). if err != nil { - return nil, errors.Wrap(err, "kopia backup") + return nil, clues.Wrap(err, "kopia backup") } res := manifestToStats(man, progress, bc) @@ -286,12 +291,15 @@ func (w Wrapper) getSnapshotRoot( ) (fs.Entry, error) { man, err := snapshot.LoadSnapshot(ctx, w.c, manifest.ID(snapshotID)) if err != nil { - return nil, errors.Wrap(err, "getting snapshot handle") + return nil, clues.Wrap(err, "getting snapshot handle").WithClues(ctx) } rootDirEntry, err := snapshotfs.SnapshotRoot(w.c, man) + if err != nil { + return nil, clues.Wrap(err, "getting root directory").WithClues(ctx) + } - return rootDirEntry, errors.Wrap(err, "getting root directory") + return rootDirEntry, nil } // getItemStream looks up the item at the given path starting from snapshotRoot. @@ -306,7 +314,7 @@ func getItemStream( bcounter ByteCounter, ) (data.Stream, error) { if itemPath == nil { - return nil, errors.WithStack(errNoRestorePath) + return nil, clues.Stack(errNoRestorePath).WithClues(ctx) } // GetNestedEntry handles nil properly. @@ -317,15 +325,15 @@ func getItemStream( ) if err != nil { if strings.Contains(err.Error(), "entry not found") { - err = errors.Wrap(data.ErrNotFound, err.Error()) + err = clues.Stack(data.ErrNotFound, err).WithClues(ctx) } - return nil, errors.Wrap(err, "getting nested object handle") + return nil, clues.Wrap(err, "getting nested object handle").WithClues(ctx) } f, ok := e.(fs.File) if !ok { - return nil, errors.New("requested object is not a file") + return nil, clues.New("requested object is not a file").WithClues(ctx) } if bcounter != nil { @@ -334,12 +342,12 @@ func getItemStream( r, err := f.Open(ctx) if err != nil { - return nil, errors.Wrap(err, "opening file") + return nil, clues.Wrap(err, "opening file").WithClues(ctx) } decodedName, err := decodeElement(f.Name()) if err != nil { - return nil, errors.Wrap(err, "decoding file name") + return nil, clues.Wrap(err, "decoding file name").WithClues(ctx) } return &kopiaDataStream{ @@ -368,12 +376,13 @@ func (w Wrapper) RestoreMultipleItems( snapshotID string, paths []path.Path, bcounter ByteCounter, + errs *fault.Errors, ) ([]data.RestoreCollection, error) { ctx, end := D.Span(ctx, "kopia:restoreMultipleItems") defer end() if len(paths) == 0 { - return nil, errors.WithStack(errNoRestorePath) + return nil, clues.Stack(errNoRestorePath).WithClues(ctx) } snapshotRoot, err := w.getSnapshotRoot(ctx, snapshotID) @@ -381,22 +390,23 @@ func (w Wrapper) RestoreMultipleItems( return nil, err } - var ( - errs *multierror.Error - // Maps short ID of parent path to data collection for that folder. - cols = map[string]*kopiaDataCollection{} - ) + // Maps short ID of parent path to data collection for that folder. + cols := map[string]*kopiaDataCollection{} for _, itemPath := range paths { + if errs.Err() != nil { + return nil, errs.Err() + } + ds, err := getItemStream(ctx, itemPath, snapshotRoot, bcounter) if err != nil { - errs = multierror.Append(errs, err) + errs.Add(err) continue } parentPath, err := itemPath.Dir() if err != nil { - errs = multierror.Append(errs, errors.Wrap(err, "making directory collection")) + errs.Add(clues.Wrap(err, "making directory collection").WithClues(ctx)) continue } @@ -414,7 +424,7 @@ func (w Wrapper) RestoreMultipleItems( res = append(res, c) } - return res, errs.ErrorOrNil() + return res, errs.Err() } // DeleteSnapshot removes the provided manifest from kopia. @@ -425,7 +435,7 @@ func (w Wrapper) DeleteSnapshot( mid := manifest.ID(snapshotID) if len(mid) == 0 { - return errors.New("attempt to delete unidentified snapshot") + return clues.New("attempt to delete unidentified snapshot").WithClues(ctx) } err := repo.WriteSession( @@ -434,7 +444,7 @@ func (w Wrapper) DeleteSnapshot( repo.WriteSessionOptions{Purpose: "KopiaWrapperBackupDeletion"}, func(innerCtx context.Context, rw repo.RepositoryWriter) error { if err := rw.DeleteManifest(ctx, mid); err != nil { - return errors.Wrap(err, "deleting snapshot") + return clues.Wrap(err, "deleting snapshot").WithClues(ctx) } return nil @@ -443,7 +453,7 @@ func (w Wrapper) DeleteSnapshot( // Telling kopia to always flush may hide other errors if it fails while // flushing the write session (hence logging above). if err != nil { - return errors.Wrap(err, "kopia deleting backup manifest") + return clues.Wrap(err, "deleting backup manifest").WithClues(ctx) } return nil @@ -464,7 +474,7 @@ func (w Wrapper) FetchPrevSnapshotManifests( tags map[string]string, ) ([]*ManifestEntry, error) { if w.c == nil { - return nil, errors.WithStack(errNotConnected) + return nil, clues.Stack(errNotConnected).WithClues(ctx) } return fetchPrevSnapshotManifests(ctx, w.c, reasons, tags), nil diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 03f42bcc3..e754ff9cf 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -269,7 +270,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { nil, tags, true, - ) + fault.New(true)) assert.NoError(t, err) assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files") @@ -357,7 +358,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { nil, tags, true, - ) + fault.New(true)) require.NoError(t, err) require.NoError(t, k.Compression(ctx, "gzip")) @@ -374,8 +375,8 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { fp1, fp2, }, - nil) - + nil, + fault.New(true)) require.NoError(t, err) assert.Equal(t, 2, len(result)) @@ -475,7 +476,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { nil, tags, true, - ) + fault.New(true)) require.NoError(t, err) assert.Equal(t, 0, stats.ErrorCount) @@ -496,7 +497,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { string(stats.SnapshotID), []path.Path{failedPath}, &ic, - ) + fault.New(true)) // Files that had an error shouldn't make a dir entry in kopia. If they do we // may run into kopia-assisted incrementals issues because only mod time and // not file size is checked for StreamingFiles. @@ -536,7 +537,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() nil, nil, true, - ) + fault.New(true)) require.NoError(t, err) assert.Equal(t, BackupStats{}, *s) @@ -695,7 +696,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { nil, tags, false, - ) + fault.New(true)) require.NoError(t, err) require.Equal(t, stats.ErrorCount, 0) require.Equal(t, stats.TotalFileCount, expectedFiles) @@ -825,7 +826,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { excluded, tags, true, - ) + fault.New(true)) require.NoError(t, err) assert.Equal(t, test.expectedCachedItems, stats.CachedFileCount) assert.Equal(t, test.expectedUncachedItems, stats.UncachedFileCount) @@ -845,7 +846,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { suite.files[suite.testPath1.String()][0].itemPath, }, &ic, - ) + fault.New(true)) test.restoreCheck(t, err) }) } @@ -902,7 +903,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { suite.testPath1, suite.files[suite.testPath2.String()][0].itemPath, }, - expectedCollections: 2, + expectedCollections: 0, expectedErr: assert.Error, }, { @@ -912,7 +913,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { doesntExist, suite.files[suite.testPath2.String()][0].itemPath, }, - expectedCollections: 2, + expectedCollections: 0, expectedErr: assert.Error, }, } @@ -939,9 +940,14 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { suite.ctx, string(suite.snapshotID), test.inputPaths, - &ic) + &ic, + fault.New(true)) test.expectedErr(t, err) + if err != nil { + return + } + assert.Len(t, result, test.expectedCollections) assert.Less(t, int64(0), ic.i) testForFiles(t, expected, result) @@ -981,7 +987,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems_Errors() suite.ctx, test.snapshotID, test.paths, - nil) + nil, + fault.New(true)) assert.Error(t, err) assert.Empty(t, c) }) @@ -1001,7 +1008,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestDeleteSnapshot() { suite.ctx, string(suite.snapshotID), []path.Path{itemPath}, - &ic) + &ic, + fault.New(true)) assert.Error(t, err, "snapshot should be deleted") assert.Empty(t, c) assert.Zero(t, ic.i) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index a3fde4cb8..a82c0d30c 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -26,6 +26,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -100,7 +101,7 @@ type backupStats struct { } type detailsWriter interface { - WriteBackupDetails(context.Context, *details.Details) (string, error) + WriteBackupDetails(context.Context, *details.Details, *fault.Errors) (string, error) } // --------------------------------------------------------------------------- @@ -259,7 +260,8 @@ func (op *BackupOperation) do( cs, excludes, backupID, - op.incremental && canUseMetaData) + op.incremental && canUseMetaData, + op.Errors) if err != nil { return nil, errors.Wrap(err, "persisting collection backups") } @@ -272,7 +274,8 @@ func (op *BackupOperation) do( detailsStore, mans, toMerge, - deets) + deets, + op.Errors) if err != nil { return nil, errors.Wrap(err, "merging details") } @@ -335,6 +338,7 @@ type backuper interface { excluded map[string]struct{}, tags map[string]string, buildTreeWithBase bool, + errs *fault.Errors, ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) } @@ -393,6 +397,7 @@ func consumeBackupDataCollections( excludes map[string]struct{}, backupID model.StableID, isIncremental bool, + errs *fault.Errors, ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) { complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Backing up data")) defer func() { @@ -460,7 +465,8 @@ func consumeBackupDataCollections( // OneDrive replace this with `excludes`. nil, tags, - isIncremental) + isIncremental, + errs) if err != nil { if kopiaStats == nil { return nil, nil, nil, err @@ -500,6 +506,7 @@ func mergeDetails( mans []*kopia.ManifestEntry, shortRefsFromPrevBackup map[string]path.Path, deets *details.Builder, + errs *fault.Errors, ) error { // Don't bother loading any of the base details if there's nothing we need to // merge. @@ -529,7 +536,8 @@ func mergeDetails( ctx, model.StableID(bID), ms, - detailsStore) + detailsStore, + errs) if err != nil { return clues.New("fetching base details for backup").WithClues(mctx) } @@ -650,7 +658,7 @@ func (op *BackupOperation) createBackupModels( return clues.New("no backup details to record").WithClues(ctx) } - detailsID, err := detailsStore.WriteBackupDetails(ctx, backupDetails) + detailsID, err := detailsStore.WriteBackupDetails(ctx, backupDetails, op.Errors) if err != nil { return clues.Wrap(err, "creating backupDetails model").WithClues(ctx) } diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 277e5a40d..83033ab3b 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" @@ -250,7 +251,7 @@ func checkMetadataFilesExist( pathsByRef[dir.ShortRef()] = append(pathsByRef[dir.ShortRef()], fName) } - cols, err := kw.RestoreMultipleItems(ctx, bup.SnapshotID, paths, nil) + cols, err := kw.RestoreMultipleItems(ctx, bup.SnapshotID, paths, nil, fault.New(true)) assert.NoError(t, err) for _, col := range cols { diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 6e9afda8a..12ebcef6c 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -23,6 +23,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" @@ -62,6 +63,7 @@ func (mr *mockRestorer) RestoreMultipleItems( snapshotID string, paths []path.Path, bc kopia.ByteCounter, + errs *fault.Errors, ) ([]data.RestoreCollection, error) { mr.gotPaths = append(mr.gotPaths, paths...) @@ -98,6 +100,7 @@ func (mbu mockBackuper) BackupCollections( excluded map[string]struct{}, tags map[string]string, buildTreeWithBase bool, + errs *fault.Errors, ) (*kopia.BackupStats, *details.Builder, map[string]path.Path, error) { if mbu.checkFunc != nil { mbu.checkFunc(bases, cs, tags, buildTreeWithBase) @@ -115,6 +118,7 @@ type mockDetailsReader struct { func (mdr mockDetailsReader) ReadBackupDetails( ctx context.Context, detailsID string, + errs *fault.Errors, ) (*details.Details, error) { r := mdr.entries[detailsID] @@ -578,7 +582,7 @@ func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Pat nil, model.StableID(""), true, - ) + fault.New(true)) }) } } @@ -1060,7 +1064,6 @@ func (suite *BackupOpSuite) TestBackupOperation_MergeBackupDetails_AddsItems() { mdr := mockDetailsReader{entries: test.populatedDetails} w := &store.Wrapper{Storer: mockBackupStorer{entries: test.populatedModels}} - deets := details.Builder{} err := mergeDetails( @@ -1070,8 +1073,7 @@ func (suite *BackupOpSuite) TestBackupOperation_MergeBackupDetails_AddsItems() { test.inputMans, test.inputShortRefsFromPrevBackup, &deets, - ) - + fault.New(true)) test.errCheck(t, err) if err != nil { return @@ -1168,7 +1170,6 @@ func (suite *BackupOpSuite) TestBackupOperation_MergeBackupDetails_AddsFolders() mdr := mockDetailsReader{entries: populatedDetails} w := &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}} - deets := details.Builder{} err := mergeDetails( @@ -1178,8 +1179,7 @@ func (suite *BackupOpSuite) TestBackupOperation_MergeBackupDetails_AddsFolders() inputMans, inputToMerge, &deets, - ) - + fault.New(true)) assert.NoError(t, err) assert.ElementsMatch(t, expectedEntries, deets.Details().Entries) } diff --git a/src/internal/operations/common.go b/src/internal/operations/common.go index addbeb5ac..18266a734 100644 --- a/src/internal/operations/common.go +++ b/src/internal/operations/common.go @@ -8,11 +8,12 @@ import ( "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/store" ) type detailsReader interface { - ReadBackupDetails(ctx context.Context, detailsID string) (*details.Details, error) + ReadBackupDetails(ctx context.Context, detailsID string, errs *fault.Errors) (*details.Details, error) } func getBackupAndDetailsFromID( @@ -20,13 +21,14 @@ func getBackupAndDetailsFromID( backupID model.StableID, ms *store.Wrapper, detailsStore detailsReader, + errs *fault.Errors, ) (*backup.Backup, *details.Details, error) { dID, bup, err := ms.GetDetailsIDFromBackupID(ctx, backupID) if err != nil { return nil, nil, errors.Wrap(err, "getting backup details ID") } - deets, err := detailsStore.ReadBackupDetails(ctx, dID) + deets, err := detailsStore.ReadBackupDetails(ctx, dID, errs) if err != nil { return nil, nil, errors.Wrap(err, "getting backup details data") } diff --git a/src/internal/operations/manifests.go b/src/internal/operations/manifests.go index 86ad7f1b5..a1d7997ed 100644 --- a/src/internal/operations/manifests.go +++ b/src/internal/operations/manifests.go @@ -45,7 +45,7 @@ func produceManifestsAndMetadata( reasons []kopia.Reason, tenantID string, getMetadata bool, - errs fault.Adder, + errs *fault.Errors, ) ([]*kopia.ManifestEntry, []data.RestoreCollection, bool, error) { var ( metadataFiles = graph.AllMetadataFileNames() @@ -117,7 +117,7 @@ func produceManifestsAndMetadata( return ms, nil, false, nil } - colls, err := collectMetadata(mctx, mr, man, metadataFiles, tenantID) + colls, err := collectMetadata(mctx, mr, man, metadataFiles, tenantID, errs) if err != nil && !errors.Is(err, data.ErrNotFound) { // prior metadata isn't guaranteed to exist. // if it doesn't, we'll just have to do a @@ -183,6 +183,7 @@ func collectMetadata( man *kopia.ManifestEntry, fileNames []string, tenantID string, + errs *fault.Errors, ) ([]data.RestoreCollection, error) { paths := []path.Path{} @@ -206,7 +207,7 @@ func collectMetadata( } } - dcs, err := r.RestoreMultipleItems(ctx, string(man.ID), paths, nil) + dcs, err := r.RestoreMultipleItems(ctx, string(man.ID), paths, nil, errs) if err != nil { // Restore is best-effort and we want to keep it that way since we want to // return as much metadata as we can to reduce the work we'll need to do. diff --git a/src/internal/operations/manifests_test.go b/src/internal/operations/manifests_test.go index e1be7df54..d30f72726 100644 --- a/src/internal/operations/manifests_test.go +++ b/src/internal/operations/manifests_test.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault/mock" "github.com/alcionai/corso/src/pkg/path" ) @@ -226,7 +227,7 @@ func (suite *OperationsManifestsUnitSuite) TestCollectMetadata() { Reasons: test.reasons, } - _, err := collectMetadata(ctx, &mr, man, test.fileNames, tid) + _, err := collectMetadata(ctx, &mr, man, test.fileNames, tid, fault.New(true)) assert.ErrorIs(t, err, test.expectErr) }) } @@ -637,8 +638,6 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { ctx, flush := tester.NewContext() defer flush() - ma := mock.NewAdder() - mans, dcs, b, err := produceManifestsAndMetadata( ctx, &test.mr, @@ -646,7 +645,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { test.reasons, tid, test.getMeta, - ma) + fault.New(true)) test.assertErr(t, err) test.assertB(t, b) @@ -936,7 +935,7 @@ func (suite *BackupManifestSuite) TestBackupOperation_CollectMetadata() { mr := &mockRestorer{} - _, err := collectMetadata(ctx, mr, test.inputMan, test.inputFiles, tenant) + _, err := collectMetadata(ctx, mr, test.inputMan, test.inputFiles, tenant, fault.New(true)) assert.NoError(t, err) checkPaths(t, test.expected, mr.gotPaths) diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index d29c0bf40..51f69cbc1 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -104,6 +104,7 @@ type restorer interface { snapshotID string, paths []path.Path, bc kopia.ByteCounter, + errs *fault.Errors, ) ([]data.RestoreCollection, error) } @@ -197,7 +198,7 @@ func (op *RestoreOperation) do( op.BackupID, op.store, detailsStore, - ) + op.Errors) if err != nil { return nil, errors.Wrap(err, "getting backup and details") } @@ -228,7 +229,7 @@ func (op *RestoreOperation) do( defer closer() defer close(kopiaComplete) - dcs, err := op.kopia.RestoreMultipleItems(ctx, bup.SnapshotID, paths, opStats.bytesRead) + dcs, err := op.kopia.RestoreMultipleItems(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors) if err != nil { return nil, errors.Wrap(err, "retrieving collections from repository") } diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index a97cace98..a0d55136d 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -46,6 +47,7 @@ const ( func (ss *streamStore) WriteBackupDetails( ctx context.Context, backupDetails *details.Details, + errs *fault.Errors, ) (string, error) { // construct the path of the container for the `details` item p, err := path.Builder{}. @@ -79,7 +81,8 @@ func (ss *streamStore) WriteBackupDetails( []data.BackupCollection{dc}, nil, nil, - false) + false, + errs) if err != nil { return "", errors.Wrap(err, "storing details in repository") } @@ -92,6 +95,7 @@ func (ss *streamStore) WriteBackupDetails( func (ss *streamStore) ReadBackupDetails( ctx context.Context, detailsID string, + errs *fault.Errors, ) (*details.Details, error) { // construct the path for the `details` item detailsPath, err := path.Builder{}. @@ -108,7 +112,7 @@ func (ss *streamStore) ReadBackupDetails( var bc stats.ByteCounter - dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc) + dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc, errs) if err != nil { return nil, errors.Wrap(err, "retrieving backup details data") } diff --git a/src/internal/streamstore/streamstore_test.go b/src/internal/streamstore/streamstore_test.go index c3259606e..cc3309a11 100644 --- a/src/internal/streamstore/streamstore_test.go +++ b/src/internal/streamstore/streamstore_test.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -51,14 +52,13 @@ func (suite *StreamStoreIntegrationSuite) TestDetails() { }) deets := deetsBuilder.Details() - ss := New(kw, "tenant", path.ExchangeService) - id, err := ss.WriteBackupDetails(ctx, deets) + id, err := ss.WriteBackupDetails(ctx, deets, fault.New(true)) require.NoError(t, err) require.NotNil(t, id) - readDeets, err := ss.ReadBackupDetails(ctx, id) + readDeets, err := ss.ReadBackupDetails(ctx, id, fault.New(true)) require.NoError(t, err) require.NotNil(t, readDeets) diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 429b0e7a7..518685134 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -323,7 +323,8 @@ func (r repository) BackupDetails( deets, err := streamstore.New( r.dataLayer, r.Account.ID(), - b.Selector.PathService()).ReadBackupDetails(ctx, dID) + b.Selector.PathService(), + ).ReadBackupDetails(ctx, dID, errs) if err != nil { return nil, nil, errs.Fail(err) }