diff --git a/src/go.mod b/src/go.mod index 36daf6735..bfceb840d 100644 --- a/src/go.mod +++ b/src/go.mod @@ -2,7 +2,7 @@ module github.com/alcionai/corso/src go 1.21 -replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 +replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20231128180219-e763f7c81f23 require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 diff --git a/src/go.sum b/src/go.sum index 5a3a0c7aa..fdf7382fc 100644 --- a/src/go.sum +++ b/src/go.sum @@ -17,8 +17,8 @@ github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpH github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/alcionai/clues v0.0.0-20231115004051-523cbddac8e8 h1:kdpkhcKWWxnZFteGjglHvFTTPCPsvjhKDDY9J1Od6Sg= github.com/alcionai/clues v0.0.0-20231115004051-523cbddac8e8/go.mod h1:hGnRqQtV7YoojQSNMtqFK0TvfcwAljGZhUEQwx1lw34= -github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 h1:6YjRGjEZr/Bmux1XkS13Re1m1LI7VAcbFsA3PiqO2BI= -github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3/go.mod h1:u5wAx1XN07PJsO1BLBkGicwSrbmAC1biONnumSCA210= +github.com/alcionai/kopia v0.12.2-0.20231128180219-e763f7c81f23 h1:ojlHvw65O21oFTC6kQldVDO69VQ6JEzMdD6aq9aOrY4= +github.com/alcionai/kopia v0.12.2-0.20231128180219-e763f7c81f23/go.mod h1:f4PligAuyEicX+lfTlZltc69nM0eMoXX2nE5sCBdo6Y= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -108,16 +108,16 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= -github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y= github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE= github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= -github.com/hanwen/go-fuse/v2 v2.3.0 h1:t5ivNIH2PK+zw4OBul/iJjsoG9K6kXo4nMDoBpciC8A= -github.com/hanwen/go-fuse/v2 v2.3.0/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs= +github.com/hanwen/go-fuse/v2 v2.4.0 h1:12OhD7CkXXQdvxG2osIdBQLdXh+nmLXY9unkUIe/xaU= +github.com/hanwen/go-fuse/v2 v2.4.0/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs= github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= @@ -158,8 +158,8 @@ github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQ github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY= github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kopia/htmluibuild v0.0.0-20230716183504-d78b44b3a9bd h1:Vskpc00T65HkkDSWbkiXOG5yYsgWg5LN48daUfGZ+u0= -github.com/kopia/htmluibuild v0.0.0-20230716183504-d78b44b3a9bd/go.mod h1:eWer4rx9P8lJo2eKc+Q7AZ1dE1x1hJNdkbDFPzMu1Hw= +github.com/kopia/htmluibuild v0.0.1-0.20231019063300-75c2a788c7d0 h1:TvupyyfbUZzsO4DQJpQhKZnUa61xERcJ+ejCbHWG2NY= +github.com/kopia/htmluibuild v0.0.1-0.20231019063300-75c2a788c7d0/go.mod h1:cSImbrlwvv2phvj5RfScL2v08ghX6xli0PcK6f+t8S0= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index 933fbe692..de4f027cc 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -244,38 +244,233 @@ func (cp *corsoProgress) get(k string) *itemDetails { return cp.pending[k] } -func collectionEntries( +// These define a small state machine describing which source to return an entry +// from next. Phases are traversed in the order defined unless the underlying +// data source isn't present. If an underlying data source is missing, the +// non-pre/post phase associated with that data source is skipped. +// +// Since some phases require initialization of the underlying data source we +// insert additional phases to allow that. Once initialization is completed the +// phase should be updated to the next phase. +// +// A similar tactic can be used to handle tearing down resources for underlying +// data sources if needed. +const ( + initPhase = 0 + staticEntsPhase = 1 + preStreamEntsPhase = 2 + streamEntsPhase = 3 + postStreamEntsPhase = 4 + preBaseDirEntsPhase = 5 + baseDirEntsPhase = 6 + postBaseDirEntsPhase = 7 + terminationPhase = 8 +) + +type corsoDirectoryIterator struct { + ctx context.Context + params snapshotParams + // staticEnts is the set of fs.StreamingDirectory child directories that we've + // generated based on the collections passed in. These entries may or may not + // contain an underlying data.BackupCollection (depending on what was passed + // in) and may or may not contain an fs.Directory (depending on hierarchy + // merging). + staticEnts []fs.Entry + globalExcludeSet prefixmatcher.StringSetReader + progress *corsoProgress + + // endSpan is the callback to stop diagnostic span collection for iteration. + endSpan func() + + // seenEnts contains the encoded names of entries that we've already streamed + // so we can skip returning them again when looking at base entries. Since + // this struct already deals with a single directory these can directly be + // file names instead of paths. + seenEnts map[string]struct{} + // locationPath contains the human-readable location of the underlying + // collection. + locationPath *path.Builder + + // excludeSet is the individual exclude set to use for the longest path prefix + // for this iterator. We use path prefixes because some services (e.x. + // OneDrive) can only provide excluded items for the entire service backup not + // individual directories (e.x. OneDrive deleted items are reported as + // children of the root folder). + excludeSet map[string]struct{} + + // currentPhase is the current state in the state machine. + currentPhase int + + // streamItemsChan contains the channel for the backing collection if there is + // one. Once the backing collection has been traversed this is set to nil. + streamItemsChan <-chan data.Item + // staticEntsIdx contains the index in staticEnts of the next item to be + // returned. Once all static entries have been traversed this is set to + // len(staticEnts). + staticEntsIdx int + // baseDirIter contains the handle to the iterator for the base directory + // found during hierarchy merging. Once all base directory entries have been + // traversed this is set to nil. + baseDirIter fs.DirectoryIterator +} + +// Close releases any remaining resources the iterator may have at the end of +// iteration. +func (d *corsoDirectoryIterator) Close() { + if d.endSpan != nil { + d.endSpan() + } +} + +func (d *corsoDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) { + // Execute the state machine until either: + // * we get an entry to return + // * we exhaust all underlying data sources (end of iteration) + // + // Multiple executions of the state machine may be required for things like + // setting up underlying data sources or finding that there's no more entries + // in the current data source and needing to switch to the next one. + // + // Returned entries are handled with inline return statements. + // + // When an error is encountered it's added to the fault.Bus. We can't return + // these errors since doing so will result in kopia stopping iteration of the + // directory. Since these errors are recorded we won't lose track of them at + // the end of the backup. + for d.currentPhase != terminationPhase { + switch d.currentPhase { + case initPhase: + d.ctx, d.endSpan = diagnostics.Span(d.ctx, "kopia:DirectoryIterator") + d.currentPhase = staticEntsPhase + + case staticEntsPhase: + if d.staticEntsIdx < len(d.staticEnts) { + ent := d.staticEnts[d.staticEntsIdx] + d.staticEntsIdx++ + + return ent, nil + } + + d.currentPhase = preStreamEntsPhase + + case preStreamEntsPhase: + if d.params.collection == nil { + d.currentPhase = preBaseDirEntsPhase + break + } + + if lp, ok := d.params.collection.(data.LocationPather); ok { + d.locationPath = lp.LocationPath() + } + + d.streamItemsChan = d.params.collection.Items(d.ctx, d.progress.errs) + d.seenEnts = map[string]struct{}{} + d.currentPhase = streamEntsPhase + + case streamEntsPhase: + ent, err := d.nextStreamEnt(d.ctx) + if ent != nil { + return ent, nil + } + + // This assumes that once we hit an error we won't generate any more valid + // entries. Record the error in progress but don't return it to kopia + // since doing so will terminate iteration. + if err != nil { + d.progress.errs.AddRecoverable(d.ctx, clues.Stack(err)) + } + + // Done iterating through stream entries, advance the state machine state. + d.currentPhase = postStreamEntsPhase + + case postStreamEntsPhase: + d.streamItemsChan = nil + d.currentPhase = preBaseDirEntsPhase + + case preBaseDirEntsPhase: + // We have no iterator from which to pull entries, switch to the next + // state machine state. + if d.params.baseDir == nil { + d.currentPhase = postBaseDirEntsPhase + break + } + + var err error + + d.baseDirIter, err = d.params.baseDir.Iterate(d.ctx) + if err != nil { + // We have no iterator from which to pull entries, switch to the next + // state machine state. + d.currentPhase = postBaseDirEntsPhase + d.progress.errs.AddRecoverable( + d.ctx, + clues.Wrap(err, "getting base directory iterator")) + + break + } + + if d.globalExcludeSet != nil { + longest, excludeSet, _ := d.globalExcludeSet.LongestPrefix( + d.params.currentPath.String()) + d.excludeSet = excludeSet + + logger.Ctx(d.ctx).Debugw("found exclude set", "set_prefix", longest) + } + + d.currentPhase = baseDirEntsPhase + + case baseDirEntsPhase: + ent, err := d.nextBaseEnt(d.ctx) + if ent != nil { + return ent, nil + } + + // This assumes that once we hit an error we won't generate any more valid + // entries. Record the error in progress but don't return it to kopia + // since doing so will terminate iteration. + if err != nil { + d.progress.errs.AddRecoverable(d.ctx, clues.Stack(err)) + } + + // Done iterating through base entries, advance the state machine state. + d.currentPhase = postBaseDirEntsPhase + + case postBaseDirEntsPhase: + // Making a separate phase so adding additional phases after this one is + // less error prone if we ever need to do that. + if d.baseDirIter != nil { + d.baseDirIter.Close() + d.baseDirIter = nil + } + + d.seenEnts = nil + d.excludeSet = nil + + d.currentPhase = terminationPhase + } + } + + return nil, nil +} + +func (d *corsoDirectoryIterator) nextStreamEnt( ctx context.Context, - ctr func(context.Context, fs.Entry) error, - streamedEnts data.BackupCollection, - progress *corsoProgress, -) (map[string]struct{}, error) { - if streamedEnts == nil { - return nil, nil - } - - var ( - locationPath *path.Builder - // Track which items have already been seen so we can skip them if we see - // them again in the data from the base snapshot. - seen = map[string]struct{}{} - items = streamedEnts.Items(ctx, progress.errs) - ) - - if lp, ok := streamedEnts.(data.LocationPather); ok { - locationPath = lp.LocationPath() - } - +) (fs.Entry, error) { + // Loop over results until we get something we can return. Required because we + // could see deleted items. for { select { case <-ctx.Done(): - return seen, clues.StackWC(ctx, ctx.Err()) + return nil, clues.StackWC(ctx, ctx.Err()) - case e, ok := <-items: + case e, ok := <-d.streamItemsChan: + // Channel was closed, no more entries to return. if !ok { - return seen, nil + return nil, nil } + // Got an entry to process, see if it's a deletion marker or something to + // return to kopia. encodedName := encodeAsPath(e.ID()) // Even if this item has been deleted and should not appear at all in @@ -291,13 +486,13 @@ func collectionEntries( // items we need to track. Namely, we can track the created time of the // item and if it's after the base snapshot was finalized we can skip it // because it's not possible for the base snapshot to contain that item. - seen[encodedName] = struct{}{} + d.seenEnts[encodedName] = struct{}{} // For now assuming that item IDs don't need escaping. - itemPath, err := streamedEnts.FullPath().AppendItem(e.ID()) + itemPath, err := d.params.currentPath.AppendItem(e.ID()) if err != nil { err = clues.Wrap(err, "getting full item path") - progress.errs.AddRecoverable(ctx, err) + d.progress.errs.AddRecoverable(ctx, err) logger.CtxErr(ctx, err).Error("getting full item path") @@ -323,9 +518,8 @@ func collectionEntries( ei, ok := e.(data.ItemInfo) if ok { // Relative path given to us in the callback is missing the root - // element. Add to pending set before calling the callback to avoid race - // conditions when the item is completed. - d := &itemDetails{ + // element. + deetsEnt := &itemDetails{ infoer: ei, repoPath: itemPath, // Also use the current path as the previous path for this item. This @@ -336,222 +530,160 @@ func collectionEntries( // This all works out because cached item checks in kopia are direct // path + metadata comparisons. prevPath: itemPath, - locationPath: locationPath, + locationPath: d.locationPath, modTime: &modTime, } - progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) + d.progress.put( + encodeAsPath(itemPath.PopFront().Elements()...), + deetsEnt) } - entry := virtualfs.StreamingFileWithModTimeFromReader( + return virtualfs.StreamingFileWithModTimeFromReader( encodedName, modTime, - e.ToReader()) - - err = ctr(ctx, entry) - if err != nil { - // Kopia's uploader swallows errors in most cases, so if we see - // something here it's probably a big issue and we should return. - return seen, clues.WrapWC(ctx, err, "executing callback"). - With("item_path", itemPath) - } + e.ToReader()), nil } } } -func streamBaseEntries( +func (d *corsoDirectoryIterator) nextBaseEnt( ctx context.Context, - ctr func(context.Context, fs.Entry) error, - params snapshotParams, - locationPath *path.Builder, - encodedSeen map[string]struct{}, - globalExcludeSet prefixmatcher.StringSetReader, - progress *corsoProgress, -) error { - if params.baseDir == nil { - return nil - } - +) (fs.Entry, error) { var ( - longest string - excludeSet map[string]struct{} + entry fs.Entry + err error ) - if globalExcludeSet != nil { - longest, excludeSet, _ = globalExcludeSet.LongestPrefix( - params.currentPath.String()) - } - - ctx = clues.Add( - ctx, - "current_directory_path", params.currentPath, - "longest_prefix", path.LoggableDir(longest)) - - err := params.baseDir.IterateEntries( - ctx, - func(innerCtx context.Context, entry fs.Entry) error { - if err := innerCtx.Err(); err != nil { - return clues.StackWC(ctx, err) - } - - entName, err := decodeElement(entry.Name()) - if err != nil { - return clues.WrapWC(ctx, err, "decoding entry name"). - With("entry_name", entry.Name()) - } - - if d, ok := entry.(fs.Directory); ok { - // Don't walk subdirectories in this function. - if !params.streamBaseEnts { - return nil - } - - // Do walk subdirectories. The previous and current path of the - // directory can be generated by appending the directory name onto the - // previous and current path of this directory. Since the directory has - // no BackupCollection associated with it (part of the criteria for - // allowing walking directories in this function) there shouldn't be any - // LocationPath information associated with the directory. - newP, err := params.currentPath.Append(false, entName) - if err != nil { - return clues.WrapWC(ctx, err, "getting current directory path") - } - - oldP, err := params.prevPath.Append(false, entName) - if err != nil { - return clues.WrapWC(ctx, err, "getting previous directory path") - } - - e := virtualfs.NewStreamingDirectory( - entry.Name(), - getStreamItemFunc( - snapshotParams{ - currentPath: newP, - prevPath: oldP, - collection: nil, - baseDir: d, - streamBaseEnts: params.streamBaseEnts, - }, - nil, - globalExcludeSet, - progress)) - - return clues.WrapWC(ctx, ctr(ctx, e), "executing callback on subdirectory"). - With("directory_path", newP). - OrNil() - } - - // This entry was either updated or deleted. In either case, the external - // service notified us about it and it's already been handled so we should - // skip it here. - if _, ok := encodedSeen[entry.Name()]; ok { - return nil - } - - // This entry was marked as deleted by a service that can't tell us the - // previous path of deleted items, only the item ID. - if _, ok := excludeSet[entName]; ok { - return nil - } - - // For now assuming that item IDs don't need escaping. - itemPath, err := params.currentPath.AppendItem(entName) - if err != nil { - return clues.WrapWC(ctx, err, "getting full item path for base entry") - } - - // We need the previous path so we can find this item in the base snapshot's - // backup details. If the item moved and we had only the new path, we'd be - // unable to find it in the old backup details because we wouldn't know what - // to look for. - prevItemPath, err := params.prevPath.AppendItem(entName) - if err != nil { - return clues.WrapWC(ctx, err, "getting previous full item path for base entry") - } - - // Meta files aren't in backup details since it's the set of items the - // user sees. - // - // TODO(ashmrtn): We may eventually want to make this a function that is - // passed in so that we can more easily switch it between different - // external service provider implementations. - if !metadata.IsMetadataFile(itemPath) { - // All items have item info in the base backup. However, we need to make - // sure we have enough metadata to find those entries. To do that we add - // the item to progress and having progress aggregate everything for - // later. - d := &itemDetails{ - repoPath: itemPath, - prevPath: prevItemPath, - locationPath: locationPath, - modTime: ptr.To(entry.ModTime()), - } - progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) - } - - if err := ctr(ctx, entry); err != nil { - return clues.WrapWC(ctx, err, "executing callback on item"). - With("item_path", itemPath) - } - - return nil - }) - if err != nil { - return clues.WrapWC(ctx, err, "traversing items in base snapshot directory") - } - - return nil -} - -// getStreamItemFunc returns a function that can be used by kopia's -// virtualfs.StreamingDirectory to iterate through directory entries and call -// kopia callbacks on directory entries. It binds the directory to the given -// DataCollection. -func getStreamItemFunc( - params snapshotParams, - staticEnts []fs.Entry, - globalExcludeSet prefixmatcher.StringSetReader, - progress *corsoProgress, -) func(context.Context, func(context.Context, fs.Entry) error) error { - return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error { - ctx, end := diagnostics.Span(ctx, "kopia:getStreamItemFunc") - defer end() - - // Return static entries in this directory first. - for _, d := range staticEnts { - if err := ctr(ctx, d); err != nil { - return clues.WrapWC(ctx, err, "executing callback on static directory") - } - } - - var locationPath *path.Builder - - if lp, ok := params.collection.(data.LocationPather); ok { - locationPath = lp.LocationPath() - } - - seen, err := collectionEntries(ctx, ctr, params.collection, progress) + for entry, err = d.baseDirIter.Next(ctx); entry != nil && err == nil; entry, err = d.baseDirIter.Next(ctx) { + entName, err := decodeElement(entry.Name()) if err != nil { - return clues.Wrap(err, "streaming collection entries") + err = clues.WrapWC(ctx, err, "decoding entry name"). + With("entry_name", clues.Hide(entry.Name())) + d.progress.errs.AddRecoverable(ctx, err) + + continue } - if err := streamBaseEntries( - ctx, - ctr, - params, - locationPath, - seen, - globalExcludeSet, - progress); err != nil { - return clues.Wrap(err, "streaming base snapshot entries") + ctx = clues.Add(ctx, "entry_name", clues.Hide(entName)) + + if dir, ok := entry.(fs.Directory); ok { + // Don't walk subdirectories in this function. + if !d.params.streamBaseEnts { + continue + } + + // Do walk subdirectories. The previous and current path of the + // directory can be generated by appending the directory name onto the + // previous and current path of this directory. Since the directory has + // no BackupCollection associated with it (part of the criteria for + // allowing walking directories in this function) there shouldn't be any + // LocationPath information associated with the directory. + newP, err := d.params.currentPath.Append(false, entName) + if err != nil { + err = clues.WrapWC(ctx, err, "getting current directory path") + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + ctx = clues.Add(ctx, "child_directory_path", newP) + + oldP, err := d.params.prevPath.Append(false, entName) + if err != nil { + err = clues.WrapWC(ctx, err, "getting previous directory path") + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + return virtualfs.NewStreamingDirectory( + entry.Name(), + &corsoDirectoryIterator{ + ctx: ctx, + params: snapshotParams{ + currentPath: newP, + prevPath: oldP, + collection: nil, + baseDir: dir, + streamBaseEnts: d.params.streamBaseEnts, + }, + globalExcludeSet: d.globalExcludeSet, + progress: d.progress, + }), nil } - return nil + // This entry was either updated or deleted. In either case, the external + // service notified us about it and it's already been handled so we should + // skip it here. + if _, ok := d.seenEnts[entry.Name()]; ok { + continue + } + + // This entry was marked as deleted by a service that can't tell us the + // previous path of deleted items, only the item ID. + if _, ok := d.excludeSet[entName]; ok { + continue + } + + // This is a path used in corso not kopia so it doesn't need to encode the + // item name. + itemPath, err := d.params.currentPath.AppendItem(entName) + if err != nil { + err = clues.WrapWC(ctx, err, "getting full item path for base entry") + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + ctx = clues.Add(ctx, "item_path", itemPath) + + // We need the previous path so we can find this item in the base snapshot's + // backup details. If the item moved and we had only the new path, we'd be + // unable to find it in the old backup details because we wouldn't know what + // to look for. + prevItemPath, err := d.params.prevPath.AppendItem(entName) + if err != nil { + err = clues.WrapWC( + ctx, + err, + "getting previous full item path for base entry") + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + // Meta files aren't in backup details since it's the set of items the + // user sees. + // + // TODO(ashmrtn): We may eventually want to make this a function that is + // passed in so that we can more easily switch it between different + // external service provider implementations. + if !metadata.IsMetadataFile(itemPath) { + // All items have item info in the base backup. However, we need to make + // sure we have enough metadata to find those entries. To do that we add + // the item to progress and having progress aggregate everything for + // later. + detailsEnt := &itemDetails{ + repoPath: itemPath, + prevPath: prevItemPath, + locationPath: d.locationPath, + modTime: ptr.To(entry.ModTime()), + } + d.progress.put( + encodeAsPath(itemPath.PopFront().Elements()...), + detailsEnt) + } + + return entry, nil } + + return nil, clues.Stack(err).OrNil() } // buildKopiaDirs recursively builds a directory hierarchy from the roots up. // Returned directories are virtualfs.StreamingDirectory. func buildKopiaDirs( + ctx context.Context, dirName string, dir *treeMap, globalExcludeSet prefixmatcher.StringSetReader, @@ -572,6 +704,7 @@ func buildKopiaDirs( for childName, childDir := range dir.childDirs { child, err := buildKopiaDirs( + ctx, childName, childDir, globalExcludeSet, @@ -585,11 +718,13 @@ func buildKopiaDirs( return virtualfs.NewStreamingDirectory( encodeAsPath(dirName), - getStreamItemFunc( - dir.snapshotParams, - childDirs, - globalExcludeSet, - progress)), nil + &corsoDirectoryIterator{ + ctx: ctx, + params: dir.snapshotParams, + staticEnts: childDirs, + globalExcludeSet: globalExcludeSet, + progress: progress, + }), nil } type snapshotParams struct { @@ -987,23 +1122,26 @@ func traverseBaseDir( var hasItems bool if changed { - err = dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error { - dEntry, ok := entry.(fs.Directory) - if !ok { - hasItems = true - return nil - } + err = fs.IterateEntries( + ctx, + dir, + func(innerCtx context.Context, entry fs.Entry) error { + dEntry, ok := entry.(fs.Directory) + if !ok { + hasItems = true + return nil + } - return traverseBaseDir( - innerCtx, - depth+1, - updatedPaths, - oldDirPath, - currentPath, - dEntry, - roots, - stats) - }) + return traverseBaseDir( + innerCtx, + depth+1, + updatedPaths, + oldDirPath, + currentPath, + dEntry, + roots, + stats) + }) if err != nil { return clues.WrapWC(ctx, err, "traversing base directory") } @@ -1260,7 +1398,7 @@ func inflateDirTree( var res fs.Directory for dirName, dir := range roots { - tmp, err := buildKopiaDirs(dirName, dir, globalExcludeSet, progress) + tmp, err := buildKopiaDirs(ctx, dirName, dir, globalExcludeSet, progress) if err != nil { return nil, err } diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index 44c0437ef..bd88db1f3 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -2556,12 +2556,13 @@ type mockStaticDirectory struct { iterateCount int } -func (msd *mockStaticDirectory) IterateEntries( +func (msd *mockStaticDirectory) Iterate( ctx context.Context, - callback func(context.Context, fs.Entry) error, -) error { +) (fs.DirectoryIterator, error) { msd.iterateCount++ - return msd.Directory.IterateEntries(ctx, callback) + iter, err := msd.Directory.Iterate(ctx) + + return iter, clues.Stack(err).OrNil() } func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() {