diff --git a/src/internal/kopia/merge_details.go b/src/internal/kopia/merge_details.go
index 2ec6cc4bb..11e0a94e4 100644
--- a/src/internal/kopia/merge_details.go
+++ b/src/internal/kopia/merge_details.go
@@ -1,6 +1,8 @@
 package kopia
 
 import (
+	"time"
+
 	"github.com/alcionai/clues"
 
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
@@ -12,14 +14,11 @@ type DetailsMergeInfoer interface {
 	// ItemsToMerge returns the number of items that need to be merged.
 	ItemsToMerge() int
 	// GetNewPathRefs takes the old RepoRef and old LocationRef of an item and
-	// returns the new RepoRef, a prefix of the old LocationRef to replace, and
-	// the new LocationRefPrefix of the item if the item should be merged. If the
+	// returns the new RepoRef and the new location of the item. If the
 	// item shouldn't be merged nils are returned.
-	//
-	// If the returned old LocationRef prefix is equal to the old LocationRef then
-	// the entire LocationRef should be replaced with the returned value.
 	GetNewPathRefs(
 		oldRef *path.Builder,
+		modTime time.Time,
 		oldLoc details.LocationIDer,
 	) (path.Path, *path.Builder, error)
 }
@@ -27,6 +26,7 @@ type DetailsMergeInfoer interface {
 type prevRef struct {
 	repoRef path.Path
 	locRef  *path.Builder
+	modTime *time.Time
 }
 
 type mergeDetails struct {
@@ -42,8 +42,12 @@ func (m *mergeDetails) ItemsToMerge() int {
 	return len(m.repoRefs)
 }
 
+// addRepoRef adds an entry in mergeDetails that can be looked up later. If
+// modTime is non-nil then it's checked during lookup. If it is nil then the
+// mod time provided during lookup is ignored.
 func (m *mergeDetails) addRepoRef(
 	oldRef *path.Builder,
+	modTime *time.Time,
 	newRef path.Path,
 	newLocRef *path.Builder,
 ) error {
@@ -58,6 +62,7 @@ func (m *mergeDetails) addRepoRef(
 	pr := prevRef{
 		repoRef: newRef,
 		locRef:  newLocRef,
+		modTime: modTime,
 	}
 
 	m.repoRefs[oldRef.ShortRef()] = pr
@@ -67,6 +72,7 @@ func (m *mergeDetails) addRepoRef(
 
 func (m *mergeDetails) GetNewPathRefs(
 	oldRef *path.Builder,
+	modTime time.Time,
 	oldLoc details.LocationIDer,
 ) (path.Path, *path.Builder, error) {
 	pr, ok := m.repoRefs[oldRef.ShortRef()]
@@ -74,6 +80,14 @@ func (m *mergeDetails) GetNewPathRefs(
 		return nil, nil, nil
 	}
 
+	// The mod times don't match, which means we're attempting to merge a
+	// different version of the item (i.e. an older version from an assist
+	// base). We shouldn't return a match because it could cause us to source
+	// out-of-date details for the item.
+	if pr.modTime != nil && !pr.modTime.Equal(modTime) {
+		return nil, nil, nil
+	}
+
 	// This was a location specified directly by a collection.
 	if pr.locRef != nil {
 		return pr.repoRef, pr.locRef, nil
diff --git a/src/internal/kopia/merge_details_test.go b/src/internal/kopia/merge_details_test.go
index 6dfee6381..ae19a8c6a 100644
--- a/src/internal/kopia/merge_details_test.go
+++ b/src/internal/kopia/merge_details_test.go
@@ -2,6 +2,7 @@ package kopia
 
 import (
 	"testing"
+	"time"
 
 	"github.com/alcionai/clues"
 	"github.com/stretchr/testify/assert"
@@ -47,10 +48,10 @@ func (suite *DetailsMergeInfoerUnitSuite) TestAddRepoRef_DuplicateFails() {
 
 	dm := newMergeDetails()
 
-	err := dm.addRepoRef(oldRef1.ToBuilder(), oldRef1, nil)
+	err := dm.addRepoRef(oldRef1.ToBuilder(), nil, oldRef1, nil)
 	require.NoError(t, err, clues.ToCore(err))
 
-	err = dm.addRepoRef(oldRef1.ToBuilder(), oldRef1, nil)
+	err = dm.addRepoRef(oldRef1.ToBuilder(), nil, oldRef1, nil)
 	require.Error(t, err, clues.ToCore(err))
 }
 
@@ -58,6 +59,10 @@ func (suite *DetailsMergeInfoerUnitSuite) TestAddRepoRef_DuplicateFails() {
 // for stored RepoRefs.
 func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() {
 	t := suite.T()
+
+	t1 := time.Now()
+	t2 := t1.Add(time.Second * 30)
+
 	oldRef1 := makePath(
 		t,
 		[]string{
@@ -110,10 +115,13 @@ func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() {
 
 	dm := newMergeDetails()
 
-	err := dm.addRepoRef(oldRef1.ToBuilder(), newRef1, newLoc1)
+	err := dm.addRepoRef(oldRef1.ToBuilder(), &t1, newRef1, newLoc1)
 	require.NoError(t, err, clues.ToCore(err))
 
-	err = dm.addRepoRef(oldRef2.ToBuilder(), newRef2, nil)
+	err = dm.addRepoRef(oldRef2.ToBuilder(), &t2, newRef2, nil)
+	require.NoError(t, err, clues.ToCore(err))
+
+	err = dm.addRepoRef(newRef1.ToBuilder(), nil, oldRef1, oldLoc1)
 	require.NoError(t, err, clues.ToCore(err))
 
 	// Add prefix matcher entry.
@@ -121,58 +129,89 @@ func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() {
 	require.NoError(t, err, clues.ToCore(err))
 
 	table := []struct {
-		name        string
-		searchRef   *path.Builder
-		searchLoc   mockLocationIDer
-		errCheck    require.ErrorAssertionFunc
-		expectedRef path.Path
-		expectedLoc *path.Builder
+		name          string
+		searchRef     *path.Builder
+		searchModTime time.Time
+		searchLoc     mockLocationIDer
+		errCheck      require.ErrorAssertionFunc
+		expectFound   bool
+		expectedRef   path.Path
+		expectedLoc   *path.Builder
 	}{
 		{
-			name:        "Exact Match With Loc",
-			searchRef:   oldRef1.ToBuilder(),
-			searchLoc:   searchLoc1,
-			errCheck:    require.NoError,
-			expectedRef: newRef1,
-			expectedLoc: newLoc1,
+			name:          "Exact Match With Loc",
+			searchRef:     oldRef1.ToBuilder(),
+			searchModTime: t1,
+			searchLoc:     searchLoc1,
+			errCheck:      require.NoError,
+			expectFound:   true,
+			expectedRef:   newRef1,
+			expectedLoc:   newLoc1,
 		},
 		{
-			name:        "Exact Match Without Loc",
-			searchRef:   oldRef1.ToBuilder(),
-			errCheck:    require.NoError,
-			expectedRef: newRef1,
-			expectedLoc: newLoc1,
+			name:          "Exact Match Without Loc",
+			searchRef:     oldRef1.ToBuilder(),
+			searchModTime: t1,
+			errCheck:      require.NoError,
+			expectFound:   true,
+			expectedRef:   newRef1,
+			expectedLoc:   newLoc1,
 		},
 		{
-			name:        "Prefix Match",
-			searchRef:   oldRef2.ToBuilder(),
-			searchLoc:   searchLoc2,
-			errCheck:    require.NoError,
-			expectedRef: newRef2,
-			expectedLoc: newLoc2,
+			name:          "Exact Match Without Loc ModTime Not In Merger",
+			searchRef:     newRef1.ToBuilder(),
+			searchModTime: time.Now(),
+			errCheck:      require.NoError,
+			expectFound:   true,
+			expectedRef:   oldRef1,
+			expectedLoc:   oldLoc1,
 		},
 		{
-			name:      "Would Be Prefix Match Without Old Loc Errors",
-			searchRef: oldRef2.ToBuilder(),
-			errCheck:  require.Error,
+			name:          "Prefix Match",
+			searchRef:     oldRef2.ToBuilder(),
+			searchModTime: t2,
+			searchLoc:     searchLoc2,
+			errCheck:      require.NoError,
+			expectFound:   true,
+			expectedRef:   newRef2,
+			expectedLoc:   newLoc2,
 		},
 		{
-			name:      "Not Found With Old Loc",
-			searchRef: newRef1.ToBuilder(),
-			searchLoc: searchLoc2,
-			errCheck:  require.NoError,
+			name:          "Would Be Prefix Match Without Old Loc Errors",
+			searchRef:     oldRef2.ToBuilder(),
+			searchModTime: t2,
+			errCheck:      require.Error,
 		},
 		{
-			name:      "Not Found Without Old Loc",
-			searchRef: newRef1.ToBuilder(),
-			errCheck:  require.NoError,
+			name:          "Not Found With Old Loc",
+			searchRef:     newRef2.ToBuilder(),
+			searchModTime: t1,
+			searchLoc:     searchLoc2,
+			errCheck:      require.NoError,
+		},
+		{
+			name:          "Not Found Without Old Loc",
+			searchRef:     newRef2.ToBuilder(),
+			searchModTime: t1,
+			errCheck:      require.NoError,
+		},
+		{
+			name:          "Not Found Due To Mod Time",
+			searchRef:     oldRef1.ToBuilder(),
+			searchModTime: time.Now(),
+			searchLoc:     searchLoc1,
+			errCheck:      require.NoError,
 		},
 	}
+
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
-			newRef, newLoc, err := dm.GetNewPathRefs(test.searchRef, test.searchLoc)
+			newRef, newLoc, err := dm.GetNewPathRefs(
+				test.searchRef,
+				test.searchModTime,
+				test.searchLoc)
 			test.errCheck(t, err, clues.ToCore(err))
 
 			assert.Equal(t, test.expectedRef, newRef, "RepoRef")
diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go
index b0dbd40e5..bc7f86deb 100644
--- a/src/internal/kopia/upload.go
+++ b/src/internal/kopia/upload.go
@@ -23,6 +23,7 @@ import (
 	"golang.org/x/exp/maps"
 
 	"github.com/alcionai/corso/src/internal/common/prefixmatcher"
+	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/diagnostics"
 	"github.com/alcionai/corso/src/internal/m365/graph"
@@ -137,6 +138,7 @@ type itemDetails struct {
 	prevPath     path.Path
 	locationPath *path.Builder
 	cached       bool
+	modTime      *time.Time
 }
 
 type corsoProgress struct {
@@ -148,9 +150,11 @@ type corsoProgress struct {
 
 	snapshotfs.UploadProgress
 	pending map[string]*itemDetails
-	deets   *details.Builder
-	// toMerge represents items that we don't have in-memory item info for. The
-	// item info for these items should be sourced from a base snapshot later on.
+	// deets contains entries that are complete and don't need to be merged with
+	// base backup data at all.
+	deets *details.Builder
+	// toMerge represents items that we either don't have in-memory item info
+	// for or that need to be sourced from a base backup due to caching, etc.
 	toMerge    *mergeDetails
 	mu         sync.RWMutex
 	totalBytes int64
@@ -194,6 +198,9 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 
 	// These items were sourced from a base snapshot or were cached in kopia so we
 	// never had to materialize their details in-memory.
+	//
+	// TODO(ashmrtn): When we're ready to merge with cached items, add cached as
+	// a condition here.
 	if d.info == nil {
 		if d.prevPath == nil {
 			cp.errs.AddRecoverable(cp.ctx, clues.New("item sourced from previous backup with no previous path").
@@ -208,7 +215,11 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 		cp.mu.Lock()
 		defer cp.mu.Unlock()
 
-		err := cp.toMerge.addRepoRef(d.prevPath.ToBuilder(), d.repoPath, d.locationPath)
+		err := cp.toMerge.addRepoRef(
+			d.prevPath.ToBuilder(),
+			d.modTime,
+			d.repoPath,
+			d.locationPath)
 		if err != nil {
 			cp.errs.AddRecoverable(cp.ctx, clues.Wrap(err, "adding item to merge list").
 				With(
@@ -375,6 +386,11 @@ func collectionEntries(
 				continue
 			}
 
+			modTime := time.Now()
+			if smt, ok := e.(data.StreamModTime); ok {
+				modTime = smt.ModTime()
+			}
+
 			// Not all items implement StreamInfo. For example, the metadata files
 			// do not because they don't contain information directly backed up or
 			// used for restore. If progress does not contain information about a
@@ -391,18 +407,22 @@ func collectionEntries(
 			// info nil.
 				itemInfo := ei.Info()
 				d := &itemDetails{
-					info:         &itemInfo,
-					repoPath:     itemPath,
+					info:     &itemInfo,
+					repoPath: itemPath,
+					// Also use the current path as the previous path for this item. That
+					// way, if the item is marked as cached and we need to merge details
+					// with an assist backup base which sourced the cached item, we can
+					// find it with the lookup in DetailsMergeInfoer.
+					//
+					// This all works out because cached item checks in kopia are direct
+					// path + metadata comparisons.
+					prevPath:     itemPath,
 					locationPath: locationPath,
+					modTime:      &modTime,
 				}
 				progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
 			}
 
-			modTime := time.Now()
-			if smt, ok := e.(data.StreamModTime); ok {
-				modTime = smt.ModTime()
-			}
-
 			entry := virtualfs.StreamingFileWithModTimeFromReader(
 				encodedName,
 				modTime,
@@ -508,6 +528,7 @@ func streamBaseEntries(
 				repoPath:     itemPath,
 				prevPath:     prevItemPath,
 				locationPath: locationPath,
+				modTime:      ptr.To(entry.ModTime()),
 			}
 			progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
 		}
diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go
index bbdbe9e6f..26c776709 100644
--- a/src/internal/kopia/upload_test.go
+++ b/src/internal/kopia/upload_test.go
@@ -386,7 +386,9 @@ var finishedFileTable = []struct {
 	cachedItems         func(fname string, fpath path.Path) map[string]testInfo
 	expectedBytes       int64
 	expectedNumEntries  int
-	err                 error
+	// Number of non-folder items.
+	expectedNumItems int
+	err              error
 }{
 	{
 		name: "DetailsExist",
@@ -410,6 +412,7 @@ var finishedFileTable = []struct {
 		expectedBytes: 100,
 		// 1 file and 5 folders.
 		expectedNumEntries: 2,
+		expectedNumItems:   1,
 	},
 	{
 		name: "PendingNoDetails",
@@ -453,16 +456,35 @@ var finishedFileTable = []struct {
 
 func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
 	table := []struct {
-		name   string
-		cached bool
+		name                 string
+		cached               bool
+		differentPrevPath    bool
+		dropInfo             bool
+		expectToMergeEntries bool
 	}{
 		{
 			name:   "all updated",
 			cached: false,
 		},
 		{
-			name:   "all cached",
+			name:   "all cached from assist base",
 			cached: true,
+			// TODO(ashmrtn): Update to true when we add cached items to toMerge.
+			expectToMergeEntries: false,
+		},
+		{
+			name:                 "all cached from merge base",
+			cached:               true,
+			differentPrevPath:    true,
+			dropInfo:             true,
+			expectToMergeEntries: true,
+		},
+		{
+			name:                 "all not cached from merge base",
+			cached:               false,
+			differentPrevPath:    true,
+			dropInfo:             true,
+			expectToMergeEntries: true,
 		},
 	}
 
@@ -480,6 +502,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
 				ctx:            ctx,
 				UploadProgress: &snapshotfs.NullUploadProgress{},
 				deets:          bd,
+				toMerge:        newMergeDetails(),
 				pending:        map[string]*itemDetails{},
 				errs:           fault.New(true),
 			}
@@ -487,6 +510,29 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
 			ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
 
 			for k, v := range ci {
+				if v.info != nil {
+					v.info.prevPath = v.info.repoPath
+
+					if cachedTest.differentPrevPath {
+						// Doesn't really matter how we change the path as long as it's
+						// different somehow.
+						p, err := path.FromDataLayerPath(
+							suite.targetFilePath.String()+"2",
+							true)
+						require.NoError(
+							t,
+							err,
+							"making prevPath: %v",
+							clues.ToCore(err))
+
+						v.info.prevPath = p
+					}
+
+					if cachedTest.dropInfo {
+						v.info.info = nil
+					}
+				}
+
 				cp.put(k, v.info)
 			}
 
@@ -509,6 +555,17 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
 			assert.Empty(t, cp.pending)
 
 			entries := bd.Details().Entries
+
+			if cachedTest.expectToMergeEntries {
+				assert.Equal(
+					t,
+					test.expectedNumItems,
+					cp.toMerge.ItemsToMerge(),
+					"merge entries")
+
+				return
+			}
+
 			assert.Len(t, entries, test.expectedNumEntries)
 
 			for _, entry := range entries {
@@ -616,7 +673,10 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarch
 	assert.Empty(t, cp.deets)
 
 	for _, expected := range expectedToMerge {
-		gotRef, _, _ := cp.toMerge.GetNewPathRefs(expected.oldRef, nil)
+		gotRef, _, _ := cp.toMerge.GetNewPathRefs(
+			expected.oldRef,
+			time.Now(),
+			nil)
 		if !assert.NotNil(t, gotRef) {
 			continue
 		}
diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go
index ed50dd368..4d1f43324 100644
--- a/src/internal/operations/backup.go
+++ b/src/internal/operations/backup.go
@@ -532,7 +532,10 @@ func getNewPathRefs(
 	// able to assume we always have the location in the previous entry. We'll end
 	// up doing some extra parsing, but it will simplify this code.
 	if repoRef.Service() == path.ExchangeService {
-		newPath, newLoc, err := dataFromBackup.GetNewPathRefs(repoRef.ToBuilder(), nil)
+		newPath, newLoc, err := dataFromBackup.GetNewPathRefs(
+			repoRef.ToBuilder(),
+			entry.Modified(),
+			nil)
 		if err != nil {
 			return nil, nil, false, clues.Wrap(err, "getting new paths")
 		} else if newPath == nil {
@@ -565,7 +568,10 @@ func getNewPathRefs(
 		return nil, nil, false, clues.New("entry with empty LocationRef")
 	}
 
-	newPath, newLoc, err := dataFromBackup.GetNewPathRefs(repoRef.ToBuilder(), locRef)
+	newPath, newLoc, err := dataFromBackup.GetNewPathRefs(
+		repoRef.ToBuilder(),
+		entry.Modified(),
+		locRef)
 	if err != nil {
 		return nil, nil, false, clues.Wrap(err, "getting new paths with old location")
 	} else if newPath == nil {
diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go
index a2783e92e..e6fb2cd41 100644
--- a/src/internal/operations/backup_test.go
+++ b/src/internal/operations/backup_test.go
@@ -138,6 +138,7 @@ type mockDetailsMergeInfoer struct {
 	locs     map[string]*path.Builder
 }
 
+// TODO(ashmrtn): Update this to take mod time?
 func (m *mockDetailsMergeInfoer) add(oldRef, newRef path.Path, newLoc *path.Builder) {
 	oldPB := oldRef.ToBuilder()
 	// Items are indexed individually.
@@ -149,6 +150,7 @@ func (m *mockDetailsMergeInfoer) add(oldRef, newRef path.Path, newLoc *path.Buil
 
 func (m *mockDetailsMergeInfoer) GetNewPathRefs(
 	oldRef *path.Builder,
+	_ time.Time,
 	_ details.LocationIDer,
 ) (path.Path, *path.Builder, error) {
 	return m.repoRefs[oldRef.ShortRef()], m.locs[oldRef.ShortRef()], nil
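
The behavioral core of this diff is the mod-time check that GetNewPathRefs now performs before handing back a stored entry. The following is a minimal, self-contained sketch of that idea rather than the corso implementation: it swaps corso's path.Builder, path.Path, and details.LocationIDer types for plain strings, and the mergeSketch, addRepoRef, and getNewPathRefs names here are hypothetical stand-ins.

package main

import (
	"errors"
	"fmt"
	"time"
)

// prevRef mirrors the shape added in merge_details.go: the new repo ref and
// location for an item, plus an optional mod time used to reject stale bases.
type prevRef struct {
	repoRef string
	locRef  string
	modTime *time.Time // nil means "don't check mod time during lookup"
}

// mergeSketch is a simplified stand-in for mergeDetails, keyed by a plain
// string instead of path.Builder.ShortRef().
type mergeSketch struct {
	repoRefs map[string]prevRef
}

func (m *mergeSketch) addRepoRef(oldRef string, modTime *time.Time, newRef, newLoc string) error {
	if _, ok := m.repoRefs[oldRef]; ok {
		return errors.New("duplicate RepoRef")
	}

	m.repoRefs[oldRef] = prevRef{repoRef: newRef, locRef: newLoc, modTime: modTime}

	return nil
}

// getNewPathRefs returns ("", "", false) when there's no usable match. A
// stored mod time that differs from the lookup mod time means the stored
// entry came from a different (older) version of the item, so it's treated
// as not found instead of sourcing out-of-date details.
func (m *mergeSketch) getNewPathRefs(oldRef string, modTime time.Time) (string, string, bool) {
	pr, ok := m.repoRefs[oldRef]
	if !ok {
		return "", "", false
	}

	if pr.modTime != nil && !pr.modTime.Equal(modTime) {
		return "", "", false
	}

	return pr.repoRef, pr.locRef, true
}

func main() {
	t1 := time.Now()

	m := &mergeSketch{repoRefs: map[string]prevRef{}}

	if err := m.addRepoRef("old-item", &t1, "new-item", "new-folder"); err != nil {
		panic(err)
	}

	// Same mod time: the stored entry is returned.
	fmt.Println(m.getNewPathRefs("old-item", t1))

	// Different mod time: treated as a different version of the item, so the
	// lookup reports no match.
	fmt.Println(m.getNewPathRefs("old-item", t1.Add(30*time.Second)))
}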
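The upload.go side of the change records a previous path and mod time for collection items so cached copies can still be matched later, and FinishedFile routes items without in-memory info into the merge list. The sketch below only illustrates that routing decision under simplified assumptions: finishedItem, route, and the example paths are invented for illustration, and the real code works with path.Path values, a details.Builder, and a fault.Bus rather than strings.

package main

import (
	"fmt"
	"time"
)

// finishedItem is a pared-down stand-in for itemDetails in upload.go; the
// real struct holds path.Path / path.Builder values, a *time.Time, and a
// pointer to the item info.
type finishedItem struct {
	repoPath string
	prevPath string
	modTime  time.Time
	hasInfo  bool // true when the collection produced in-memory item info
}

// route mimics the FinishedFile decision: items without in-memory info are
// queued for merging with a base backup, keyed by their previous path and mod
// time, while items with info have their details written out immediately.
func route(d finishedItem) string {
	if !d.hasInfo {
		if d.prevPath == "" {
			return "error: item sourced from previous backup with no previous path"
		}

		return fmt.Sprintf("toMerge.addRepoRef(prevPath=%q, modTime=%s, newPath=%q)",
			d.prevPath, d.modTime.Format(time.RFC3339), d.repoPath)
	}

	return fmt.Sprintf("deets.Add(repoPath=%q)", d.repoPath)
}

func main() {
	now := time.Now()

	// Fresh item from a collection: details are written directly. Its prevPath
	// is set to the current path so that a cached copy of the same item in a
	// later backup can still be matched by (path, mod time) during the merge.
	fmt.Println(route(finishedItem{
		repoPath: "tenant/exchange/user/email/inbox/item1",
		prevPath: "tenant/exchange/user/email/inbox/item1",
		modTime:  now,
		hasInfo:  true,
	}))

	// Item streamed from a base snapshot: no in-memory info, so its details
	// must be sourced from the base backup via the merge lookup.
	fmt.Println(route(finishedItem{
		repoPath: "tenant/exchange/user/email/inbox/item2",
		prevPath: "tenant/exchange/user/email/archive/item2",
		modTime:  now,
	}))
}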