Merge items in a single directory during snapshot (#1797)
## Description Create and wire up a helper function that merges items in the base snapshot with items that have been seen from the collection Will not change the output of execution at all since the base snapshot directory will always be nil at the moment Will require further modifications to properly handle deleted items where only a single delta endpoint is used to fetch changes (e.x. OneDrive file deletions) ## Type of change - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) * #1740 ## Test Plan - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
c1a0b74d22
commit
0fbe65a304
@ -234,6 +234,8 @@ func collectionEntries(
|
||||
return seen, errs
|
||||
}
|
||||
|
||||
encodedName := encodeAsPath(e.UUID())
|
||||
|
||||
// Even if this item has been deleted and should not appear at all in
|
||||
// the new snapshot we need to record that we've seen it here so we know
|
||||
// to skip it if it's also present in the base snapshot.
|
||||
@ -247,7 +249,7 @@ func collectionEntries(
|
||||
// items we need to track. Namely, we can track the created time of the
|
||||
// item and if it's after the base snapshot was finalized we can skip it
|
||||
// because it's not possible for the base snapshot to contain that item.
|
||||
seen[e.UUID()] = struct{}{}
|
||||
seen[encodedName] = struct{}{}
|
||||
|
||||
// For now assuming that item IDs don't need escaping.
|
||||
itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true)
|
||||
@ -287,7 +289,7 @@ func collectionEntries(
|
||||
}
|
||||
|
||||
entry := virtualfs.StreamingFileWithModTimeFromReader(
|
||||
encodeAsPath(e.UUID()),
|
||||
encodedName,
|
||||
modTime,
|
||||
newBackupStreamReader(serializationVersion, e.ToReader()),
|
||||
)
|
||||
@ -301,6 +303,61 @@ func collectionEntries(
|
||||
}
|
||||
}
|
||||
|
||||
func streamBaseEntries(
|
||||
ctx context.Context,
|
||||
cb func(context.Context, fs.Entry) error,
|
||||
dir fs.Directory,
|
||||
encodedSeen map[string]struct{},
|
||||
progress *corsoProgress,
|
||||
) error {
|
||||
if dir == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
|
||||
if err := innerCtx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Don't walk subdirectories in this function.
|
||||
if _, ok := entry.(fs.Directory); ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// This entry was either updated or deleted. In either case, the external
|
||||
// service notified us about it and it's already been handled so we should
|
||||
// skip it here.
|
||||
if _, ok := encodedSeen[entry.Name()]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := cb(ctx, entry); err != nil {
|
||||
entName, err := decodeElement(entry.Name())
|
||||
if err != nil {
|
||||
entName = entry.Name()
|
||||
}
|
||||
|
||||
return errors.Wrapf(err, "executing callback on item %q", entName)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
name, err := decodeElement(dir.Name())
|
||||
if err != nil {
|
||||
name = dir.Name()
|
||||
}
|
||||
|
||||
return errors.Wrapf(
|
||||
err,
|
||||
"traversing items in base snapshot directory %q",
|
||||
name,
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getStreamItemFunc returns a function that can be used by kopia's
|
||||
// virtualfs.StreamingDirectory to iterate through directory entries and call
|
||||
// kopia callbacks on directory entries. It binds the directory to the given
|
||||
@ -308,6 +365,7 @@ func collectionEntries(
|
||||
func getStreamItemFunc(
|
||||
staticEnts []fs.Entry,
|
||||
streamedEnts data.Collection,
|
||||
baseDir fs.Directory,
|
||||
progress *corsoProgress,
|
||||
) func(context.Context, func(context.Context, fs.Entry) error) error {
|
||||
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||
@ -321,9 +379,14 @@ func getStreamItemFunc(
|
||||
}
|
||||
}
|
||||
|
||||
_, errs := collectionEntries(ctx, cb, streamedEnts, progress)
|
||||
seen, errs := collectionEntries(ctx, cb, streamedEnts, progress)
|
||||
|
||||
// TODO(ashmrtn): Stream entries from base snapshot if they exist.
|
||||
if err := streamBaseEntries(ctx, cb, baseDir, seen, progress); err != nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Wrap(err, "streaming base snapshot entries"),
|
||||
)
|
||||
}
|
||||
|
||||
return errs.ErrorOrNil()
|
||||
}
|
||||
@ -347,13 +410,23 @@ func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.D
|
||||
|
||||
return virtualfs.NewStreamingDirectory(
|
||||
encodeAsPath(dirName),
|
||||
getStreamItemFunc(childDirs, dir.collection, progress),
|
||||
getStreamItemFunc(childDirs, dir.collection, dir.baseDir, progress),
|
||||
), nil
|
||||
}
|
||||
|
||||
type treeMap struct {
|
||||
childDirs map[string]*treeMap
|
||||
// Child directories of this directory.
|
||||
childDirs map[string]*treeMap
|
||||
// Reference to data pulled from the external service. Contains only items in
|
||||
// this directory. Does not contain references to subdirectories.
|
||||
collection data.Collection
|
||||
// Reference to directory in base snapshot. The referenced directory itself
|
||||
// may contain files and subdirectories, but the subdirectories should
|
||||
// eventually be added when walking the base snapshot to build the hierarchy,
|
||||
// not when handing items to kopia for the new snapshot. Subdirectories should
|
||||
// be added to childDirs while building the hierarchy. They will be ignored
|
||||
// when iterating through the directory to hand items to kopia.
|
||||
baseDir fs.Directory
|
||||
}
|
||||
|
||||
func newTreeMap() *treeMap {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user