Track more info about collections during backup (#1765)

## Description

Track additional information about collections and their items during backup so that, when doing incremental backups, we can properly merge directories and the items within them.

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [x] 🐹 Trivial/Minor

## Issue(s)

* #1740 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2022-12-09 15:55:45 -08:00 committed by GitHub
parent 21c717fbe8
commit be9b214c0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -195,15 +195,16 @@ func collectionEntries(
cb func(context.Context, fs.Entry) error, cb func(context.Context, fs.Entry) error,
streamedEnts data.Collection, streamedEnts data.Collection,
progress *corsoProgress, progress *corsoProgress,
) *multierror.Error { ) (map[string]struct{}, *multierror.Error) {
if streamedEnts == nil { if streamedEnts == nil {
return nil return nil, nil
} }
var ( var (
errs *multierror.Error errs *multierror.Error
// Track which items have already been seen so we can skip them if we see // Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot. // them again in the data from the base snapshot.
seen = map[string]struct{}{}
items = streamedEnts.Items() items = streamedEnts.Items()
log = logger.Ctx(ctx) log = logger.Ctx(ctx)
) )
@ -212,13 +213,28 @@ func collectionEntries(
select { select {
case <-ctx.Done(): case <-ctx.Done():
errs = multierror.Append(errs, ctx.Err()) errs = multierror.Append(errs, ctx.Err())
return errs return seen, errs
case e, ok := <-items: case e, ok := <-items:
if !ok { if !ok {
return errs return seen, errs
} }
// Even if this item has been deleted and should not appear at all in
// the new snapshot we need to record that we've seen it here so we know
// to skip it if it's also present in the base snapshot.
//
// TODO(ashmrtn): Determine if we want to try to use the old version of
// the data (if it exists in the base) if we fail uploading the new
// version. If so, we should split this call into where we check for the
// item being deleted and then again after we do the kopia callback.
//
// TODO(ashmrtn): With a little more info, we could reduce the number of
// items we need to track. Namely, we can track the created time of the
// item and if it's after the base snapshot was finalized we can skip it
// because it's not possible for the base snapshot to contain that item.
seen[e.UUID()] = struct{}{}
// For now assuming that item IDs don't need escaping. // For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true) itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true)
if err != nil { if err != nil {
@ -231,7 +247,11 @@ func collectionEntries(
} }
log.Debugw("reading item", "path", itemPath.String()) log.Debugw("reading item", "path", itemPath.String())
trace.Log(ctx, "kopia:collectionEntries:item", itemPath.String()) trace.Log(ctx, "kopia:streamEntries:item", itemPath.String())
if e.Deleted() {
continue
}
// Not all items implement StreamInfo. For example, the metadata files // Not all items implement StreamInfo. For example, the metadata files
// do not because they don't contain information directly backed up or // do not because they don't contain information directly backed up or
@ -264,7 +284,7 @@ func collectionEntries(
// Kopia's uploader swallows errors in most cases, so if we see // Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return. // something here it's probably a big issue and we should return.
errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath)) errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath))
return errs return seen, errs
} }
} }
} }
@ -290,7 +310,7 @@ func getStreamItemFunc(
} }
} }
errs := collectionEntries(ctx, cb, streamedEnts, progress) _, errs := collectionEntries(ctx, cb, streamedEnts, progress)
// TODO(ashmrtn): Stream entries from base snapshot if they exist. // TODO(ashmrtn): Stream entries from base snapshot if they exist.
@ -370,21 +390,34 @@ func getTreeNode(roots map[string]*treeMap, pathElements []string) *treeMap {
func inflateCollectionTree( func inflateCollectionTree(
ctx context.Context, ctx context.Context,
collections []data.Collection, collections []data.Collection,
) (map[string]*treeMap, *OwnersCats, error) { ) (map[string]*treeMap, map[string]path.Path, *OwnersCats, error) {
roots := make(map[string]*treeMap) roots := make(map[string]*treeMap)
// Contains the old path for collections that have been moved or renamed.
// Allows resolving what the new path should be when walking the base
// snapshot(s)'s hierarchy. Nil represents a collection that was deleted.
updatedPaths := make(map[string]path.Path)
ownerCats := &OwnersCats{ ownerCats := &OwnersCats{
ResourceOwners: make(map[string]struct{}), ResourceOwners: make(map[string]struct{}),
ServiceCats: make(map[string]struct{}), ServiceCats: make(map[string]struct{}),
} }
for _, s := range collections { for _, s := range collections {
switch s.State() {
case data.DeletedState:
updatedPaths[s.PreviousPath().String()] = nil
continue
case data.MovedState:
updatedPaths[s.PreviousPath().String()] = s.FullPath()
}
if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 { if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 {
return nil, nil, errors.New("no identifier for collection") return nil, nil, nil, errors.New("no identifier for collection")
} }
node := getTreeNode(roots, s.FullPath().Elements()) node := getTreeNode(roots, s.FullPath().Elements())
if node == nil { if node == nil {
return nil, nil, errors.Errorf( return nil, nil, nil, errors.Errorf(
"unable to get tree node for path %s", "unable to get tree node for path %s",
s.FullPath(), s.FullPath(),
) )
@ -397,7 +430,7 @@ func inflateCollectionTree(
node.collection = s node.collection = s
} }
return roots, ownerCats, nil return roots, updatedPaths, ownerCats, nil
} }
// inflateDirTree returns a set of tags representing all the resource owners and // inflateDirTree returns a set of tags representing all the resource owners and
@ -411,7 +444,7 @@ func inflateDirTree(
collections []data.Collection, collections []data.Collection,
progress *corsoProgress, progress *corsoProgress,
) (fs.Directory, *OwnersCats, error) { ) (fs.Directory, *OwnersCats, error) {
roots, ownerCats, err := inflateCollectionTree(ctx, collections) roots, _, ownerCats, err := inflateCollectionTree(ctx, collections)
if err != nil { if err != nil {
return nil, nil, errors.Wrap(err, "inflating collection tree") return nil, nil, errors.Wrap(err, "inflating collection tree")
} }