From 5209e8be93886ac6c036d52b0238ae5bd69e99ad Mon Sep 17 00:00:00 2001 From: Ashlie Martinez Date: Wed, 4 Oct 2023 08:05:37 -0700 Subject: [PATCH] PoC for updated kopia iterator Some small refinements like using the iterator during merging could be done. This fixes streaming directory traversal. --- src/go.mod | 15 +- src/go.sum | 26 +- src/internal/kopia/upload.go | 586 +++++++++++++++++------------- src/internal/kopia/upload_test.go | 9 +- 4 files changed, 360 insertions(+), 276 deletions(-) diff --git a/src/go.mod b/src/go.mod index 96d163731..799903841 100644 --- a/src/go.mod +++ b/src/go.mod @@ -2,7 +2,8 @@ module github.com/alcionai/corso/src go 1.21 -replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 +//replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 +replace github.com/kopia/kopia => ./submodules/kopia require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 @@ -61,8 +62,8 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.48.0 // indirect - go.opentelemetry.io/otel/metric v1.18.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect + go.opentelemetry.io/otel/metric v1.19.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect ) require ( @@ -84,7 +85,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/klauspost/cpuid/v2 v2.2.5 // indirect github.com/klauspost/pgzip v1.2.6 // indirect github.com/klauspost/reedsolomon v1.11.8 // indirect @@ -117,8 +118,8 @@ require ( github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/zeebo/blake3 v0.2.3 // indirect - go.opentelemetry.io/otel v1.18.0 // indirect - go.opentelemetry.io/otel/trace v1.18.0 // indirect + go.opentelemetry.io/otel v1.19.0 // indirect + go.opentelemetry.io/otel/trace v1.19.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.13.0 // indirect golang.org/x/mod v0.12.0 // indirect @@ -126,7 +127,7 @@ require ( golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.12.0 // indirect golang.org/x/text v0.13.0 // indirect - google.golang.org/grpc v1.57.0 // indirect + google.golang.org/grpc v1.58.2 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/src/go.sum b/src/go.sum index 054c65071..435d0bf03 100644 --- a/src/go.sum +++ b/src/go.sum @@ -57,8 +57,6 @@ github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpH github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8 h1:8KZyOE9IOxJ9Dg/n4pJKukcOxDAO3fxUcEP2MM0tNCg= github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8/go.mod h1:iyJK9p061Zb1CqE+DFsofdjAc7/QTHGxdbJsf/mtVHo= -github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 h1:6YjRGjEZr/Bmux1XkS13Re1m1LI7VAcbFsA3PiqO2BI= -github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3/go.mod h1:u5wAx1XN07PJsO1BLBkGicwSrbmAC1biONnumSCA210= 
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -246,8 +244,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= -github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= @@ -449,12 +447,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= -go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= -go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI= -go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ= -go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k= -go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= -go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= +go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= +go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= +go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= +go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -756,8 +754,8 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto/googleapis/rpc 
v0.0.0-20230807174057-1744710a1577 h1:wukfNtZmZUurLN/atp2hiIeTKn7QJWIQdHzqmsOnAOk= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 h1:N3bU/SQDCDyD6R528GJ/PwW9KjYcJA3dgyH+MovAkIM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:KSqppvjFjtoCI+KGd4PELB0qLNxdJHRGqRI09mB6pQA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -774,8 +772,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I= +google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index 6030ec838..c209a4ff0 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -20,7 +20,6 @@ import ( "github.com/alcionai/corso/src/internal/common/prefixmatcher" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" - "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/graph/metadata" "github.com/alcionai/corso/src/pkg/backup/details" @@ -235,38 +234,77 @@ func (cp *corsoProgress) get(k string) *itemDetails { return cp.pending[k] } -func collectionEntries( +// These define a small state machine as to which source to return an entry from +// next. Since these are in-memory only values we can use iota. All iterators +// start by streaming static entries. +// +// Since some phases require initialization of the underlying data source we +// insert additional phases to allow that. Once initialization is completed the +// phase should be updated to the next phase. +// +// A similar tactic can be used to handle tearing down resources for underlying +// data sources if needed. +const ( + staticEntsPhase = iota + preStreamEntsPhase + streamEntsPhase + preBaseDirEntsPhase + baseDirEntsPhase + postBaseDirEntsPhase + terminationPhase +) + +type corsoDirectoryIterator struct { + params snapshotParams + staticEnts []fs.Entry + globalExcludeSet prefixmatcher.StringSetReader + progress *corsoProgress + + // seenEnts contains the encoded names of entries that we've already streamed + // so we can skip returning them again when looking at base entries. 
+ seenEnts map[string]struct{} + // locationPath contains the human-readable location of the underlying + // collection. + locationPath *path.Builder + + // excludeSet is the individual exclude set to use for the longest prefix for + // this iterator. + excludeSet map[string]struct{} + + // traversalPhase is the current state in the state machine. + traversalPhase int + + // streamItemsChan contains the channel for the backing collection if there is + // one. Once the backing collection has been traversed this is set to nil. + streamItemsChan <-chan data.Item + // staticEntsIdx contains the index in staticEnts of the next item to be + // returned. Once all static entries have been traversed this is set to + // len(staticEnts). + staticEntsIdx int + // baseDirIter contains the handle to the iterator for the base directory + // found during hierarchy merging. Once all base directory entries have been + // traversed this is set to nil. + baseDirIter fs.DirectoryIterator +} + +func (d *corsoDirectoryIterator) nextStreamEnt( ctx context.Context, - ctr func(context.Context, fs.Entry) error, - streamedEnts data.BackupCollection, - progress *corsoProgress, -) (map[string]struct{}, error) { - if streamedEnts == nil { - return nil, nil - } - - var ( - locationPath *path.Builder - // Track which items have already been seen so we can skip them if we see - // them again in the data from the base snapshot. - seen = map[string]struct{}{} - items = streamedEnts.Items(ctx, progress.errs) - ) - - if lp, ok := streamedEnts.(data.LocationPather); ok { - locationPath = lp.LocationPath() - } - +) (fs.Entry, error) { + // Loop over results until we get something we can return. Required because we + // could see deleted items. for { select { case <-ctx.Done(): - return seen, clues.Stack(ctx.Err()).WithClues(ctx) + return nil, clues.Stack(ctx.Err()).WithClues(ctx) - case e, ok := <-items: + case e, ok := <-d.streamItemsChan: + // Channel was closed, no more entries to return. if !ok { - return seen, nil + return nil, nil } + // Got an entry to process, see if it's a deletion marker or something to + // return to kopia. encodedName := encodeAsPath(e.ID()) // Even if this item has been deleted and should not appear at all in @@ -282,13 +320,13 @@ func collectionEntries( // items we need to track. Namely, we can track the created time of the // item and if it's after the base snapshot was finalized we can skip it // because it's not possible for the base snapshot to contain that item. - seen[encodedName] = struct{}{} + d.seenEnts[encodedName] = struct{}{} // For now assuming that item IDs don't need escaping. - itemPath, err := streamedEnts.FullPath().AppendItem(e.ID()) + itemPath, err := d.params.currentPath.AppendItem(e.ID()) if err != nil { err = clues.Wrap(err, "getting full item path") - progress.errs.AddRecoverable(ctx, err) + d.progress.errs.AddRecoverable(ctx, err) logger.CtxErr(ctx, err).Error("getting full item path") @@ -316,7 +354,7 @@ func collectionEntries( // Relative path given to us in the callback is missing the root // element. Add to pending set before calling the callback to avoid race // conditions when the item is completed. - d := &itemDetails{ + deetsEnt := &itemDetails{ infoer: ei, repoPath: itemPath, // Also use the current path as the previous path for this item. This @@ -327,229 +365,271 @@ func collectionEntries( // This all works out because cached item checks in kopia are direct // path + metadata comparisons. 
prevPath: itemPath, - locationPath: locationPath, + locationPath: d.locationPath, modTime: &modTime, } - progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) + d.progress.put( + encodeAsPath(itemPath.PopFront().Elements()...), + deetsEnt) } - entry := virtualfs.StreamingFileWithModTimeFromReader( + return virtualfs.StreamingFileWithModTimeFromReader( encodedName, modTime, - e.ToReader()) - - err = ctr(ctx, entry) - if err != nil { - // Kopia's uploader swallows errors in most cases, so if we see - // something here it's probably a big issue and we should return. - return seen, clues.Wrap(err, "executing callback"). - WithClues(ctx). - With("item_path", itemPath) - } + e.ToReader()), nil } } } -func streamBaseEntries( +func (d *corsoDirectoryIterator) nextBaseEnt( ctx context.Context, - ctr func(context.Context, fs.Entry) error, - params snapshotParams, - locationPath *path.Builder, - encodedSeen map[string]struct{}, - globalExcludeSet prefixmatcher.StringSetReader, - progress *corsoProgress, -) error { - if params.baseDir == nil { - return nil - } - +) (fs.Entry, error) { var ( - longest string - excludeSet map[string]struct{} + entry fs.Entry + err error ) - if globalExcludeSet != nil { - longest, excludeSet, _ = globalExcludeSet.LongestPrefix( - params.currentPath.String()) - } - - ctx = clues.Add( - ctx, - "current_directory_path", params.currentPath, - "longest_prefix", path.LoggableDir(longest)) - - err := params.baseDir.IterateEntries( - ctx, - func(innerCtx context.Context, entry fs.Entry) error { - if err := innerCtx.Err(); err != nil { - return clues.Stack(err).WithClues(ctx) - } - - entName, err := decodeElement(entry.Name()) - if err != nil { - return clues.Wrap(err, "decoding entry name"). - WithClues(ctx). - With("entry_name", entry.Name()) - } - - if d, ok := entry.(fs.Directory); ok { - // Don't walk subdirectories in this function. - if !params.streamBaseEnts { - return nil - } - - // Do walk subdirectories. The previous and current path of the - // directory can be generated by appending the directory name onto the - // previous and current path of this directory. Since the directory has - // no BackupCollection associated with it (part of the criteria for - // allowing walking directories in this function) there shouldn't be any - // LocationPath information associated with the directory. - newP, err := params.currentPath.Append(false, entName) - if err != nil { - return clues.Wrap(err, "getting current directory path"). - WithClues(ctx) - } - - oldP, err := params.prevPath.Append(false, entName) - if err != nil { - return clues.Wrap(err, "getting previous directory path"). - WithClues(ctx) - } - - e := virtualfs.NewStreamingDirectory( - entry.Name(), - getStreamItemFunc( - snapshotParams{ - currentPath: newP, - prevPath: oldP, - collection: nil, - baseDir: d, - streamBaseEnts: params.streamBaseEnts, - }, - nil, - globalExcludeSet, - progress)) - - return clues.Wrap(ctr(ctx, e), "executing callback on subdirectory"). - WithClues(ctx). - With("directory_path", newP). - OrNil() - } - - // This entry was either updated or deleted. In either case, the external - // service notified us about it and it's already been handled so we should - // skip it here. - if _, ok := encodedSeen[entry.Name()]; ok { - return nil - } - - // This entry was marked as deleted by a service that can't tell us the - // previous path of deleted items, only the item ID. - if _, ok := excludeSet[entName]; ok { - return nil - } - - // For now assuming that item IDs don't need escaping. 
- itemPath, err := params.currentPath.AppendItem(entName) - if err != nil { - return clues.Wrap(err, "getting full item path for base entry"). - WithClues(ctx) - } - - // We need the previous path so we can find this item in the base snapshot's - // backup details. If the item moved and we had only the new path, we'd be - // unable to find it in the old backup details because we wouldn't know what - // to look for. - prevItemPath, err := params.prevPath.AppendItem(entName) - if err != nil { - return clues.Wrap(err, "getting previous full item path for base entry"). - WithClues(ctx) - } - - // Meta files aren't in backup details since it's the set of items the - // user sees. - // - // TODO(ashmrtn): We may eventually want to make this a function that is - // passed in so that we can more easily switch it between different - // external service provider implementations. - if !metadata.IsMetadataFile(itemPath) { - // All items have item info in the base backup. However, we need to make - // sure we have enough metadata to find those entries. To do that we add - // the item to progress and having progress aggregate everything for - // later. - d := &itemDetails{ - repoPath: itemPath, - prevPath: prevItemPath, - locationPath: locationPath, - modTime: ptr.To(entry.ModTime()), - } - progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) - } - - if err := ctr(ctx, entry); err != nil { - return clues.Wrap(err, "executing callback on item"). - WithClues(ctx). - With("item_path", itemPath) - } - - return nil - }) - if err != nil { - return clues.Wrap(err, "traversing items in base snapshot directory"). - WithClues(ctx) - } - - return nil -} - -// getStreamItemFunc returns a function that can be used by kopia's -// virtualfs.StreamingDirectory to iterate through directory entries and call -// kopia callbacks on directory entries. It binds the directory to the given -// DataCollection. -func getStreamItemFunc( - params snapshotParams, - staticEnts []fs.Entry, - globalExcludeSet prefixmatcher.StringSetReader, - progress *corsoProgress, -) func(context.Context, func(context.Context, fs.Entry) error) error { - return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error { - ctx, end := diagnostics.Span(ctx, "kopia:getStreamItemFunc") - defer end() - - // Return static entries in this directory first. - for _, d := range staticEnts { - if err := ctr(ctx, d); err != nil { - return clues.Wrap(err, "executing callback on static directory"). - WithClues(ctx) - } - } - - var locationPath *path.Builder - - if lp, ok := params.collection.(data.LocationPather); ok { - locationPath = lp.LocationPath() - } - - seen, err := collectionEntries(ctx, ctr, params.collection, progress) + for entry, err = d.baseDirIter.Next(ctx); entry != nil && err == nil; entry, err = d.baseDirIter.Next(ctx) { + entName, err := decodeElement(entry.Name()) if err != nil { - return clues.Wrap(err, "streaming collection entries") + err = clues.Wrap(err, "decoding entry name"). + WithClues(ctx). + With("entry_name", clues.Hide(entry.Name())) + d.progress.errs.AddRecoverable(ctx, err) + + continue } - if err := streamBaseEntries( - ctx, - ctr, - params, - locationPath, - seen, - globalExcludeSet, - progress); err != nil { - return clues.Wrap(err, "streaming base snapshot entries") + ctx = clues.Add(ctx, "entry_name", clues.Hide(entName)) + + if dir, ok := entry.(fs.Directory); ok { + // Don't walk subdirectories in this function. + if !d.params.streamBaseEnts { + continue + } + + // Do walk subdirectories. 
The previous and current path of the + // directory can be generated by appending the directory name onto the + // previous and current path of this directory. Since the directory has + // no BackupCollection associated with it (part of the criteria for + // allowing walking directories in this function) there shouldn't be any + // LocationPath information associated with the directory. + newP, err := d.params.currentPath.Append(false, entName) + if err != nil { + err = clues.Wrap(err, "getting current directory path"). + WithClues(ctx) + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + ctx = clues.Add(ctx, "child_directory_path", newP) + + oldP, err := d.params.prevPath.Append(false, entName) + if err != nil { + err = clues.Wrap(err, "getting previous directory path"). + WithClues(ctx) + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + return virtualfs.NewStreamingDirectory( + entry.Name(), + &corsoDirectoryIterator{ + params: snapshotParams{ + currentPath: newP, + prevPath: oldP, + collection: nil, + baseDir: dir, + streamBaseEnts: d.params.streamBaseEnts, + }, + globalExcludeSet: d.globalExcludeSet, + progress: d.progress, + }), nil } - return nil + // This entry was either updated or deleted. In either case, the external + // service notified us about it and it's already been handled so we should + // skip it here. + if _, ok := d.seenEnts[entry.Name()]; ok { + continue + } + + // This entry was marked as deleted by a service that can't tell us the + // previous path of deleted items, only the item ID. + if _, ok := d.excludeSet[entName]; ok { + continue + } + + // This is a path used in corso not kopia so it doesn't need to encode the + // item name. + itemPath, err := d.params.currentPath.AppendItem(entName) + if err != nil { + err = clues.Wrap(err, "getting full item path for base entry"). + WithClues(ctx) + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + ctx = clues.Add(ctx, "item_path", itemPath) + + // We need the previous path so we can find this item in the base snapshot's + // backup details. If the item moved and we had only the new path, we'd be + // unable to find it in the old backup details because we wouldn't know what + // to look for. + prevItemPath, err := d.params.prevPath.AppendItem(entName) + if err != nil { + err = clues.Wrap(err, "getting previous full item path for base entry"). + WithClues(ctx) + d.progress.errs.AddRecoverable(ctx, err) + + continue + } + + // Meta files aren't in backup details since it's the set of items the + // user sees. + // + // TODO(ashmrtn): We may eventually want to make this a function that is + // passed in so that we can more easily switch it between different + // external service provider implementations. + if !metadata.IsMetadataFile(itemPath) { + // All items have item info in the base backup. However, we need to make + // sure we have enough metadata to find those entries. To do that we add + // the item to progress and having progress aggregate everything for + // later. + detailsEnt := &itemDetails{ + repoPath: itemPath, + prevPath: prevItemPath, + locationPath: d.locationPath, + modTime: ptr.To(entry.ModTime()), + } + d.progress.put( + encodeAsPath(itemPath.PopFront().Elements()...), + detailsEnt) + } + + return entry, nil } + + return nil, clues.Stack(err).OrNil() } +func (d *corsoDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) { + // TODO(ashmrtn): Figure out the expected way to do error handling. 
+ // Execute the state machine until either: + // * we get an entry to return + // * we get an error to return + // * we exhaust all underlying data sources (end of iteration) + // + // Multiple executions of the state machine may be required for things like + // setting up underlying data sources or finding that there's no more entries + // in the current data source and needing to switch to the next one. + // + // Returned entries and errors are handled with inline return statements. + for d.traversalPhase != terminationPhase { + switch d.traversalPhase { + case staticEntsPhase: + if d.staticEntsIdx < len(d.staticEnts) { + ent := d.staticEnts[d.staticEntsIdx] + d.staticEntsIdx++ + + return ent, nil + } + + d.traversalPhase = preStreamEntsPhase + + case preStreamEntsPhase: + if d.params.collection != nil { + if lp, ok := d.params.collection.(data.LocationPather); ok { + d.locationPath = lp.LocationPath() + } + + d.streamItemsChan = d.params.collection.Items(ctx, d.progress.errs) + d.seenEnts = map[string]struct{}{} + d.traversalPhase = streamEntsPhase + } else { + d.traversalPhase = preBaseDirEntsPhase + } + + case streamEntsPhase: + ent, err := d.nextStreamEnt(ctx) + if ent != nil { + return ent, nil + } else if err != nil { + // This assumes that once we hit an error we won't generate any more + // valid entries. + d.traversalPhase = preBaseDirEntsPhase + return nil, clues.Stack(err) + } + + // Done iterating through stream entries, advance the state machine state. + d.traversalPhase = preBaseDirEntsPhase + + case preBaseDirEntsPhase: + if d.params.baseDir != nil { + var err error + + d.baseDirIter, err = d.params.baseDir.Iterate(ctx) + if err != nil { + // We have no iterator from which to pull entries, switch to the next + // state machine state. + d.traversalPhase = postBaseDirEntsPhase + return nil, clues.Wrap(err, "getting base directory iterator") + } + + if d.globalExcludeSet != nil { + longest, excludeSet, _ := d.globalExcludeSet.LongestPrefix( + d.params.currentPath.String()) + d.excludeSet = excludeSet + + logger.Ctx(ctx).Debugw("found exclude set", "set_prefix", longest) + } + + d.traversalPhase = baseDirEntsPhase + } else { + // We have no iterator from which to pull entries, switch to the next + // state machine state. + d.traversalPhase = postBaseDirEntsPhase + } + + case baseDirEntsPhase: + ent, err := d.nextBaseEnt(ctx) + if ent != nil { + return ent, nil + } else if err != nil { + // This assumes that once we hit an error we won't generate any more + // valid entries. + d.traversalPhase = postBaseDirEntsPhase + return nil, clues.Stack(err) + } + + // Done iterating through base entries, advance the state machine state. + d.traversalPhase = postBaseDirEntsPhase + + case postBaseDirEntsPhase: + // Making a separate phase so adding additional phases after this one is + // less error prone if we ever need to do that. + if d.baseDirIter != nil { + d.baseDirIter.Close() + d.baseDirIter = nil + } + + d.seenEnts = nil + d.excludeSet = nil + + d.traversalPhase = terminationPhase + } + } + + return nil, nil +} + +// Close releases any remaining resources the iterator may have at the end of +// iteration. +func (d *corsoDirectoryIterator) Close() {} + // buildKopiaDirs recursively builds a directory hierarchy from the roots up. // Returned directories are virtualfs.StreamingDirectory. 
func buildKopiaDirs( @@ -586,11 +666,12 @@ func buildKopiaDirs( return virtualfs.NewStreamingDirectory( encodeAsPath(dirName), - getStreamItemFunc( - dir.snapshotParams, - childDirs, - globalExcludeSet, - progress)), nil + &corsoDirectoryIterator{ + params: dir.snapshotParams, + staticEnts: childDirs, + globalExcludeSet: globalExcludeSet, + progress: progress, + }), nil } type snapshotParams struct { @@ -958,23 +1039,26 @@ func traverseBaseDir( var hasItems bool if changed { - err = dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error { - dEntry, ok := entry.(fs.Directory) - if !ok { - hasItems = true - return nil - } + err = fs.IterateEntries( + ctx, + dir, + func(innerCtx context.Context, entry fs.Entry) error { + dEntry, ok := entry.(fs.Directory) + if !ok { + hasItems = true + return nil + } - return traverseBaseDir( - innerCtx, - depth+1, - updatedPaths, - oldDirPath, - currentPath, - dEntry, - roots, - stats) - }) + return traverseBaseDir( + innerCtx, + depth+1, + updatedPaths, + oldDirPath, + currentPath, + dEntry, + roots, + stats) + }) if err != nil { return clues.Wrap(err, "traversing base directory").WithClues(ctx) } diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index 168d32617..d1cf2de02 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -2553,12 +2553,13 @@ type mockStaticDirectory struct { iterateCount int } -func (msd *mockStaticDirectory) IterateEntries( +func (msd *mockStaticDirectory) Iterate( ctx context.Context, - callback func(context.Context, fs.Entry) error, -) error { +) (fs.DirectoryIterator, error) { msd.iterateCount++ - return msd.Directory.IterateEntries(ctx, callback) + iter, err := msd.Directory.Iterate(ctx) + + return iter, clues.Stack(err).OrNil() } func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() {
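For context on the API shape this patch migrates to: virtualfs.NewStreamingDirectory now takes a directory iterator (here corsoDirectoryIterator) instead of a callback-style item function, and base directories are drained through Directory.Iterate, DirectoryIterator.Next, and Close, as seen in nextBaseEnt and the mockStaticDirectory change in upload_test.go. The sketch below shows how such an iterator is consumed end to end. It is a minimal illustration based only on the calls visible in this diff; walkDir and visit are invented names, and the snippet is not part of the change nor a reference for kopia's API.

package kopiaiterexample

import (
	"context"
	"fmt"

	"github.com/kopia/kopia/fs"
)

// walkDir drains a directory with the pull-based iterator API used in this
// patch. Errors from Next end the walk; (nil, nil) signals end of iteration.
func walkDir(ctx context.Context, dir fs.Directory, visit func(fs.Entry) error) error {
	iter, err := dir.Iterate(ctx)
	if err != nil {
		return fmt.Errorf("getting directory iterator: %w", err)
	}
	// Close releases any resources the iterator still holds.
	defer iter.Close()

	for entry, err := iter.Next(ctx); entry != nil || err != nil; entry, err = iter.Next(ctx) {
		if err != nil {
			return fmt.Errorf("iterating directory entries: %w", err)
		}

		if err := visit(entry); err != nil {
			return err
		}
	}

	return nil
}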
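The corsoDirectoryIterator above sequences three sources (static entries, streamed collection items, base-snapshot entries) behind a single Next method by stepping through the phase constants. The stripped-down, self-contained version below mirrors that structure with two sources so the control flow is easier to follow in isolation; multiSourceIterator and its phase names are illustrative only and assume nothing beyond the standard library.

package main

import (
	"context"
	"fmt"
)

// Phases mirror the staticEntsPhase -> streamEntsPhase -> terminationPhase
// progression in the patch, trimmed down to two data sources.
const (
	staticPhase = iota
	preStreamPhase
	streamPhase
	donePhase
)

// multiSourceIterator drains a static slice first and then a channel, using a
// small state machine so each call to Next resumes where the last one left off.
type multiSourceIterator struct {
	static    []string
	staticIdx int
	stream    <-chan string
	phase     int
}

// Next returns the next item, or (nil, nil) once every source is exhausted,
// matching the end-of-iteration convention used by corsoDirectoryIterator.
func (it *multiSourceIterator) Next(ctx context.Context) (*string, error) {
	for it.phase != donePhase {
		switch it.phase {
		case staticPhase:
			if it.staticIdx < len(it.static) {
				ent := it.static[it.staticIdx]
				it.staticIdx++

				return &ent, nil
			}

			it.phase = preStreamPhase

		case preStreamPhase:
			// A real implementation would lazily open the underlying source here.
			if it.stream == nil {
				it.phase = donePhase
				continue
			}

			it.phase = streamPhase

		case streamPhase:
			select {
			case <-ctx.Done():
				return nil, ctx.Err()

			case ent, ok := <-it.stream:
				if !ok {
					// Channel closed; advance the state machine.
					it.phase = donePhase
					continue
				}

				return &ent, nil
			}
		}
	}

	return nil, nil
}

// Close would release any resources still held at the end of iteration.
func (it *multiSourceIterator) Close() {}

func main() {
	ch := make(chan string, 2)
	ch <- "streamed-1"
	ch <- "streamed-2"
	close(ch)

	it := &multiSourceIterator{static: []string{"static-1"}, stream: ch}
	defer it.Close()

	for ent, err := it.Next(context.Background()); ent != nil || err != nil; ent, err = it.Next(context.Background()) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}

		fmt.Println(*ent)
	}
}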