Delay getting ItemInfo when uploading data (#4174)
Delay getting ItemInfo a tad longer when uploading data, and add checks that the modTime we use is what we expect later on.

Delaying getting ItemInfo helps Exchange lazy readers by resolving a circular dependency we had between adding the ItemInfo to the cache set and getting the item data. The extra modTime check will at least let us know if something unexpected happens that would affect incremental details merging later on.

---

#### Does this PR need a docs update or release note?

- [ ] ✅ Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] ⛔ No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #2023

#### Test Plan

- [ ] 💪 Manual
- [x] ⚡ Unit test
- [x] 💚 E2E
Parent db86ccabf8 · commit bf08c2a257
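The shape of the change is easiest to see in isolation: `itemDetails` stops holding a materialized `details.ItemInfo` and instead carries a closure that produces one. `FinishedFile` only invokes the closure once kopia reports the file complete, then verifies the mod time recorded when the item was queued against the one the info reports. Below is a minimal, self-contained sketch of that pattern; `ItemInfo`, `itemDetails`, and `finishItem` are illustrative stand-ins, not corso's actual types:

```go
// A minimal sketch of the lazy-info pattern this PR introduces; the types
// and names here are stand-ins, not corso's real API.
package main

import (
	"errors"
	"fmt"
	"time"
)

// ItemInfo stands in for details.ItemInfo.
type ItemInfo struct {
	Modified time.Time
}

// itemDetails mirrors the shape of the changed struct: rather than holding
// a materialized *ItemInfo, it holds a closure that produces one on demand.
type itemDetails struct {
	infoFunc func() (ItemInfo, error)
	modTime  *time.Time // mod time recorded when the item was queued
}

// finishItem materializes the info only once the upload completes, then
// cross-checks the mod time recorded up front against what the (possibly
// lazy) reader ultimately produced.
func finishItem(d *itemDetails) (ItemInfo, error) {
	if d.infoFunc == nil {
		// Sourced from a base snapshot or cached; nothing to materialize.
		return ItemInfo{}, errors.New("no info to materialize")
	}

	info, err := d.infoFunc()
	if err != nil {
		return ItemInfo{}, fmt.Errorf("getting ItemInfo: %w", err)
	}

	// If the deferred info describes a different version of the item than
	// the one uploaded, incremental details merging could misbehave later.
	if d.modTime != nil && !d.modTime.Equal(info.Modified) {
		return ItemInfo{}, errors.New("item modTime mismatch")
	}

	return info, nil
}

func main() {
	now := time.Now()
	d := &itemDetails{
		// The info isn't computed here; the closure defers it until
		// finishItem runs, breaking the queue-time dependency on item data.
		infoFunc: func() (ItemInfo, error) { return ItemInfo{Modified: now}, nil },
		modTime:  &now,
	}

	fmt.Println(finishItem(d))
}
```

The actual diff starts with the struct change: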
```diff
@@ -132,7 +132,7 @@ func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
 }
 
 type itemDetails struct {
-	info         *details.ItemInfo
+	infoFunc     func() (details.ItemInfo, error)
	repoPath     path.Path
	prevPath     path.Path
	locationPath *path.Builder
```
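In `corsoProgress.FinishedFile`, the clues context now carries the item's repo path and location, and the early-out for base-snapshot/cached items keys off `infoFunc` instead of `info`: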
```diff
@@ -198,13 +198,16 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 	ctx := clues.Add(
 		cp.ctx,
 		"service", d.repoPath.Service().String(),
-		"category", d.repoPath.Category().String())
+		"category", d.repoPath.Category().String(),
+		"item_path", d.repoPath,
+		"item_loc", d.locationPath)
 
 	// These items were sourced from a base snapshot or were cached in kopia so we
 	// never had to materialize their details in-memory.
-	if d.info == nil || d.cached {
+	if d.infoFunc == nil || d.cached {
 		if d.prevPath == nil {
 			cp.errs.AddRecoverable(ctx, clues.New("finished file sourced from previous backup with no previous path").
+				WithClues(ctx).
 				Label(fault.LabelForceNoBackupCreation))
 
 			return
```
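Later in `FinishedFile`, the info is finally materialized by invoking `d.infoFunc()`. Any error aborts the item, and the mod time recorded when the item was queued (`d.modTime`) must match what the materialized info reports, so a lazily produced info can't silently describe a different version of the item than the one uploaded: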
```diff
@@ -220,18 +223,32 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 			d.locationPath)
 		if err != nil {
 			cp.errs.AddRecoverable(ctx, clues.Wrap(err, "adding finished file to merge list").
+				WithClues(ctx).
 				Label(fault.LabelForceNoBackupCreation))
 		}
 
 		return
 	}
 
-	err = cp.deets.Add(
-		d.repoPath,
-		d.locationPath,
-		*d.info)
+	info, err := d.infoFunc()
+	if err != nil {
+		cp.errs.AddRecoverable(ctx, clues.Wrap(err, "getting ItemInfo").
+			WithClues(ctx).
+			Label(fault.LabelForceNoBackupCreation))
+
+		return
+	} else if !ptr.Val(d.modTime).Equal(info.Modified()) {
+		cp.errs.AddRecoverable(ctx, clues.New("item modTime mismatch").
+			WithClues(ctx).
+			Label(fault.LabelForceNoBackupCreation))
+
+		return
+	}
+
+	err = cp.deets.Add(d.repoPath, d.locationPath, info)
 	if err != nil {
 		cp.errs.AddRecoverable(ctx, clues.Wrap(err, "adding finished file to details").
+			WithClues(ctx).
 			Label(fault.LabelForceNoBackupCreation))
 
 		return
```
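On the producer side, `collectionEntries` stops calling `ei.Info()` eagerly and hands over a closure instead, which is what breaks the circular dependency for Exchange lazy readers: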
```diff
@@ -394,13 +411,12 @@ func collectionEntries(
 			// Relative path given to us in the callback is missing the root
 			// element. Add to pending set before calling the callback to avoid race
 			// conditions when the item is completed.
-			//
-			// TODO(ashmrtn): If we want to pull item info for cached item from a
-			// previous snapshot then we should populate prevPath here and leave
-			// info nil.
-			itemInfo := ei.Info()
 			d := &itemDetails{
-				info:     &itemInfo,
+				// TODO(ashmrtn): Update API in data package to return an error and
+				// then remove this wrapper.
+				infoFunc: func() (details.ItemInfo, error) {
+					return ei.Info(), nil
+				},
 				repoPath: itemPath,
 				// Also use the current path as the previous path for this item. This
 				// is so that if the item is marked as cached and we need to merge
```
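`streamBaseEntries`, which handles items sourced from a base snapshot, simply drops the explicit nil assignment: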
```diff
@@ -517,7 +533,6 @@ func streamBaseEntries(
 			// the item to progress and having progress aggregate everything for
 			// later.
 			d := &itemDetails{
-				info:         nil,
 				repoPath:     itemPath,
 				prevPath:     prevItemPath,
 				locationPath: locationPath,
```
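The unit tests follow suit. The `finishedFileTable` fixtures now build their expected `details.ItemInfo` through `infoFunc`: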
```diff
@@ -394,10 +394,12 @@ var finishedFileTable = []struct {
			return map[string]testInfo{
				fname: {
					info: &itemDetails{
-						info: &details.ItemInfo{
-							Exchange: &details.ExchangeInfo{
-								ItemType: details.ExchangeMail,
-							},
+						infoFunc: func() (details.ItemInfo, error) {
+							return details.ItemInfo{
+								Exchange: &details.ExchangeInfo{
+									ItemType: details.ExchangeMail,
+								},
+							}, nil
						},
						repoPath:     fpath,
						locationPath: path.Builder{}.Append(fpath.Folders()...),
```
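with the same swap in the second fixture: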
```diff
@@ -430,10 +432,12 @@ var finishedFileTable = []struct {
			return map[string]testInfo{
				fname: {
					info: &itemDetails{
-						info: &details.ItemInfo{
-							Exchange: &details.ExchangeInfo{
-								ItemType: details.ExchangeMail,
-							},
+						infoFunc: func() (details.ItemInfo, error) {
+							return details.ItemInfo{
+								Exchange: &details.ExchangeInfo{
+									ItemType: details.ExchangeMail,
+								},
+							}, nil
						},
						repoPath: fpath,
					},
```
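`TestFinishedFile` clears `infoFunc` rather than `info` when a case drops the item's info: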
```diff
@@ -526,7 +530,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
				}
 
				if cachedTest.dropInfo {
-					v.info.info = nil
+					v.info.infoFunc = nil
				}
			}
 
```
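`TestFinishedFileCachedNoPrevPathErrors` no longer needs to set the field at all: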
```diff
@@ -589,7 +593,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileCachedNoPrevPathErrors() {
	bd := &details.Builder{}
	cachedItems := map[string]testInfo{
		suite.targetFileName: {
-			info:       &itemDetails{info: nil, repoPath: suite.targetFilePath},
+			info:       &itemDetails{repoPath: suite.targetFilePath},
			err:        nil,
			totalBytes: 100,
		},
```
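nor does `TestFinishedFileBaseItemDoesntBuildHierarchy`: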
```diff
@@ -654,7 +658,6 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarchy() {
	}
 
	deets := &itemDetails{
-		info:         nil,
		repoPath:     suite.targetFilePath,
		prevPath:     prevPath,
		locationPath: suite.targetFileLoc,
```
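Finally, the Exchange mock `DataCollection` stamps each stub's `ItemInfo` with the same per-item mod time it reports on the item itself, so the new mismatch check sees consistent values: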
```diff
@@ -121,13 +121,16 @@ func (medc *DataCollection) Items(
		defer close(res)
 
		for i := 0; i < medc.messageCount; i++ {
+			info := StubMailInfo()
+			info.Exchange.Modified = medc.ModTimes[i]
+
			res <- &dataMock.Item{
				ItemID:       medc.Names[i],
				Reader:       io.NopCloser(bytes.NewReader(medc.Data[i])),
				ItemSize:     int64(len(medc.Data[i])),
				ModifiedTime: medc.ModTimes[i],
				DeletedFlag:  medc.DeletedItems[i],
-				ItemInfo:     StubMailInfo(),
+				ItemInfo:     info,
			}
		}
	}()
```