diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go
index fe6cf3f51..a06851543 100644
--- a/src/internal/kopia/upload.go
+++ b/src/internal/kopia/upload.go
@@ -437,7 +437,9 @@ func collectionEntries(
 		if err != nil {
 			// Kopia's uploader swallows errors in most cases, so if we see
 			// something here it's probably a big issue and we should return.
-			return seen, clues.Wrap(err, "executing callback").WithClues(ctx).With("item_path", itemPath)
+			return seen, clues.Wrap(err, "executing callback").
+				WithClues(ctx).
+				With("item_path", itemPath)
 		}
 	}
 }
@@ -446,16 +448,13 @@ func collectionEntries(
 func streamBaseEntries(
 	ctx context.Context,
 	ctr func(context.Context, fs.Entry) error,
-	curPath path.Path,
-	prevPath path.Path,
+	params snapshotParams,
 	locationPath *path.Builder,
-	dir fs.Directory,
 	encodedSeen map[string]struct{},
 	globalExcludeSet prefixmatcher.StringSetReader,
-	streamDirEnts bool,
 	progress *corsoProgress,
 ) error {
-	if dir == nil {
+	if params.baseDir == nil {
 		return nil
 	}
 
@@ -465,124 +464,134 @@ func streamBaseEntries(
 	)
 
 	if globalExcludeSet != nil {
-		longest, excludeSet, _ = globalExcludeSet.LongestPrefix(curPath.String())
+		longest, excludeSet, _ = globalExcludeSet.LongestPrefix(
+			params.currentPath.String())
 	}
 
 	ctx = clues.Add(
 		ctx,
-		"current_directory_path", curPath,
+		"current_directory_path", params.currentPath,
 		"longest_prefix", path.LoggableDir(longest))
 
-	err := dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
-		if err := innerCtx.Err(); err != nil {
-			return clues.Stack(err).WithClues(ctx)
-		}
+	err := params.baseDir.IterateEntries(
+		ctx,
+		func(innerCtx context.Context, entry fs.Entry) error {
+			if err := innerCtx.Err(); err != nil {
+				return clues.Stack(err).WithClues(ctx)
+			}
 
-		entName, err := decodeElement(entry.Name())
-		if err != nil {
-			return clues.Wrap(err, "decoding entry name").
-				WithClues(ctx).
-				With("entry_name", entry.Name())
-		}
+			entName, err := decodeElement(entry.Name())
+			if err != nil {
+				return clues.Wrap(err, "decoding entry name").
+					WithClues(ctx).
+					With("entry_name", entry.Name())
+			}
 
-		if d, ok := entry.(fs.Directory); ok {
-			// Don't walk subdirectories in this function.
-			if !streamDirEnts {
+			if d, ok := entry.(fs.Directory); ok {
+				// Don't walk subdirectories in this function.
+				if !params.streamBaseEnts {
+					return nil
+				}
+
+				// Do walk subdirectories. The previous and current path of the
+				// directory can be generated by appending the directory name onto the
+				// previous and current path of this directory. Since the directory has
+				// no BackupCollection associated with it (part of the criteria for
+				// allowing walking directories in this function) there shouldn't be any
+				// LocationPath information associated with the directory.
+				newP, err := params.currentPath.Append(false, entName)
+				if err != nil {
+					return clues.Wrap(err, "getting current directory path").
+						WithClues(ctx)
+				}
+
+				oldP, err := params.prevPath.Append(false, entName)
+				if err != nil {
+					return clues.Wrap(err, "getting previous directory path").
+						WithClues(ctx)
+				}
+
+				e := virtualfs.NewStreamingDirectory(
+					entry.Name(),
+					getStreamItemFunc(
+						snapshotParams{
+							currentPath:    newP,
+							prevPath:       oldP,
+							collection:     nil,
+							baseDir:        d,
+							streamBaseEnts: params.streamBaseEnts,
+						},
+						nil,
+						globalExcludeSet,
+						progress))
+
+				return clues.Wrap(ctr(ctx, e), "executing callback on subdirectory").
+					WithClues(ctx).
+					With("directory_path", newP).
+					OrNil()
+			}
+
+			// This entry was either updated or deleted. In either case, the external
+			// service notified us about it and it's already been handled so we should
+			// skip it here.
+			if _, ok := encodedSeen[entry.Name()]; ok {
 				return nil
 			}
 
-			// Do walk subdirectories. The previous and current path of the directory
-			// can be generated by appending the directory name onto the previous and
-			// current path of this directory. Since the directory has no
-			// BackupCollection associated with it (part of the criteria for allowing
-			// walking directories in this function) there shouldn't be any
-			// LocationPath information associated with the directory.
-			newP, err := curPath.Append(false, entName)
+			// This entry was marked as deleted by a service that can't tell us the
+			// previous path of deleted items, only the item ID.
+			if _, ok := excludeSet[entName]; ok {
+				return nil
+			}
+
+			// For now assuming that item IDs don't need escaping.
+			itemPath, err := params.currentPath.AppendItem(entName)
 			if err != nil {
-				return clues.Wrap(err, "getting current directory path").WithClues(ctx)
+				return clues.Wrap(err, "getting full item path for base entry").
+					WithClues(ctx)
 			}
 
-			oldP, err := prevPath.Append(false, entName)
+			// We need the previous path so we can find this item in the base snapshot's
+			// backup details. If the item moved and we had only the new path, we'd be
+			// unable to find it in the old backup details because we wouldn't know what
+			// to look for.
+			prevItemPath, err := params.prevPath.AppendItem(entName)
 			if err != nil {
-				return clues.Wrap(err, "getting previous directory path").WithClues(ctx)
+				return clues.Wrap(err, "getting previous full item path for base entry").
+					WithClues(ctx)
 			}
 
-			e := virtualfs.NewStreamingDirectory(
-				entry.Name(),
-				getStreamItemFunc(
-					newP,
-					oldP,
-					nil,
-					nil,
-					d,
-					globalExcludeSet,
-					streamDirEnts,
-					progress))
-
-			return clues.Wrap(ctr(ctx, e), "executing callback on subdirectory").
-				WithClues(ctx).
-				With("directory_path", newP).
-				OrNil()
-		}
-
-		// This entry was either updated or deleted. In either case, the external
-		// service notified us about it and it's already been handled so we should
-		// skip it here.
-		if _, ok := encodedSeen[entry.Name()]; ok {
-			return nil
-		}
-
-		// This entry was marked as deleted by a service that can't tell us the
-		// previous path of deleted items, only the item ID.
-		if _, ok := excludeSet[entName]; ok {
-			return nil
-		}
-
-		// For now assuming that item IDs don't need escaping.
-		itemPath, err := curPath.AppendItem(entName)
-		if err != nil {
-			return clues.Wrap(err, "getting full item path for base entry").WithClues(ctx)
-		}
-
-		// We need the previous path so we can find this item in the base snapshot's
-		// backup details. If the item moved and we had only the new path, we'd be
-		// unable to find it in the old backup details because we wouldn't know what
-		// to look for.
-		prevItemPath, err := prevPath.AppendItem(entName)
-		if err != nil {
-			return clues.Wrap(err, "getting previous full item path for base entry").WithClues(ctx)
-		}
-
-		// Meta files aren't in backup details since it's the set of items the user
-		// sees.
-		//
-		// TODO(ashmrtn): We may eventually want to make this a function that is
-		// passed in so that we can more easily switch it between different external
-		// service provider implementations.
-		if !metadata.IsMetadataFile(itemPath) {
-			// All items have item info in the base backup. However, we need to make
-			// sure we have enough metadata to find those entries. To do that we add
-			// the item to progress and having progress aggregate everything for
-			// later.
-			d := &itemDetails{
-				repoPath:     itemPath,
-				prevPath:     prevItemPath,
-				locationPath: locationPath,
-				modTime:      ptr.To(entry.ModTime()),
+			// Meta files aren't in backup details since it's the set of items the
+			// user sees.
+			//
+			// TODO(ashmrtn): We may eventually want to make this a function that is
+			// passed in so that we can more easily switch it between different
+			// external service provider implementations.
+			if !metadata.IsMetadataFile(itemPath) {
+				// All items have item info in the base backup. However, we need to make
+				// sure we have enough metadata to find those entries. To do that we add
+				// the item to progress and have progress aggregate everything for
+				// later.
+				d := &itemDetails{
+					repoPath:     itemPath,
+					prevPath:     prevItemPath,
+					locationPath: locationPath,
+					modTime:      ptr.To(entry.ModTime()),
+				}
+				progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
 			}
-			progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
-		}
 
-		if err := ctr(ctx, entry); err != nil {
-			return clues.Wrap(err, "executing callback on item").
-				WithClues(ctx).
-				With("item_path", itemPath)
-		}
+			if err := ctr(ctx, entry); err != nil {
+				return clues.Wrap(err, "executing callback on item").
+					WithClues(ctx).
+					With("item_path", itemPath)
+			}
 
-		return nil
-	})
+			return nil
+		})
 	if err != nil {
-		return clues.Wrap(err, "traversing items in base snapshot directory").WithClues(ctx)
+		return clues.Wrap(err, "traversing items in base snapshot directory").
+			WithClues(ctx)
 	}
 
 	return nil
@@ -593,13 +602,9 @@ func streamBaseEntries(
 // kopia callbacks on directory entries. It binds the directory to the given
 // DataCollection.
 func getStreamItemFunc(
-	curPath path.Path,
-	prevPath path.Path,
+	params snapshotParams,
 	staticEnts []fs.Entry,
-	streamedEnts data.BackupCollection,
-	baseDir fs.Directory,
 	globalExcludeSet prefixmatcher.StringSetReader,
-	streamDirEnts bool,
 	progress *corsoProgress,
 ) func(context.Context, func(context.Context, fs.Entry) error) error {
 	return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error {
@@ -609,17 +614,18 @@ func getStreamItemFunc(
 		// Return static entries in this directory first.
 		for _, d := range staticEnts {
 			if err := ctr(ctx, d); err != nil {
-				return clues.Wrap(err, "executing callback on static directory").WithClues(ctx)
+				return clues.Wrap(err, "executing callback on static directory").
+					WithClues(ctx)
 			}
 		}
 
 		var locationPath *path.Builder
 
-		if lp, ok := streamedEnts.(data.LocationPather); ok {
+		if lp, ok := params.collection.(data.LocationPather); ok {
 			locationPath = lp.LocationPath()
 		}
 
-		seen, err := collectionEntries(ctx, ctr, streamedEnts, progress)
+		seen, err := collectionEntries(ctx, ctr, params.collection, progress)
 		if err != nil {
 			return clues.Wrap(err, "streaming collection entries")
 		}
@@ -627,13 +633,10 @@ func getStreamItemFunc(
 		if err := streamBaseEntries(
 			ctx,
 			ctr,
-			curPath,
-			prevPath,
+			params,
 			locationPath,
-			baseDir,
 			seen,
 			globalExcludeSet,
-			streamDirEnts,
 			progress); err != nil {
 			return clues.Wrap(err, "streaming base snapshot entries")
 		}
@@ -664,7 +667,11 @@ func buildKopiaDirs(
 	// backup's backup details
 
 	for childName, childDir := range dir.childDirs {
-		child, err := buildKopiaDirs(childName, childDir, globalExcludeSet, progress)
+		child, err := buildKopiaDirs(
+			childName,
+			childDir,
+			globalExcludeSet,
+			progress)
 		if err != nil {
 			return nil, err
 		}
@@ -675,17 +682,13 @@ func buildKopiaDirs(
 	return virtualfs.NewStreamingDirectory(
 		encodeAsPath(dirName),
 		getStreamItemFunc(
-			dir.currentPath,
-			dir.prevPath,
+			dir.snapshotParams,
 			childDirs,
-			dir.collection,
-			dir.baseDir,
 			globalExcludeSet,
-			!dir.subtreeChanged,
 			progress)), nil
 }
 
-type treeMap struct {
+type snapshotParams struct {
 	// path.Path representing the node's path. This is passed as a parameter to
 	// the stream item function so that even baseDir directories can properly
 	// generate the full path of items.
@@ -694,8 +697,6 @@ type treeMap struct {
 	// base snapshot.
 	prevPath path.Path
 
-	// Child directories of this directory.
-	childDirs map[string]*treeMap
 	// Reference to data pulled from the external service. Contains only items in
 	// this directory. Does not contain references to subdirectories.
 	collection data.BackupCollection
@@ -711,7 +712,13 @@ type treeMap struct {
 	// moved, renamed, deleted, or added. If not then we should return both the
 	// kopia files and the kopia directories in the base entry because we're also
 	// doing selective subtree pruning during hierarchy merging.
-	subtreeChanged bool
+	streamBaseEnts bool
+}
+
+type treeMap struct {
+	snapshotParams
+	// Child directories of this directory.
+	childDirs map[string]*treeMap
 }
 
 func newTreeMap() *treeMap {
@@ -827,7 +834,8 @@ func inflateCollectionTree(
 		switch s.State() {
 		case data.DeletedState:
 			if s.PreviousPath() == nil {
-				return nil, nil, clues.New("nil previous path on deleted collection").WithClues(ictx)
+				return nil, nil, clues.New("nil previous path on deleted collection").
+					WithClues(ictx)
 			}
 
 			changedPaths = append(changedPaths, s.PreviousPath())
@@ -857,7 +865,8 @@ func inflateCollectionTree(
 			// changed via one of the ancestor folders being moved. This catches the
 			// ancestor folder move.
 			if err := addMergeLocation(s, toMerge); err != nil {
-				return nil, nil, clues.Wrap(err, "adding merge location").WithClues(ictx)
+				return nil, nil, clues.Wrap(err, "adding merge location").
+					WithClues(ictx)
 			}
 		case data.NotMovedState:
 			p := s.PreviousPath().String()
@@ -881,7 +890,8 @@ func inflateCollectionTree(
 		// Make sure there's only a single collection adding items for any given
 		// path in the new hierarchy.
 		if node.collection != nil {
-			return nil, nil, clues.New("multiple instances of collection").WithClues(ictx)
+			return nil, nil, clues.New("multiple instances of collection").
+				WithClues(ictx)
 		}
 
 		node.collection = s
@@ -1118,7 +1128,7 @@ func traverseBaseDir(
 		node.baseDir = dir
 		node.currentPath = curP
 		node.prevPath = oldP
-		node.subtreeChanged = changed
+		node.streamBaseEnts = !changed
 	}
 
 	return nil
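
For reference, the change above is a standard parameter-object refactor: the five values that previously traveled as separate arguments (curPath, prevPath, the collection, baseDir, and the streamDirEnts flag) are bundled into the new snapshotParams struct, which treeMap now embeds. Below is a minimal, self-contained sketch of the pattern; the struct and field names come from the diff, while Path, Directory, and BackupCollection are hypothetical stand-ins for the repo's path.Path, fs.Directory, and data.BackupCollection types.

package main

import "fmt"

// Hypothetical stand-ins for the repo's path.Path, fs.Directory, and
// data.BackupCollection; for illustration only.
type (
	Path             string
	Directory        struct{ name string }
	BackupCollection struct{ id string }
)

// snapshotParams bundles the per-directory state that was previously passed
// as five positional arguments (field names as in the diff).
type snapshotParams struct {
	currentPath    Path
	prevPath       Path
	collection     *BackupCollection
	baseDir        *Directory
	streamBaseEnts bool
}

// treeMap embeds snapshotParams, so existing accesses such as
// node.currentPath keep compiling while the params move as one unit.
type treeMap struct {
	snapshotParams
	childDirs map[string]*treeMap
}

// streamBaseEntries now takes one struct instead of five arguments; adding a
// field later no longer touches every signature on the call path.
func streamBaseEntries(params snapshotParams) {
	if params.baseDir == nil {
		return
	}

	fmt.Printf("streaming %s (prev %s, walk subdirs: %v)\n",
		params.currentPath, params.prevPath, params.streamBaseEnts)
}

func main() {
	node := &treeMap{
		snapshotParams: snapshotParams{
			currentPath:    "tenant/user/folder",
			prevPath:       "tenant/user/old-folder",
			baseDir:        &Directory{name: "folder"},
			streamBaseEnts: true,
		},
	}

	// The embedded struct is handed through whole, mirroring how
	// buildKopiaDirs passes dir.snapshotParams to getStreamItemFunc.
	streamBaseEntries(node.snapshotParams)
}

Note also the inversion at the end of the diff: treeMap previously stored subtreeChanged and the caller negated it (!dir.subtreeChanged); storing streamBaseEnts = !changed moves that negation to the single assignment site in traverseBaseDir.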