Use modTime in details merging (#3962)

Add modTime as one of the things that the DetailsMergeInfoer knows how to check against, wire it into details merging, and populate it during item upload.

This helps when merging details for items sourced from assist backup bases: if mod times are added to the DetailsMergeInfoer during upload, merging can do a direct comparison on them and skip out-of-date entries.
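
In outline, the new lookup is a guarded map search: an entry found by old RepoRef only counts as a match if its recorded mod time (when present) equals the mod time of the item being merged. The sketch below distills that logic; plain strings stand in for corso's `*path.Builder` keys, and `mergeInfo`/`lookup` are illustrative names for what the real `mergeDetails.GetNewPathRefs` does in the first diff below.

```go
package main

import (
    "fmt"
    "time"
)

// prevRef mirrors the shape added in this PR: the stored mod time is a
// pointer so that nil means "don't check mod times on lookup".
type prevRef struct {
    newRef  string
    modTime *time.Time
}

type mergeInfo struct {
    refs map[string]prevRef
}

// lookup returns the new ref only when an entry exists and, if a mod time
// was recorded for it, that time matches the mod time seen at merge time.
func (m mergeInfo) lookup(oldRef string, modTime time.Time) (string, bool) {
    pr, ok := m.refs[oldRef]
    if !ok {
        return "", false
    }

    // Differing mod times mean a different version of the item (e.g. an
    // older copy from an assist base); matching it could source stale details.
    if pr.modTime != nil && !pr.modTime.Equal(modTime) {
        return "", false
    }

    return pr.newRef, true
}

func main() {
    t1 := time.Now()
    m := mergeInfo{refs: map[string]prevRef{
        "old/ref": {newRef: "new/ref", modTime: &t1},
    }}

    fmt.Println(m.lookup("old/ref", t1))                  // new/ref true
    fmt.Println(m.lookup("old/ref", t1.Add(time.Second))) // "" false
}
```

A nil stored mod time keeps the old behavior: the entry matches regardless of the time passed at lookup.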

---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the GitHub issue. -->
* #<issue>

#### Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
ashmrtn authored 2023-08-08 19:04:06 -07:00 (committed via GitHub)
parent 9ec638f763 · commit 975f15b42b
6 changed files with 203 additions and 61 deletions

#### DetailsMergeInfoer interface and mergeDetails lookup

```diff
@@ -1,6 +1,8 @@
 package kopia
 
 import (
+    "time"
+
     "github.com/alcionai/clues"
 
     "github.com/alcionai/corso/src/internal/common/prefixmatcher"
@@ -12,14 +14,11 @@ type DetailsMergeInfoer interface {
     // ItemsToMerge returns the number of items that need to be merged.
     ItemsToMerge() int
 
     // GetNewPathRefs takes the old RepoRef and old LocationRef of an item and
-    // returns the new RepoRef, a prefix of the old LocationRef to replace, and
-    // the new LocationRefPrefix of the item if the item should be merged. If the
+    // returns the new RepoRef and the new location of the item. If the
     // item shouldn't be merged nils are returned.
-    //
-    // If the returned old LocationRef prefix is equal to the old LocationRef then
-    // the entire LocationRef should be replaced with the returned value.
     GetNewPathRefs(
         oldRef *path.Builder,
+        modTime time.Time,
         oldLoc details.LocationIDer,
     ) (path.Path, *path.Builder, error)
 }
@@ -27,6 +26,7 @@ type DetailsMergeInfoer interface {
 type prevRef struct {
     repoRef path.Path
     locRef  *path.Builder
+    modTime *time.Time
 }
 
 type mergeDetails struct {
@@ -42,8 +42,12 @@ func (m *mergeDetails) ItemsToMerge() int {
     return len(m.repoRefs)
 }
 
+// addRepoRef adds an entry in mergeDetails that can be looked up later. If
+// modTime is non-nil then it's checked during lookup. If it is nil then the
+// mod time provided during lookup is ignored.
 func (m *mergeDetails) addRepoRef(
     oldRef *path.Builder,
+    modTime *time.Time,
     newRef path.Path,
     newLocRef *path.Builder,
 ) error {
@@ -58,6 +62,7 @@ func (m *mergeDetails) addRepoRef(
     pr := prevRef{
         repoRef: newRef,
         locRef:  newLocRef,
+        modTime: modTime,
     }
 
     m.repoRefs[oldRef.ShortRef()] = pr
@@ -67,6 +72,7 @@ func (m *mergeDetails) addRepoRef(
 func (m *mergeDetails) GetNewPathRefs(
     oldRef *path.Builder,
+    modTime time.Time,
     oldLoc details.LocationIDer,
 ) (path.Path, *path.Builder, error) {
     pr, ok := m.repoRefs[oldRef.ShortRef()]
@@ -74,6 +80,14 @@ func (m *mergeDetails) GetNewPathRefs(
         return nil, nil, nil
     }
 
+    // ModTimes don't match, which means we're attempting to merge a different
+    // version of the item (i.e. an older version from an assist base). We
+    // shouldn't return a match because it could cause us to source out-of-date
+    // details for the item.
+    if pr.modTime != nil && !pr.modTime.Equal(modTime) {
+        return nil, nil, nil
+    }
+
     // This was a location specified directly by a collection.
     if pr.locRef != nil {
         return pr.repoRef, pr.locRef, nil
```
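
One detail worth calling out in the new check: it compares with `time.Time.Equal` rather than `==`. `Equal` compares the instant only, while `==` also compares the location and monotonic-clock fields, so two representations of the same instant can compare unequal. A minimal illustration:

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    // == on time.Time compares wall clock, monotonic reading, and location;
    // Equal compares only the instant being represented.
    t1 := time.Date(2023, 8, 8, 19, 4, 6, 0, time.UTC)
    t2 := t1.In(time.FixedZone("UTC-7", -7*60*60))

    fmt.Println(t1 == t2)     // false: same instant, different location
    fmt.Println(t1.Equal(t2)) // true
}
```

Since the mod times flowing into `GetNewPathRefs` may have passed through serialization (which strips the monotonic reading), `Equal` is the safe comparison here.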

#### DetailsMergeInfoer unit tests

```diff
@@ -2,6 +2,7 @@ package kopia
 
 import (
     "testing"
+    "time"
 
     "github.com/alcionai/clues"
     "github.com/stretchr/testify/assert"
@@ -47,10 +48,10 @@ func (suite *DetailsMergeInfoerUnitSuite) TestAddRepoRef_DuplicateFails() {
     dm := newMergeDetails()
 
-    err := dm.addRepoRef(oldRef1.ToBuilder(), oldRef1, nil)
+    err := dm.addRepoRef(oldRef1.ToBuilder(), nil, oldRef1, nil)
     require.NoError(t, err, clues.ToCore(err))
 
-    err = dm.addRepoRef(oldRef1.ToBuilder(), oldRef1, nil)
+    err = dm.addRepoRef(oldRef1.ToBuilder(), nil, oldRef1, nil)
     require.Error(t, err, clues.ToCore(err))
 }
@@ -58,6 +59,10 @@ func (suite *DetailsMergeInfoerUnitSuite) TestAddRepoRef_DuplicateFails() {
 // for stored RepoRefs.
 func (suite *DetailsMergeInfoerUnitSuite) TestGetNewPathRefs() {
     t := suite.T()
 
+    t1 := time.Now()
+    t2 := t1.Add(time.Second * 30)
+
     oldRef1 := makePath(
         t,
         []string{
@@ -110,10 +115,13 @@
     dm := newMergeDetails()
 
-    err := dm.addRepoRef(oldRef1.ToBuilder(), newRef1, newLoc1)
+    err := dm.addRepoRef(oldRef1.ToBuilder(), &t1, newRef1, newLoc1)
     require.NoError(t, err, clues.ToCore(err))
 
-    err = dm.addRepoRef(oldRef2.ToBuilder(), newRef2, nil)
+    err = dm.addRepoRef(oldRef2.ToBuilder(), &t2, newRef2, nil)
+    require.NoError(t, err, clues.ToCore(err))
+
+    err = dm.addRepoRef(newRef1.ToBuilder(), nil, oldRef1, oldLoc1)
     require.NoError(t, err, clues.ToCore(err))
 
     // Add prefix matcher entry.
@@ -121,58 +129,89 @@
     require.NoError(t, err, clues.ToCore(err))
 
-    table := []struct {
-        name        string
-        searchRef   *path.Builder
-        searchLoc   mockLocationIDer
-        errCheck    require.ErrorAssertionFunc
-        expectedRef path.Path
-        expectedLoc *path.Builder
-    }{
-        {
-            name:        "Exact Match With Loc",
-            searchRef:   oldRef1.ToBuilder(),
-            searchLoc:   searchLoc1,
-            errCheck:    require.NoError,
-            expectedRef: newRef1,
-            expectedLoc: newLoc1,
-        },
-        {
-            name:        "Exact Match Without Loc",
-            searchRef:   oldRef1.ToBuilder(),
-            errCheck:    require.NoError,
-            expectedRef: newRef1,
-            expectedLoc: newLoc1,
-        },
-        {
-            name:        "Prefix Match",
-            searchRef:   oldRef2.ToBuilder(),
-            searchLoc:   searchLoc2,
-            errCheck:    require.NoError,
-            expectedRef: newRef2,
-            expectedLoc: newLoc2,
-        },
-        {
-            name:      "Would Be Prefix Match Without Old Loc Errors",
-            searchRef: oldRef2.ToBuilder(),
-            errCheck:  require.Error,
-        },
-        {
-            name:      "Not Found With Old Loc",
-            searchRef: newRef1.ToBuilder(),
-            searchLoc: searchLoc2,
-            errCheck:  require.NoError,
-        },
-        {
-            name:      "Not Found Without Old Loc",
-            searchRef: newRef1.ToBuilder(),
-            errCheck:  require.NoError,
-        },
-    }
+    table := []struct {
+        name          string
+        searchRef     *path.Builder
+        searchModTime time.Time
+        searchLoc     mockLocationIDer
+        errCheck      require.ErrorAssertionFunc
+        expectFound   bool
+        expectedRef   path.Path
+        expectedLoc   *path.Builder
+    }{
+        {
+            name:          "Exact Match With Loc",
+            searchRef:     oldRef1.ToBuilder(),
+            searchModTime: t1,
+            searchLoc:     searchLoc1,
+            errCheck:      require.NoError,
+            expectFound:   true,
+            expectedRef:   newRef1,
+            expectedLoc:   newLoc1,
+        },
+        {
+            name:          "Exact Match Without Loc",
+            searchRef:     oldRef1.ToBuilder(),
+            searchModTime: t1,
+            errCheck:      require.NoError,
+            expectFound:   true,
+            expectedRef:   newRef1,
+            expectedLoc:   newLoc1,
+        },
+        {
+            name:          "Exact Match Without Loc ModTime Not In Merger",
+            searchRef:     newRef1.ToBuilder(),
+            searchModTime: time.Now(),
+            errCheck:      require.NoError,
+            expectFound:   true,
+            expectedRef:   oldRef1,
+            expectedLoc:   oldLoc1,
+        },
+        {
+            name:          "Prefix Match",
+            searchRef:     oldRef2.ToBuilder(),
+            searchModTime: t2,
+            searchLoc:     searchLoc2,
+            errCheck:      require.NoError,
+            expectFound:   true,
+            expectedRef:   newRef2,
+            expectedLoc:   newLoc2,
+        },
+        {
+            name:          "Would Be Prefix Match Without Old Loc Errors",
+            searchRef:     oldRef2.ToBuilder(),
+            searchModTime: t2,
+            errCheck:      require.Error,
+        },
+        {
+            name:          "Not Found With Old Loc",
+            searchRef:     newRef2.ToBuilder(),
+            searchModTime: t1,
+            searchLoc:     searchLoc2,
+            errCheck:      require.NoError,
+        },
+        {
+            name:          "Not Found Without Old Loc",
+            searchRef:     newRef2.ToBuilder(),
+            searchModTime: t1,
+            errCheck:      require.NoError,
+        },
+        {
+            name:          "Not Found Due To Mod Time",
+            searchRef:     oldRef1.ToBuilder(),
+            searchModTime: time.Now(),
+            searchLoc:     searchLoc1,
+            errCheck:      require.NoError,
+        },
+    }
 
     for _, test := range table {
         suite.Run(test.name, func() {
             t := suite.T()
 
-            newRef, newLoc, err := dm.GetNewPathRefs(test.searchRef, test.searchLoc)
+            newRef, newLoc, err := dm.GetNewPathRefs(
+                test.searchRef,
+                test.searchModTime,
+                test.searchLoc)
             test.errCheck(t, err, clues.ToCore(err))
 
             assert.Equal(t, test.expectedRef, newRef, "RepoRef")
```

#### Kopia upload: corsoProgress and collection entries

```diff
@@ -23,6 +23,7 @@ import (
     "golang.org/x/exp/maps"
 
     "github.com/alcionai/corso/src/internal/common/prefixmatcher"
+    "github.com/alcionai/corso/src/internal/common/ptr"
     "github.com/alcionai/corso/src/internal/data"
     "github.com/alcionai/corso/src/internal/diagnostics"
     "github.com/alcionai/corso/src/internal/m365/graph"
@@ -137,6 +138,7 @@ type itemDetails struct {
     prevPath     path.Path
     locationPath *path.Builder
     cached       bool
+    modTime      *time.Time
 }
 
 type corsoProgress struct {
@@ -148,9 +150,11 @@
     snapshotfs.UploadProgress
     pending map[string]*itemDetails
-    deets   *details.Builder
-    // toMerge represents items that we don't have in-memory item info for. The
-    // item info for these items should be sourced from a base snapshot later on.
+    // deets contains entries that are complete and don't need to be merged
+    // with base backup data at all.
+    deets *details.Builder
+    // toMerge represents items that we either don't have in-memory item info
+    // for or that need to be sourced from a base backup due to caching etc.
     toMerge    *mergeDetails
     mu         sync.RWMutex
     totalBytes int64
@@ -194,6 +198,9 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
     // These items were sourced from a base snapshot or were cached in kopia so we
     // never had to materialize their details in-memory.
+    //
+    // TODO(ashmrtn): When we're ready to merge with cached items add cached as a
+    // condition here.
     if d.info == nil {
         if d.prevPath == nil {
             cp.errs.AddRecoverable(cp.ctx, clues.New("item sourced from previous backup with no previous path").
@@ -208,7 +215,11 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
         cp.mu.Lock()
         defer cp.mu.Unlock()
 
-        err := cp.toMerge.addRepoRef(d.prevPath.ToBuilder(), d.repoPath, d.locationPath)
+        err := cp.toMerge.addRepoRef(
+            d.prevPath.ToBuilder(),
+            d.modTime,
+            d.repoPath,
+            d.locationPath)
         if err != nil {
             cp.errs.AddRecoverable(cp.ctx, clues.Wrap(err, "adding item to merge list").
                 With(
@@ -375,6 +386,11 @@ func collectionEntries(
                 continue
             }
 
+            modTime := time.Now()
+            if smt, ok := e.(data.StreamModTime); ok {
+                modTime = smt.ModTime()
+            }
+
             // Not all items implement StreamInfo. For example, the metadata files
             // do not because they don't contain information directly backed up or
             // used for restore. If progress does not contain information about a
@@ -391,18 +407,22 @@
                 // info nil.
                 itemInfo := ei.Info()
                 d := &itemDetails{
-                    info:         &itemInfo,
-                    repoPath:     itemPath,
+                    info:     &itemInfo,
+                    repoPath: itemPath,
+                    // Also use the current path as the previous path for this item. This
+                    // is so that if the item is marked as cached and we need to merge
+                    // details with an assist backup base which sourced the cached item we
+                    // can find it with the lookup in DetailsMergeInfoer.
+                    //
+                    // This all works out because cached item checks in kopia are direct
+                    // path + metadata comparisons.
+                    prevPath:     itemPath,
                     locationPath: locationPath,
+                    modTime:      &modTime,
                 }
 
                 progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
             }
 
-            modTime := time.Now()
-            if smt, ok := e.(data.StreamModTime); ok {
-                modTime = smt.ModTime()
-            }
-
             entry := virtualfs.StreamingFileWithModTimeFromReader(
                 encodedName,
                 modTime,
@@ -508,6 +528,7 @@ func streamBaseEntries(
                 repoPath:     itemPath,
                 prevPath:     prevItemPath,
                 locationPath: locationPath,
+                modTime:      ptr.To(entry.ModTime()),
             }
 
             progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
```

#### Upload progress unit tests

```diff
@@ -386,7 +386,9 @@ var finishedFileTable = []struct {
     cachedItems        func(fname string, fpath path.Path) map[string]testInfo
     expectedBytes      int64
     expectedNumEntries int
-    err                error
+    // Non-folder items.
+    expectedNumItems int
+    err              error
 }{
     {
         name: "DetailsExist",
@@ -410,6 +412,7 @@ var finishedFileTable = []struct {
         expectedBytes: 100,
         // 1 file and 5 folders.
         expectedNumEntries: 2,
+        expectedNumItems:   1,
     },
     {
         name: "PendingNoDetails",
@@ -453,16 +456,35 @@
 func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
     table := []struct {
-        name   string
-        cached bool
+        name                 string
+        cached               bool
+        differentPrevPath    bool
+        dropInfo             bool
+        expectToMergeEntries bool
     }{
         {
             name:   "all updated",
             cached: false,
         },
         {
-            name:   "all cached",
-            cached: true,
+            name:   "all cached from assist base",
+            cached: true,
+            // TODO(ashmrtn): Update to true when we add cached items to toMerge.
+            expectToMergeEntries: false,
+        },
+        {
+            name:                 "all cached from merge base",
+            cached:               true,
+            differentPrevPath:    true,
+            dropInfo:             true,
+            expectToMergeEntries: true,
+        },
+        {
+            name:                 "all not cached from merge base",
+            cached:               false,
+            differentPrevPath:    true,
+            dropInfo:             true,
+            expectToMergeEntries: true,
         },
     }
@@ -480,6 +502,7 @@
                 ctx:            ctx,
                 UploadProgress: &snapshotfs.NullUploadProgress{},
                 deets:          bd,
+                toMerge:        newMergeDetails(),
                 pending:        map[string]*itemDetails{},
                 errs:           fault.New(true),
             }
@@ -487,6 +510,29 @@
             ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
 
             for k, v := range ci {
+                if v.info != nil {
+                    v.info.prevPath = v.info.repoPath
+
+                    if cachedTest.differentPrevPath {
+                        // Doesn't really matter how we change the path as long as it's
+                        // different somehow.
+                        p, err := path.FromDataLayerPath(
+                            suite.targetFilePath.String()+"2",
+                            true)
+                        require.NoError(
+                            t,
+                            err,
+                            "making prevPath: %v",
+                            clues.ToCore(err))
+
+                        v.info.prevPath = p
+                    }
+
+                    if cachedTest.dropInfo {
+                        v.info.info = nil
+                    }
+                }
+
                 cp.put(k, v.info)
             }
@@ -509,6 +555,17 @@
             assert.Empty(t, cp.pending)
 
             entries := bd.Details().Entries
 
+            if cachedTest.expectToMergeEntries {
+                assert.Equal(
+                    t,
+                    test.expectedNumItems,
+                    cp.toMerge.ItemsToMerge(),
+                    "merge entries")
+
+                return
+            }
+
             assert.Len(t, entries, test.expectedNumEntries)
 
             for _, entry := range entries {
@@ -616,7 +673,10 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarchy() {
     assert.Empty(t, cp.deets)
 
     for _, expected := range expectedToMerge {
-        gotRef, _, _ := cp.toMerge.GetNewPathRefs(expected.oldRef, nil)
+        gotRef, _, _ := cp.toMerge.GetNewPathRefs(
+            expected.oldRef,
+            time.Now(),
+            nil)
         if !assert.NotNil(t, gotRef) {
             continue
         }
```

#### Details merging: getNewPathRefs callers

```diff
@@ -532,7 +532,10 @@ func getNewPathRefs(
     // able to assume we always have the location in the previous entry. We'll end
     // up doing some extra parsing, but it will simplify this code.
     if repoRef.Service() == path.ExchangeService {
-        newPath, newLoc, err := dataFromBackup.GetNewPathRefs(repoRef.ToBuilder(), nil)
+        newPath, newLoc, err := dataFromBackup.GetNewPathRefs(
+            repoRef.ToBuilder(),
+            entry.Modified(),
+            nil)
         if err != nil {
             return nil, nil, false, clues.Wrap(err, "getting new paths")
         } else if newPath == nil {
@@ -565,7 +568,10 @@ func getNewPathRefs(
         return nil, nil, false, clues.New("entry with empty LocationRef")
     }
 
-    newPath, newLoc, err := dataFromBackup.GetNewPathRefs(repoRef.ToBuilder(), locRef)
+    newPath, newLoc, err := dataFromBackup.GetNewPathRefs(
+        repoRef.ToBuilder(),
+        entry.Modified(),
+        locRef)
     if err != nil {
         return nil, nil, false, clues.Wrap(err, "getting new paths with old location")
     } else if newPath == nil {
```
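
These call sites close the loop: the mod time recorded per item at upload must equal the `Modified()` value on the details entry being merged, or the lookup now returns no match. A toy round trip of that invariant (`detailsEntry` is an illustrative stand-in, not corso's details type):

```go
package main

import (
    "fmt"
    "time"
)

// detailsEntry stands in for a backup details entry with a Modified time.
type detailsEntry struct{ modified time.Time }

func (e detailsEntry) Modified() time.Time { return e.modified }

func main() {
    uploaded := time.Date(2023, 8, 8, 19, 4, 6, 0, time.UTC)

    // The upload side records the item's mod time...
    recorded := uploaded

    // ...and the merge side compares it against the entry's Modified() value.
    entry := detailsEntry{modified: uploaded}
    fmt.Println(recorded.Equal(entry.Modified())) // true: entry merges

    stale := detailsEntry{modified: uploaded.Add(-time.Hour)}
    fmt.Println(recorded.Equal(stale.Modified())) // false: stale entry skipped
}
```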

#### mockDetailsMergeInfoer

```diff
@@ -138,6 +138,7 @@ type mockDetailsMergeInfoer struct {
     locs map[string]*path.Builder
 }
 
+// TODO(ashmrtn): Update this to take mod time?
 func (m *mockDetailsMergeInfoer) add(oldRef, newRef path.Path, newLoc *path.Builder) {
     oldPB := oldRef.ToBuilder()
 
     // Items are indexed individually.
@@ -149,6 +150,7 @@ func (m *mockDetailsMergeInfoer) add(oldRef, newRef path.Path, newLoc *path.Builder) {
 func (m *mockDetailsMergeInfoer) GetNewPathRefs(
     oldRef *path.Builder,
+    _ time.Time,
     _ details.LocationIDer,
 ) (path.Path, *path.Builder, error) {
     return m.repoRefs[oldRef.ShortRef()], m.locs[oldRef.ShortRef()], nil
```