Snapshot function parameter refactor (#4143)

Pass struct to functions for snapshot upload
instead of a bunch of individual parameters.

Use embedding to remove the need to change
existing code that configures all the
parameters, only passing and polarity of
one variable is really changed

May be easier to view ignoring whitespace
changes

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
ashmrtn 2023-09-20 09:37:37 -07:00 committed by GitHub
parent 83815eee0e
commit 4758724393
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -437,7 +437,9 @@ func collectionEntries(
if err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
return seen, clues.Wrap(err, "executing callback").WithClues(ctx).With("item_path", itemPath)
return seen, clues.Wrap(err, "executing callback").
WithClues(ctx).
With("item_path", itemPath)
}
}
}
@ -446,16 +448,13 @@ func collectionEntries(
func streamBaseEntries(
ctx context.Context,
ctr func(context.Context, fs.Entry) error,
curPath path.Path,
prevPath path.Path,
params snapshotParams,
locationPath *path.Builder,
dir fs.Directory,
encodedSeen map[string]struct{},
globalExcludeSet prefixmatcher.StringSetReader,
streamDirEnts bool,
progress *corsoProgress,
) error {
if dir == nil {
if params.baseDir == nil {
return nil
}
@ -465,15 +464,18 @@ func streamBaseEntries(
)
if globalExcludeSet != nil {
longest, excludeSet, _ = globalExcludeSet.LongestPrefix(curPath.String())
longest, excludeSet, _ = globalExcludeSet.LongestPrefix(
params.currentPath.String())
}
ctx = clues.Add(
ctx,
"current_directory_path", curPath,
"current_directory_path", params.currentPath,
"longest_prefix", path.LoggableDir(longest))
err := dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
err := params.baseDir.IterateEntries(
ctx,
func(innerCtx context.Context, entry fs.Entry) error {
if err := innerCtx.Err(); err != nil {
return clues.Stack(err).WithClues(ctx)
}
@ -487,36 +489,40 @@ func streamBaseEntries(
if d, ok := entry.(fs.Directory); ok {
// Don't walk subdirectories in this function.
if !streamDirEnts {
if !params.streamBaseEnts {
return nil
}
// Do walk subdirectories. The previous and current path of the directory
// can be generated by appending the directory name onto the previous and
// current path of this directory. Since the directory has no
// BackupCollection associated with it (part of the criteria for allowing
// walking directories in this function) there shouldn't be any
// Do walk subdirectories. The previous and current path of the
// directory can be generated by appending the directory name onto the
// previous and current path of this directory. Since the directory has
// no BackupCollection associated with it (part of the criteria for
// allowing walking directories in this function) there shouldn't be any
// LocationPath information associated with the directory.
newP, err := curPath.Append(false, entName)
newP, err := params.currentPath.Append(false, entName)
if err != nil {
return clues.Wrap(err, "getting current directory path").WithClues(ctx)
return clues.Wrap(err, "getting current directory path").
WithClues(ctx)
}
oldP, err := prevPath.Append(false, entName)
oldP, err := params.prevPath.Append(false, entName)
if err != nil {
return clues.Wrap(err, "getting previous directory path").WithClues(ctx)
return clues.Wrap(err, "getting previous directory path").
WithClues(ctx)
}
e := virtualfs.NewStreamingDirectory(
entry.Name(),
getStreamItemFunc(
newP,
oldP,
snapshotParams{
currentPath: newP,
prevPath: oldP,
collection: nil,
baseDir: d,
streamBaseEnts: params.streamBaseEnts,
},
nil,
nil,
d,
globalExcludeSet,
streamDirEnts,
progress))
return clues.Wrap(ctr(ctx, e), "executing callback on subdirectory").
@ -539,26 +545,28 @@ func streamBaseEntries(
}
// For now assuming that item IDs don't need escaping.
itemPath, err := curPath.AppendItem(entName)
itemPath, err := params.currentPath.AppendItem(entName)
if err != nil {
return clues.Wrap(err, "getting full item path for base entry").WithClues(ctx)
return clues.Wrap(err, "getting full item path for base entry").
WithClues(ctx)
}
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := prevPath.AppendItem(entName)
prevItemPath, err := params.prevPath.AppendItem(entName)
if err != nil {
return clues.Wrap(err, "getting previous full item path for base entry").WithClues(ctx)
return clues.Wrap(err, "getting previous full item path for base entry").
WithClues(ctx)
}
// Meta files aren't in backup details since it's the set of items the user
// sees.
// Meta files aren't in backup details since it's the set of items the
// user sees.
//
// TODO(ashmrtn): We may eventually want to make this a function that is
// passed in so that we can more easily switch it between different external
// service provider implementations.
// passed in so that we can more easily switch it between different
// external service provider implementations.
if !metadata.IsMetadataFile(itemPath) {
// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add
@ -582,7 +590,8 @@ func streamBaseEntries(
return nil
})
if err != nil {
return clues.Wrap(err, "traversing items in base snapshot directory").WithClues(ctx)
return clues.Wrap(err, "traversing items in base snapshot directory").
WithClues(ctx)
}
return nil
@ -593,13 +602,9 @@ func streamBaseEntries(
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
curPath path.Path,
prevPath path.Path,
params snapshotParams,
staticEnts []fs.Entry,
streamedEnts data.BackupCollection,
baseDir fs.Directory,
globalExcludeSet prefixmatcher.StringSetReader,
streamDirEnts bool,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error {
@ -609,17 +614,18 @@ func getStreamItemFunc(
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := ctr(ctx, d); err != nil {
return clues.Wrap(err, "executing callback on static directory").WithClues(ctx)
return clues.Wrap(err, "executing callback on static directory").
WithClues(ctx)
}
}
var locationPath *path.Builder
if lp, ok := streamedEnts.(data.LocationPather); ok {
if lp, ok := params.collection.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
seen, err := collectionEntries(ctx, ctr, streamedEnts, progress)
seen, err := collectionEntries(ctx, ctr, params.collection, progress)
if err != nil {
return clues.Wrap(err, "streaming collection entries")
}
@ -627,13 +633,10 @@ func getStreamItemFunc(
if err := streamBaseEntries(
ctx,
ctr,
curPath,
prevPath,
params,
locationPath,
baseDir,
seen,
globalExcludeSet,
streamDirEnts,
progress); err != nil {
return clues.Wrap(err, "streaming base snapshot entries")
}
@ -664,7 +667,11 @@ func buildKopiaDirs(
// backup's backup details
for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(childName, childDir, globalExcludeSet, progress)
child, err := buildKopiaDirs(
childName,
childDir,
globalExcludeSet,
progress)
if err != nil {
return nil, err
}
@ -675,17 +682,13 @@ func buildKopiaDirs(
return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(
dir.currentPath,
dir.prevPath,
dir.snapshotParams,
childDirs,
dir.collection,
dir.baseDir,
globalExcludeSet,
!dir.subtreeChanged,
progress)), nil
}
type treeMap struct {
type snapshotParams struct {
// path.Path representing the node's path. This is passed as a parameter to
// the stream item function so that even baseDir directories can properly
// generate the full path of items.
@ -694,8 +697,6 @@ type treeMap struct {
// base snapshot.
prevPath path.Path
// Child directories of this directory.
childDirs map[string]*treeMap
// Reference to data pulled from the external service. Contains only items in
// this directory. Does not contain references to subdirectories.
collection data.BackupCollection
@ -711,7 +712,13 @@ type treeMap struct {
// moved, renamed, deleted, or added. If not then we should return both the
// kopia files and the kopia directories in the base entry because we're also
// doing selective subtree pruning during hierarchy merging.
subtreeChanged bool
streamBaseEnts bool
}
type treeMap struct {
snapshotParams
// Child directories of this directory.
childDirs map[string]*treeMap
}
func newTreeMap() *treeMap {
@ -827,7 +834,8 @@ func inflateCollectionTree(
switch s.State() {
case data.DeletedState:
if s.PreviousPath() == nil {
return nil, nil, clues.New("nil previous path on deleted collection").WithClues(ictx)
return nil, nil, clues.New("nil previous path on deleted collection").
WithClues(ictx)
}
changedPaths = append(changedPaths, s.PreviousPath())
@ -857,7 +865,8 @@ func inflateCollectionTree(
// changed via one of the ancestor folders being moved. This catches the
// ancestor folder move.
if err := addMergeLocation(s, toMerge); err != nil {
return nil, nil, clues.Wrap(err, "adding merge location").WithClues(ictx)
return nil, nil, clues.Wrap(err, "adding merge location").
WithClues(ictx)
}
case data.NotMovedState:
p := s.PreviousPath().String()
@ -881,7 +890,8 @@ func inflateCollectionTree(
// Make sure there's only a single collection adding items for any given
// path in the new hierarchy.
if node.collection != nil {
return nil, nil, clues.New("multiple instances of collection").WithClues(ictx)
return nil, nil, clues.New("multiple instances of collection").
WithClues(ictx)
}
node.collection = s
@ -1118,7 +1128,7 @@ func traverseBaseDir(
node.baseDir = dir
node.currentPath = curP
node.prevPath = oldP
node.subtreeChanged = changed
node.streamBaseEnts = !changed
}
return nil