From 975f15b42b6cda11b4949836fd59e63d61784161 Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Tue, 8 Aug 2023 19:04:06 -0700 Subject: [PATCH] Use modTime in details merging (#3962) Add modTime as one of the things that the DetailsMergeInfoer knows how to check against, wire it into details merging, and populate it during item upload This will help merge assist backup base items by allowing us to do a direct comparison on modTimes if we add them to the DetailsMergeInfoer during upload --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/kopia/merge_details.go | 24 ++++- src/internal/kopia/merge_details_test.go | 115 +++++++++++++++-------- src/internal/kopia/upload.go | 43 ++++++--- src/internal/kopia/upload_test.go | 70 +++++++++++++- src/internal/operations/backup.go | 10 +- src/internal/operations/backup_test.go | 2 + 6 files changed, 203 insertions(+), 61 deletions(-) diff --git a/src/internal/kopia/merge_details.go b/src/internal/kopia/merge_details.go index 2ec6cc4bb..11e0a94e4 100644 --- a/src/internal/kopia/merge_details.go +++ b/src/internal/kopia/merge_details.go @@ -1,6 +1,8 @@ package kopia import ( + "time" + "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/common/prefixmatcher" @@ -12,14 +14,11 @@ type DetailsMergeInfoer interface { // ItemsToMerge returns the number of items that need to be merged. 
ItemsToMerge() int // GetNewPathRefs takes the old RepoRef and old LocationRef of an item and - // returns the new RepoRef, a prefix of the old LocationRef to replace, and - // the new LocationRefPrefix of the item if the item should be merged. If the + // returns the new RepoRef and the new location of the item. If the // item shouldn't be merged nils are returned. - // - // If the returned old LocationRef prefix is equal to the old LocationRef then - // the entire LocationRef should be replaced with the returned value. GetNewPathRefs( oldRef *path.Builder, + modTime time.Time, oldLoc details.LocationIDer, ) (path.Path, *path.Builder, error) } @@ -27,6 +26,7 @@ type DetailsMergeInfoer interface { type prevRef struct { repoRef path.Path locRef *path.Builder + modTime *time.Time } type mergeDetails struct { @@ -42,8 +42,12 @@ func (m *mergeDetails) ItemsToMerge() int { return len(m.repoRefs) } +// addRepoRef adds an entry in mergeDetails that can be looked up later. If +// modTime is non-nil then it's checked during lookup. If it is nil then the +// mod time provided during lookup is ignored. func (m *mergeDetails) addRepoRef( oldRef *path.Builder, + modTime *time.Time, newRef path.Path, newLocRef *path.Builder, ) error { @@ -58,6 +62,7 @@ func (m *mergeDetails) addRepoRef( pr := prevRef{ repoRef: newRef, locRef: newLocRef, + modTime: modTime, } m.repoRefs[oldRef.ShortRef()] = pr @@ -67,6 +72,7 @@ func (m *mergeDetails) GetNewPathRefs( oldRef *path.Builder, + modTime time.Time, oldLoc details.LocationIDer, ) (path.Path, *path.Builder, error) { pr, ok := m.repoRefs[oldRef.ShortRef()] @@ -74,6 +80,14 @@ func (m *mergeDetails) GetNewPathRefs( return nil, nil, nil } + // ModTimes don't match which means we're attempting to merge a different + // version of the item (i.e. an older version from an assist base). We + // shouldn't return a match because it could cause us to source out-of-date + // details for the item.
+ if pr.modTime != nil && !pr.modTime.Equal(modTime) { + return nil, nil, nil + } + // This was a location specified directly by a collection. if pr.locRef != nil { return pr.repoRef, pr.locRef, nil diff --git a/src/internal/kopia/merge_details_test.go b/src/internal/kopia/merge_details_test.go index 6dfee6381..ae19a8c6a 100644 --- a/src/internal/kopia/merge_details_test.go +++ b/src/internal/kopia/merge_details_test.go @@ -2,6 +2,7 @@ package kopia import ( "testing" + "time" "github.com/alcionai/clues" "github.com/stretchr/testify/assert" @@ -47,10 +48,10 @@ func (suite *DetailsMergeInfoerUnitSuite) TestAddRepoRef_DuplicateFails() { dm := newMergeDetails() - err := dm.addRepoRef(oldRef1.ToBuilder(), oldRef1, nil) + err := dm.addRepoRef(oldRef1.ToBuilder(), nil, oldRef1, nil) require.NoError(t, err, clues.ToCore(err)) - err = dm.addRepoRef(oldRef1.ToBuilder(), oldRef1, nil) + err = dm.addRepoRef(oldRef1.ToBuilder(), nil, oldRef1, nil) require.Error(t, err, clues.ToCore(err)) } @@ -58,6 +59,10 @@ func (suite *DetailsMergeInfoerUnitSuite) TestAddRepoRef_DuplicateFails() { // for stored RepoRefs. func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() { t := suite.T() + + t1 := time.Now() + t2 := t1.Add(time.Second * 30) + oldRef1 := makePath( t, []string{ @@ -110,10 +115,13 @@ func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() { dm := newMergeDetails() - err := dm.addRepoRef(oldRef1.ToBuilder(), newRef1, newLoc1) + err := dm.addRepoRef(oldRef1.ToBuilder(), &t1, newRef1, newLoc1) require.NoError(t, err, clues.ToCore(err)) - err = dm.addRepoRef(oldRef2.ToBuilder(), newRef2, nil) + err = dm.addRepoRef(oldRef2.ToBuilder(), &t2, newRef2, nil) + require.NoError(t, err, clues.ToCore(err)) + + err = dm.addRepoRef(newRef1.ToBuilder(), nil, oldRef1, oldLoc1) require.NoError(t, err, clues.ToCore(err)) // Add prefix matcher entry. 
@@ -121,58 +129,89 @@ func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() { require.NoError(t, err, clues.ToCore(err)) table := []struct { - name string - searchRef *path.Builder - searchLoc mockLocationIDer - errCheck require.ErrorAssertionFunc - expectedRef path.Path - expectedLoc *path.Builder + name string + searchRef *path.Builder + searchModTime time.Time + searchLoc mockLocationIDer + errCheck require.ErrorAssertionFunc + expectFound bool + expectedRef path.Path + expectedLoc *path.Builder }{ { - name: "Exact Match With Loc", - searchRef: oldRef1.ToBuilder(), - searchLoc: searchLoc1, - errCheck: require.NoError, - expectedRef: newRef1, - expectedLoc: newLoc1, + name: "Exact Match With Loc", + searchRef: oldRef1.ToBuilder(), + searchModTime: t1, + searchLoc: searchLoc1, + errCheck: require.NoError, + expectFound: true, + expectedRef: newRef1, + expectedLoc: newLoc1, }, { - name: "Exact Match Without Loc", - searchRef: oldRef1.ToBuilder(), - errCheck: require.NoError, - expectedRef: newRef1, - expectedLoc: newLoc1, + name: "Exact Match Without Loc", + searchRef: oldRef1.ToBuilder(), + searchModTime: t1, + errCheck: require.NoError, + expectFound: true, + expectedRef: newRef1, + expectedLoc: newLoc1, }, { - name: "Prefix Match", - searchRef: oldRef2.ToBuilder(), - searchLoc: searchLoc2, - errCheck: require.NoError, - expectedRef: newRef2, - expectedLoc: newLoc2, + name: "Exact Match Without Loc ModTime Not In Merger", + searchRef: newRef1.ToBuilder(), + searchModTime: time.Now(), + errCheck: require.NoError, + expectFound: true, + expectedRef: oldRef1, + expectedLoc: oldLoc1, }, { - name: "Would Be Prefix Match Without Old Loc Errors", - searchRef: oldRef2.ToBuilder(), - errCheck: require.Error, + name: "Prefix Match", + searchRef: oldRef2.ToBuilder(), + searchModTime: t2, + searchLoc: searchLoc2, + errCheck: require.NoError, + expectFound: true, + expectedRef: newRef2, + expectedLoc: newLoc2, }, { - name: "Not Found With Old Loc", - searchRef: 
newRef1.ToBuilder(), - searchLoc: searchLoc2, - errCheck: require.NoError, + name: "Would Be Prefix Match Without Old Loc Errors", + searchRef: oldRef2.ToBuilder(), + searchModTime: t2, + errCheck: require.Error, }, { - name: "Not Found Without Old Loc", - searchRef: newRef1.ToBuilder(), - errCheck: require.NoError, + name: "Not Found With Old Loc", + searchRef: newRef2.ToBuilder(), + searchModTime: t1, + searchLoc: searchLoc2, + errCheck: require.NoError, + }, + { + name: "Not Found Without Old Loc", + searchRef: newRef2.ToBuilder(), + searchModTime: t1, + errCheck: require.NoError, + }, + { + name: "Not Found Due To Mod Time", + searchRef: oldRef1.ToBuilder(), + searchModTime: time.Now(), + searchLoc: searchLoc1, + errCheck: require.NoError, }, } + for _, test := range table { suite.Run(test.name, func() { t := suite.T() - newRef, newLoc, err := dm.GetNewPathRefs(test.searchRef, test.searchLoc) + newRef, newLoc, err := dm.GetNewPathRefs( + test.searchRef, + test.searchModTime, + test.searchLoc) test.errCheck(t, err, clues.ToCore(err)) assert.Equal(t, test.expectedRef, newRef, "RepoRef") diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index b0dbd40e5..bc7f86deb 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -23,6 +23,7 @@ import ( "golang.org/x/exp/maps" "github.com/alcionai/corso/src/internal/common/prefixmatcher" + "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/m365/graph" @@ -137,6 +138,7 @@ type itemDetails struct { prevPath path.Path locationPath *path.Builder cached bool + modTime *time.Time } type corsoProgress struct { @@ -148,9 +150,11 @@ type corsoProgress struct { snapshotfs.UploadProgress pending map[string]*itemDetails - deets *details.Builder - // toMerge represents items that we don't have in-memory item info for. 
The - // item info for these items should be sourced from a base snapshot later on. + // deets contains entries that are complete and don't need to be merged with base + // backup data at all. + deets *details.Builder + // toMerge represents items that we either don't have in-memory item info for + // or that need to be sourced from a base backup due to caching etc. + toMerge *mergeDetails mu sync.RWMutex totalBytes int64 @@ -194,6 +198,9 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { // These items were sourced from a base snapshot or were cached in kopia so we // never had to materialize their details in-memory. + // + // TODO(ashmrtn): When we're ready to merge with cached items add cached as a + // condition here. if d.info == nil { if d.prevPath == nil { cp.errs.AddRecoverable(cp.ctx, clues.New("item sourced from previous backup with no previous path"). @@ -208,7 +215,11 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { cp.mu.Lock() defer cp.mu.Unlock() - err := cp.toMerge.addRepoRef(d.prevPath.ToBuilder(), d.repoPath, d.locationPath) + err := cp.toMerge.addRepoRef( + d.prevPath.ToBuilder(), + d.modTime, + d.repoPath, + d.locationPath) if err != nil { cp.errs.AddRecoverable(cp.ctx, clues.Wrap(err, "adding item to merge list"). With( @@ -375,6 +386,11 @@ func collectionEntries( continue } + modTime := time.Now() + if smt, ok := e.(data.StreamModTime); ok { + modTime = smt.ModTime() + } + // Not all items implement StreamInfo. For example, the metadata files // do not because they don't contain information directly backed up or // used for restore. If progress does not contain information about a @@ -391,18 +407,22 @@ func collectionEntries( // info nil. itemInfo := ei.Info() d := &itemDetails{ - info: &itemInfo, - repoPath: itemPath, + info: &itemInfo, + repoPath: itemPath, + // Also use the current path as the previous path for this item.
This + // is so that if the item is marked as cached and we need to merge + // details with an assist backup base which sourced the cached item we + // can find it with the lookup in DetailsMergeInfoer. + // + // This all works out because cached item checks in kopia are direct + // path + metadata comparisons. + prevPath: itemPath, locationPath: locationPath, + modTime: &modTime, } progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) } - modTime := time.Now() - if smt, ok := e.(data.StreamModTime); ok { - modTime = smt.ModTime() - } - entry := virtualfs.StreamingFileWithModTimeFromReader( encodedName, modTime, @@ -508,6 +528,7 @@ func streamBaseEntries( repoPath: itemPath, prevPath: prevItemPath, locationPath: locationPath, + modTime: ptr.To(entry.ModTime()), } progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) } diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index bbdbe9e6f..26c776709 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -386,7 +386,9 @@ var finishedFileTable = []struct { cachedItems func(fname string, fpath path.Path) map[string]testInfo expectedBytes int64 expectedNumEntries int - err error + // Non-folder items. + expectedNumItems int + err error }{ { name: "DetailsExist", @@ -410,6 +412,7 @@ var finishedFileTable = []struct { expectedBytes: 100, // 1 file and 5 folders. expectedNumEntries: 2, + expectedNumItems: 1, }, { name: "PendingNoDetails", @@ -453,16 +456,35 @@ var finishedFileTable = []struct { func (suite *CorsoProgressUnitSuite) TestFinishedFile() { table := []struct { - name string - cached bool + name string + cached bool + differentPrevPath bool + dropInfo bool + expectToMergeEntries bool }{ { name: "all updated", cached: false, }, { - name: "all cached", + name: "all cached from assist base", cached: true, + // TODO(ashmrtn): Update to true when we add cached items to toMerge. 
+ expectToMergeEntries: false, + }, + { + name: "all cached from merge base", + cached: true, + differentPrevPath: true, + dropInfo: true, + expectToMergeEntries: true, + }, + { + name: "all not cached from merge base", + cached: false, + differentPrevPath: true, + dropInfo: true, + expectToMergeEntries: true, }, } @@ -480,6 +502,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() { ctx: ctx, UploadProgress: &snapshotfs.NullUploadProgress{}, deets: bd, + toMerge: newMergeDetails(), pending: map[string]*itemDetails{}, errs: fault.New(true), } @@ -487,6 +510,29 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() { ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) for k, v := range ci { + if v.info != nil { + v.info.prevPath = v.info.repoPath + + if cachedTest.differentPrevPath { + // Doesn't really matter how we change the path as long as it's + // different somehow. + p, err := path.FromDataLayerPath( + suite.targetFilePath.String()+"2", + true) + require.NoError( + t, + err, + "making prevPath: %v", + clues.ToCore(err)) + + v.info.prevPath = p + } + + if cachedTest.dropInfo { + v.info.info = nil + } + } + cp.put(k, v.info) } @@ -509,6 +555,17 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() { assert.Empty(t, cp.pending) entries := bd.Details().Entries + + if cachedTest.expectToMergeEntries { + assert.Equal( + t, + test.expectedNumItems, + cp.toMerge.ItemsToMerge(), + "merge entries") + + return + } + assert.Len(t, entries, test.expectedNumEntries) for _, entry := range entries { @@ -616,7 +673,10 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarch assert.Empty(t, cp.deets) for _, expected := range expectedToMerge { - gotRef, _, _ := cp.toMerge.GetNewPathRefs(expected.oldRef, nil) + gotRef, _, _ := cp.toMerge.GetNewPathRefs( + expected.oldRef, + time.Now(), + nil) if !assert.NotNil(t, gotRef) { continue } diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 
ed50dd368..4d1f43324 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -532,7 +532,10 @@ func getNewPathRefs( // able to assume we always have the location in the previous entry. We'll end // up doing some extra parsing, but it will simplify this code. if repoRef.Service() == path.ExchangeService { - newPath, newLoc, err := dataFromBackup.GetNewPathRefs(repoRef.ToBuilder(), nil) + newPath, newLoc, err := dataFromBackup.GetNewPathRefs( + repoRef.ToBuilder(), + entry.Modified(), + nil) if err != nil { return nil, nil, false, clues.Wrap(err, "getting new paths") } else if newPath == nil { @@ -565,7 +568,10 @@ func getNewPathRefs( return nil, nil, false, clues.New("entry with empty LocationRef") } - newPath, newLoc, err := dataFromBackup.GetNewPathRefs(repoRef.ToBuilder(), locRef) + newPath, newLoc, err := dataFromBackup.GetNewPathRefs( + repoRef.ToBuilder(), + entry.Modified(), + locRef) if err != nil { return nil, nil, false, clues.Wrap(err, "getting new paths with old location") } else if newPath == nil { diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index a2783e92e..e6fb2cd41 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -138,6 +138,7 @@ type mockDetailsMergeInfoer struct { locs map[string]*path.Builder } +// TODO(ashmrtn): Update this to take mod time? func (m *mockDetailsMergeInfoer) add(oldRef, newRef path.Path, newLoc *path.Builder) { oldPB := oldRef.ToBuilder() // Items are indexed individually. @@ -149,6 +150,7 @@ func (m *mockDetailsMergeInfoer) add(oldRef, newRef path.Path, newLoc *path.Buil func (m *mockDetailsMergeInfoer) GetNewPathRefs( oldRef *path.Builder, + _ time.Time, _ details.LocationIDer, ) (path.Path, *path.Builder, error) { return m.repoRefs[oldRef.ShortRef()], m.locs[oldRef.ShortRef()], nil