Produce map of items that need backup details entries (#1892)

## Description

Generate a `map[old ShortRef]new path.Path` for every item sourced from a base snapshot during backup. Return this map at the end of `BackupCollections` so that callers can use it to merge backup details if desired (a sketch of such a caller follows the list below).

This PR does a few auxiliary things as well:
* thread the current and previous paths through hierarchy merging
* add an extra field to the info tracked by `corsoProgress` to hold the previous path
* provide the current and previous paths to the `streamBaseEntries` function
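
For illustration, here's a hedged sketch of what a caller might do with the returned map. Nothing below is code from this PR: `mergeBaseDetails` and the `baseDeets` index are hypothetical, and the `details.Builder.Add` argument list is inferred from the `FinishedFile` hunk in the diff.

```go
import (
	"github.com/pkg/errors"

	"github.com/alcionai/corso/src/pkg/backup/details"
	"github.com/alcionai/corso/src/pkg/path"
)

// mergeBaseDetails re-adds details entries for items carried over from a base
// snapshot. toMerge maps an item's previous-path ShortRef to its new
// path.Path (the map BackupCollections now returns); baseDeets is a
// hypothetical index of the base backup's details entries by ShortRef.
func mergeBaseDetails(
	deets *details.Builder,
	baseDeets map[string]details.DetailsEntry,
	toMerge map[string]path.Path,
) error {
	for oldShortRef, newPath := range toMerge {
		entry, ok := baseDeets[oldShortRef]
		if !ok {
			return errors.Errorf("no base details entry for ShortRef %s", oldShortRef)
		}

		// Mirror the Add call in corsoProgress.FinishedFile: repo ref, item
		// ShortRef, parent ShortRef, updated flag, then the item's info.
		deets.Add(
			newPath.String(),
			newPath.ShortRef(),
			newPath.ToBuilder().Dir().ShortRef(),
			true,
			entry.ItemInfo,
		)
	}

	return nil
}
```

Within this PR the map is only produced and returned; `BackupOperation.Run` currently discards it with a blank identifier (see the `consumeBackupDataCollections` hunk below).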

## Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* #1800 

## Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
Commit: 6b689b76c1 (parent: aacb013b60)
Author: ashmrtn, 2022-12-22 14:29:15 -08:00 (committed by GitHub)
7 changed files with 196 additions and 46 deletions

---

@@ -118,14 +118,18 @@ func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
 }
 
 type itemDetails struct {
-	info     details.ItemInfo
+	info     *details.ItemInfo
 	repoPath path.Path
+	prevPath path.Path
 }
 
 type corsoProgress struct {
 	snapshotfs.UploadProgress
-	pending    map[string]*itemDetails
-	deets      *details.Builder
+	pending    map[string]*itemDetails
+	deets      *details.Builder
+	// toMerge represents items that we don't have in-memory item info for. The
+	// item info for these items should be sourced from a base snapshot later on.
+	toMerge    map[string]path.Path
 	mu         sync.RWMutex
 	totalBytes int64
 }
@@ -153,6 +157,22 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 		return
 	}
 
+	// These items were sourced from a base snapshot or were cached in kopia so we
+	// never had to materialize their details in-memory.
+	if d.info == nil {
+		// TODO(ashmrtn): We should probably be returning an error here?
+		if d.prevPath == nil {
+			return
+		}
+
+		cp.mu.Lock()
+		defer cp.mu.Unlock()
+
+		cp.toMerge[d.prevPath.ShortRef()] = d.repoPath
+
+		return
+	}
+
 	parent := d.repoPath.ToBuilder().Dir()
 
 	cp.deets.Add(
@@ -160,7 +180,7 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 		d.repoPath.ShortRef(),
 		parent.ShortRef(),
 		true,
-		d.info,
+		*d.info,
 	)
 
 	folders := []details.FolderEntry{}
@@ -182,7 +202,7 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
 		parent = nextParent
 	}
 
-	cp.deets.AddFoldersForItem(folders, d.info)
+	cp.deets.AddFoldersForItem(folders, *d.info)
 }
 
 // Kopia interface function used as a callback when kopia finishes hashing a file.
@@ -282,7 +302,12 @@ func collectionEntries(
 			// Relative path given to us in the callback is missing the root
 			// element. Add to pending set before calling the callback to avoid race
 			// conditions when the item is completed.
-			d := &itemDetails{info: ei.Info(), repoPath: itemPath}
+			//
+			// TODO(ashmrtn): If we want to pull item info for cached item from a
+			// previous snapshot then we should populate prevPath here and leave
+			// info nil.
+			itemInfo := ei.Info()
+			d := &itemDetails{info: &itemInfo, repoPath: itemPath}
 			progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
 		}
@@ -309,6 +334,8 @@
 func streamBaseEntries(
 	ctx context.Context,
 	cb func(context.Context, fs.Entry) error,
+	curPath path.Path,
+	prevPath path.Path,
 	dir fs.Directory,
 	encodedSeen map[string]struct{},
 	progress *corsoProgress,
@@ -334,27 +361,43 @@
 			return nil
 		}
 
-		if err := cb(ctx, entry); err != nil {
-			entName, err := decodeElement(entry.Name())
-			if err != nil {
-				entName = entry.Name()
-			}
+		entName, err := decodeElement(entry.Name())
+		if err != nil {
+			return errors.Wrapf(err, "unable to decode entry name %s", entry.Name())
+		}
 
-			return errors.Wrapf(err, "executing callback on item %q", entName)
+		// For now assuming that item IDs don't need escaping.
+		itemPath, err := curPath.Append(entName, true)
+		if err != nil {
+			return errors.Wrap(err, "getting full item path for base entry")
+		}
+
+		// We need the previous path so we can find this item in the base snapshot's
+		// backup details. If the item moved and we had only the new path, we'd be
+		// unable to find it in the old backup details because we wouldn't know what
+		// to look for.
+		prevItemPath, err := prevPath.Append(entName, true)
+		if err != nil {
+			return errors.Wrap(err, "getting previous full item path for base entry")
+		}
+
+		// All items have item info in the base backup. However, we need to make
+		// sure we have enough metadata to find those entries. To do that we add the
+		// item to progress and have progress aggregate everything for later.
+		d := &itemDetails{info: nil, repoPath: itemPath, prevPath: prevItemPath}
+		progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
+
+		if err := cb(ctx, entry); err != nil {
+			return errors.Wrapf(err, "executing callback on item %q", itemPath)
 		}
 
 		return nil
 	})
 	if err != nil {
-		name, err := decodeElement(dir.Name())
-		if err != nil {
-			name = dir.Name()
-		}
-
 		return errors.Wrapf(
 			err,
 			"traversing items in base snapshot directory %q",
-			name,
+			curPath,
 		)
 	}
@@ -366,6 +409,8 @@
 // kopia callbacks on directory entries. It binds the directory to the given
 // DataCollection.
 func getStreamItemFunc(
+	curPath path.Path,
+	prevPath path.Path,
 	staticEnts []fs.Entry,
 	streamedEnts data.Collection,
 	baseDir fs.Directory,
@@ -384,7 +429,15 @@ func getStreamItemFunc(
 		seen, errs := collectionEntries(ctx, cb, streamedEnts, progress)
 
-		if err := streamBaseEntries(ctx, cb, baseDir, seen, progress); err != nil {
+		if err := streamBaseEntries(
+			ctx,
+			cb,
+			curPath,
+			prevPath,
+			baseDir,
+			seen,
+			progress,
+		); err != nil {
 			errs = multierror.Append(
 				errs,
 				errors.Wrap(err, "streaming base snapshot entries"),
@@ -430,11 +483,25 @@ func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.D
 	return virtualfs.NewStreamingDirectory(
 		encodeAsPath(dirName),
-		getStreamItemFunc(childDirs, dir.collection, dir.baseDir, progress),
+		getStreamItemFunc(
+			dir.currentPath,
+			dir.prevPath,
+			childDirs,
+			dir.collection,
+			dir.baseDir,
+			progress,
+		),
 	), nil
 }
 
 type treeMap struct {
+	// path.Path representing the node's path. This is passed as a parameter to
+	// the stream item function so that even baseDir directories can properly
+	// generate the full path of items.
+	currentPath path.Path
+	// Previous path this directory may have resided at if it is sourced from a
+	// base snapshot.
+	prevPath path.Path
 	// Child directories of this directory.
 	childDirs map[string]*treeMap
 	// Reference to data pulled from the external service. Contains only items in
@@ -583,6 +650,8 @@ func inflateCollectionTree(
 		}
 
 		node.collection = s
+		node.currentPath = s.FullPath()
+		node.prevPath = s.PreviousPath()
 	}
 
 	// Check that each previous path has only one of the states of deleted, moved,
@@ -708,7 +777,25 @@ func traverseBaseDir(
 			return errors.Errorf("unable to get tree node for path %s", currentPath)
 		}
 
+		curP, err := path.FromDataLayerPath(currentPath.String(), false)
+		if err != nil {
+			return errors.Errorf(
+				"unable to convert current path %s to path.Path",
+				currentPath,
+			)
+		}
+
+		oldP, err := path.FromDataLayerPath(oldDirPath.String(), false)
+		if err != nil {
+			return errors.Errorf(
+				"unable to convert old path %s to path.Path",
+				oldDirPath,
+			)
+		}
+
 		node.baseDir = dir
+		node.currentPath = curP
+		node.prevPath = oldP
 	}
 
 	return nil

---

@@ -25,8 +25,8 @@ import (
 	"github.com/alcionai/corso/src/pkg/path"
 )
 
-func makePath(t *testing.T, elements []string) path.Path {
-	p, err := path.FromDataLayerPath(stdpath.Join(elements...), false)
+func makePath(t *testing.T, elements []string, isItem bool) path.Path {
+	p, err := path.FromDataLayerPath(stdpath.Join(elements...), isItem)
 	require.NoError(t, err)
 
 	return p
@@ -389,7 +389,7 @@ var finishedFileTable = []struct {
 		cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
 			return map[string]testInfo{
 				fname: {
-					info:       &itemDetails{details.ItemInfo{}, fpath},
+					info:       &itemDetails{info: &details.ItemInfo{}, repoPath: fpath},
 					err:        nil,
 					totalBytes: 100,
 				},
@@ -416,7 +416,7 @@
 		cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
 			return map[string]testInfo{
 				fname: {
-					info: &itemDetails{details.ItemInfo{}, fpath},
+					info: &itemDetails{info: &details.ItemInfo{}, repoPath: fpath},
 					err:  assert.AnError,
 				},
 			}
@@ -460,7 +460,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
 	}
 }
 
-func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() {
+func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchyNewItem() {
 	t := suite.T()
 	// Order of folders in hierarchy from root to leaf (excluding the item).
 	expectedFolderOrder := suite.targetFilePath.ToBuilder().Dir().Elements()
@@ -471,14 +471,17 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() {
 		UploadProgress: &snapshotfs.NullUploadProgress{},
 		deets:          bd,
 		pending:        map[string]*itemDetails{},
+		toMerge:        map[string]path.Path{},
 	}
 
-	deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath}
+	deets := &itemDetails{info: &details.ItemInfo{}, repoPath: suite.targetFilePath}
 	cp.put(suite.targetFileName, deets)
 	require.Len(t, cp.pending, 1)
 
 	cp.FinishedFile(suite.targetFileName, nil)
 
+	assert.Empty(t, cp.toMerge)
+
 	// Gather information about the current state.
 	var (
 		curRef *details.DetailsEntry
@@ -521,6 +524,42 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() {
 	assert.Empty(t, rootRef.ParentRef)
 }
 
+func (suite *CorsoProgressUnitSuite) TestFinishedFileBaseItemDoesntBuildHierarchy() {
+	t := suite.T()
+
+	prevPath := makePath(
+		suite.T(),
+		[]string{testTenant, service, testUser, category, testInboxDir, testFileName2},
+		true,
+	)
+	expectedToMerge := map[string]path.Path{
+		prevPath.ShortRef(): suite.targetFilePath,
+	}
+
+	// Setup stuff.
+	bd := &details.Builder{}
+	cp := corsoProgress{
+		UploadProgress: &snapshotfs.NullUploadProgress{},
+		deets:          bd,
+		pending:        map[string]*itemDetails{},
+		toMerge:        map[string]path.Path{},
+	}
+
+	deets := &itemDetails{
+		info:     nil,
+		repoPath: suite.targetFilePath,
+		prevPath: prevPath,
+	}
+	cp.put(suite.targetFileName, deets)
+	require.Len(t, cp.pending, 1)
+
+	cp.FinishedFile(suite.targetFileName, nil)
+
+	assert.Equal(t, expectedToMerge, cp.toMerge)
+	assert.Empty(t, cp.deets)
+}
+
 func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() {
 	for _, test := range finishedFileTable {
 		suite.T().Run(test.name, func(t *testing.T) {
@@ -552,6 +591,7 @@ func (suite *HierarchyBuilderUnitSuite) SetupSuite() {
 	suite.testPath = makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testInboxDir},
+		false,
 	)
 }
@@ -572,7 +612,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree() {
 	user2 := "user2"
 	user2Encoded := encodeAsPath(user2)
 
-	p2 := makePath(t, []string{tenant, service, user2, category, testInboxDir})
+	p2 := makePath(t, []string{tenant, service, user2, category, testInboxDir}, false)
 
 	// Encode user names here so we don't have to decode things later.
 	expectedFileCount := map[string]int{
@@ -644,7 +684,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_MixedDirectory()
 	subdir := "subfolder"
 
-	p2 := makePath(suite.T(), append(suite.testPath.Elements(), subdir))
+	p2 := makePath(suite.T(), append(suite.testPath.Elements(), subdir), false)
 
 	// Test multiple orders of items because right now order can matter. Both
 	// orders result in a directory structure like:
@@ -739,6 +779,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_Fails() {
 	p2 := makePath(
 		suite.T(),
 		[]string{"tenant2", service, "user2", category, testInboxDir},
+		false,
 	)
 
 	table := []struct {
@@ -820,10 +861,12 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeErrors() {
 	dirPath := makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testInboxDir},
+		false,
 	)
 	dirPath2 := makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testArchiveDir},
+		false,
 	)
 
 	table := []struct {
@@ -898,10 +941,12 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSingleSubtree() {
 	dirPath := makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testInboxDir},
+		false,
 	)
 	dirPath2 := makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testArchiveDir},
+		false,
 	)
 
 	// Must be a function that returns a new instance each time as StreamingFile
// Must be a function that returns a new instance each time as StreamingFile
@@ -1142,11 +1187,13 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 	inboxPath := makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testInboxDir},
+		false,
 	)
 	personalPath := makePath(
 		suite.T(),
 		append(inboxPath.Elements(), personalDir),
+		false,
 	)
 	personalFileName1 := testFileName
 	personalFileName2 := testFileName2
@@ -1154,6 +1201,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 	workPath := makePath(
 		suite.T(),
 		append(inboxPath.Elements(), workDir),
+		false,
 	)
 	workFileName := testFileName3
@@ -1224,6 +1272,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 			newPath := makePath(
 				t,
 				[]string{testTenant, service, testUser, category, testInboxDir + "2"},
+				false,
 			)
 
 			mc := mockconnector.NewMockExchangeCollection(newPath, 0)
@@ -1276,10 +1325,12 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 			newInboxPath := makePath(
 				t,
 				[]string{testTenant, service, testUser, category, testInboxDir + "2"},
+				false,
 			)
 			newWorkPath := makePath(
 				t,
 				[]string{testTenant, service, testUser, category, workDir},
+				false,
 			)
 
 			inbox := mockconnector.NewMockExchangeCollection(newInboxPath, 0)
@@ -1336,6 +1387,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 			newWorkPath := makePath(
 				t,
 				[]string{testTenant, service, testUser, category, workDir},
+				false,
 			)
 
 			inbox := mockconnector.NewMockExchangeCollection(inboxPath, 0)
@@ -1411,6 +1463,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 			newPersonalPath := makePath(
 				t,
 				[]string{testTenant, service, testUser, category, personalDir},
+				false,
 			)
 
 			personal := mockconnector.NewMockExchangeCollection(newPersonalPath, 0)
@@ -1464,6 +1517,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeMultipleSubdirecto
 			newPersonalPath := makePath(
 				t,
 				[]string{testTenant, service, testUser, category, workDir},
+				false,
 			)
 
 			personal := mockconnector.NewMockExchangeCollection(newPersonalPath, 2)
@@ -1729,6 +1783,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTreeSelectsCorrectSubt
 	inboxPath := makePath(
 		suite.T(),
 		[]string{testTenant, service, testUser, category, testInboxDir},
+		false,
 	)
 	inboxFileName1 := testFileName

---

@@ -122,28 +122,29 @@ func (w Wrapper) BackupCollections(
 	service path.ServiceType,
 	oc *OwnersCats,
 	tags map[string]string,
-) (*BackupStats, *details.Details, error) {
+) (*BackupStats, *details.Details, map[string]path.Path, error) {
 	if w.c == nil {
-		return nil, nil, errNotConnected
+		return nil, nil, nil, errNotConnected
 	}
 
 	ctx, end := D.Span(ctx, "kopia:backupCollections")
 	defer end()
 
 	if len(collections) == 0 {
-		return &BackupStats{}, (&details.Builder{}).Details(), nil
+		return &BackupStats{}, (&details.Builder{}).Details(), nil, nil
 	}
 
 	progress := &corsoProgress{
 		pending: map[string]*itemDetails{},
 		deets:   &details.Builder{},
+		toMerge: map[string]path.Path{},
 	}
 
 	// TODO(ashmrtn): Pass previousSnapshots here to enable building the directory
 	// hierarchy with them.
 	dirTree, err := inflateDirTree(ctx, w.c, nil, collections, progress)
 	if err != nil {
-		return nil, nil, errors.Wrap(err, "building kopia directories")
+		return nil, nil, nil, errors.Wrap(err, "building kopia directories")
 	}
 
 	s, err := w.makeSnapshotWithRoot(
@@ -155,10 +156,10 @@
 		progress,
 	)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, nil, err
 	}
 
-	return s, progress.deets.Details(), nil
+	return s, progress.deets.Details(), progress.toMerge, nil
 }
 
 func (w Wrapper) makeSnapshotWithRoot(

---

@@ -265,7 +265,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
 	for _, test := range table {
 		suite.T().Run(test.name, func(t *testing.T) {
-			stats, deets, err := suite.w.BackupCollections(
+			stats, deets, _, err := suite.w.BackupCollections(
 				suite.ctx,
 				prevSnaps,
 				collections,
@@ -346,7 +346,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
 	fp2, err := suite.testPath2.Append(dc2.Names[0], true)
 	require.NoError(t, err)
 
-	stats, _, err := w.BackupCollections(
+	stats, _, _, err := w.BackupCollections(
 		ctx,
 		nil,
 		[]data.Collection{dc1, dc2},
@@ -428,7 +428,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
 		},
 	}
 
-	stats, deets, err := suite.w.BackupCollections(
+	stats, deets, _, err := suite.w.BackupCollections(
 		suite.ctx,
 		nil,
 		collections,
@@ -473,7 +473,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections()
 	ctx, flush := tester.NewContext()
 	defer flush()
 
-	s, d, err := suite.w.BackupCollections(
+	s, d, _, err := suite.w.BackupCollections(
 		ctx,
 		nil,
 		test.collections,
@@ -634,7 +634,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
 		},
 	}
 
-	stats, deets, err := suite.w.BackupCollections(
+	stats, deets, _, err := suite.w.BackupCollections(
 		suite.ctx,
 		nil,
 		collections,

---

@@ -149,7 +149,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		return opStats.readErr
 	}
 
-	opStats.k, backupDetails, err = consumeBackupDataCollections(
+	opStats.k, backupDetails, _, err = consumeBackupDataCollections(
 		ctx,
 		op.kopia,
 		tenantID,
@@ -363,7 +363,7 @@ type backuper interface {
 		service path.ServiceType,
 		oc *kopia.OwnersCats,
 		tags map[string]string,
-	) (*kopia.BackupStats, *details.Details, error)
+	) (*kopia.BackupStats, *details.Details, map[string]path.Path, error)
 }
 
 // calls kopia to backup the collections of data
@@ -376,7 +376,7 @@ func consumeBackupDataCollections(
 	mans []*kopia.ManifestEntry,
 	cs []data.Collection,
 	backupID model.StableID,
-) (*kopia.BackupStats, *details.Details, error) {
+) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) {
 	complete, closer := observe.MessageWithCompletion("Backing up data:")
 	defer func() {
 		complete <- struct{}{}
@@ -397,7 +397,7 @@
 	for _, reason := range m.Reasons {
 		pb, err := builderFromReason(tenantID, reason)
 		if err != nil {
-			return nil, nil, errors.Wrap(err, "getting subtree paths for bases")
+			return nil, nil, nil, errors.Wrap(err, "getting subtree paths for bases")
 		}
 
 		paths = append(paths, pb)

---

@@ -296,12 +296,12 @@ func (mbu mockBackuper) BackupCollections(
 	service path.ServiceType,
 	oc *kopia.OwnersCats,
 	tags map[string]string,
-) (*kopia.BackupStats, *details.Details, error) {
+) (*kopia.BackupStats, *details.Details, map[string]path.Path, error) {
 	if mbu.checkFunc != nil {
 		mbu.checkFunc(bases, cs, service, oc, tags)
 	}
 
-	return &kopia.BackupStats{}, &details.Details{}, nil
+	return &kopia.BackupStats{}, &details.Details{}, nil, nil
 }
 
 func (suite *BackupOpSuite) TestBackupOperation_ConsumeBackupDataCollections_Paths() {

---

@@ -73,7 +73,14 @@ func (ss *streamStore) WriteBackupDetails(
 		},
 	}
 
-	backupStats, _, err := ss.kw.BackupCollections(ctx, nil, []data.Collection{dc}, ss.service, nil, nil)
+	backupStats, _, _, err := ss.kw.BackupCollections(
+		ctx,
+		nil,
+		[]data.Collection{dc},
+		ss.service,
+		nil,
+		nil,
+	)
 	if err != nil {
 		return "", nil
 	}