PoC for updated kopia iterator

This fixes streaming directory traversal. Some small refinements, like
also using the iterator during merging, could still be done.
Ashlie Martinez 2023-10-04 08:05:37 -07:00
parent deaa48f472
commit 5209e8be93
4 changed files with 360 additions and 276 deletions


@@ -2,7 +2,8 @@ module github.com/alcionai/corso/src
go 1.21
replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3
//replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3
replace github.com/kopia/kopia => ./submodules/kopia
require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1
@@ -61,8 +62,8 @@ require (
github.com/subosito/gotenv v1.4.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.48.0 // indirect
go.opentelemetry.io/otel/metric v1.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
)
require (
@@ -84,7 +85,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/klauspost/reedsolomon v1.11.8 // indirect
@@ -117,8 +118,8 @@ require (
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect
go.opentelemetry.io/otel v1.18.0 // indirect
go.opentelemetry.io/otel/trace v1.18.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/mod v0.12.0 // indirect
@@ -126,7 +127,7 @@ require (
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/grpc v1.58.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect


@@ -57,8 +57,6 @@ github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpH
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8 h1:8KZyOE9IOxJ9Dg/n4pJKukcOxDAO3fxUcEP2MM0tNCg=
github.com/alcionai/clues v0.0.0-20230920212840-728ac1a1d8b8/go.mod h1:iyJK9p061Zb1CqE+DFsofdjAc7/QTHGxdbJsf/mtVHo=
github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 h1:6YjRGjEZr/Bmux1XkS13Re1m1LI7VAcbFsA3PiqO2BI=
github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3/go.mod h1:u5wAx1XN07PJsO1BLBkGicwSrbmAC1biONnumSCA210=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@@ -246,8 +244,8 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
@@ -449,12 +447,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs=
go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI=
go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ=
go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k=
go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10=
go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
@@ -756,8 +754,8 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 h1:wukfNtZmZUurLN/atp2hiIeTKn7QJWIQdHzqmsOnAOk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 h1:N3bU/SQDCDyD6R528GJ/PwW9KjYcJA3dgyH+MovAkIM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13/go.mod h1:KSqppvjFjtoCI+KGd4PELB0qLNxdJHRGqRI09mB6pQA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@@ -774,8 +772,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc v1.58.2 h1:SXUpjxeVF3FKrTYQI4f4KvbGD5u2xccdYdurwowix5I=
google.golang.org/grpc v1.58.2/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=


@@ -20,7 +20,6 @@ import (
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/graph/metadata"
"github.com/alcionai/corso/src/pkg/backup/details"
@@ -235,38 +234,77 @@ func (cp *corsoProgress) get(k string) *itemDetails {
return cp.pending[k]
}
func collectionEntries(
// These phases define a small state machine that tracks which source the next
// entry should be returned from. Since the values are in-memory only we can
// use iota. All iterators start by returning static entries.
//
// Some phases require initializing the underlying data source, so additional
// phases are inserted to allow for that. Once initialization completes, the
// phase should be advanced to the next phase.
//
// A similar tactic can be used to tear down resources for underlying data
// sources if needed.
const (
staticEntsPhase = iota
preStreamEntsPhase
streamEntsPhase
preBaseDirEntsPhase
baseDirEntsPhase
postBaseDirEntsPhase
terminationPhase
)
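The phases back a pull-based iterator: the updated kopia fs package asks for
entries one at a time instead of pushing them through a callback. For context,
a minimal sketch of the iterator contract corsoDirectoryIterator is assumed to
implement follows; the interface shape is an assumption based on the kopia
fork used here, not something defined in this diff.

// Assumed shape of kopia's fs.DirectoryIterator, for illustration only.
// Next returns the next entry, (nil, nil) once the directory is exhausted,
// or (nil, err) if producing the next entry failed.
type DirectoryIterator interface {
	Next(ctx context.Context) (fs.Entry, error)
	Close()
}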
type corsoDirectoryIterator struct {
params snapshotParams
staticEnts []fs.Entry
globalExcludeSet prefixmatcher.StringSetReader
progress *corsoProgress
// seenEnts contains the encoded names of entries that we've already streamed
// so we can skip returning them again when looking at base entries.
seenEnts map[string]struct{}
// locationPath contains the human-readable location of the underlying
// collection.
locationPath *path.Builder
// excludeSet is the exclude set selected from globalExcludeSet by
// longest-prefix match on this iterator's current path.
excludeSet map[string]struct{}
// traversalPhase is the current state in the state machine.
traversalPhase int
// streamItemsChan contains the channel for the backing collection if there is
// one. Once the backing collection has been traversed this is set to nil.
streamItemsChan <-chan data.Item
// staticEntsIdx contains the index in staticEnts of the next item to be
// returned. Once all static entries have been traversed this is set to
// len(staticEnts).
staticEntsIdx int
// baseDirIter contains the handle to the iterator for the base directory
// found during hierarchy merging. Once all base directory entries have been
// traversed this is set to nil.
baseDirIter fs.DirectoryIterator
}
func (d *corsoDirectoryIterator) nextStreamEnt(
ctx context.Context,
ctr func(context.Context, fs.Entry) error,
streamedEnts data.BackupCollection,
progress *corsoProgress,
) (map[string]struct{}, error) {
if streamedEnts == nil {
return nil, nil
}
var (
locationPath *path.Builder
// Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot.
seen = map[string]struct{}{}
items = streamedEnts.Items(ctx, progress.errs)
)
if lp, ok := streamedEnts.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
) (fs.Entry, error) {
// Loop over results until we get something we can return. Required because we
// could see deleted items.
for {
select {
case <-ctx.Done():
return seen, clues.Stack(ctx.Err()).WithClues(ctx)
return nil, clues.Stack(ctx.Err()).WithClues(ctx)
case e, ok := <-items:
case e, ok := <-d.streamItemsChan:
// Channel was closed, no more entries to return.
if !ok {
return seen, nil
return nil, nil
}
// Got an entry to process, see if it's a deletion marker or something to
// return to kopia.
encodedName := encodeAsPath(e.ID())
// Even if this item has been deleted and should not appear at all in
@@ -282,13 +320,13 @@ func collectionEntries(
// items we need to track. Namely, we can track the created time of the
// item and if it's after the base snapshot was finalized we can skip it
// because it's not possible for the base snapshot to contain that item.
seen[encodedName] = struct{}{}
d.seenEnts[encodedName] = struct{}{}
// For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().AppendItem(e.ID())
itemPath, err := d.params.currentPath.AppendItem(e.ID())
if err != nil {
err = clues.Wrap(err, "getting full item path")
progress.errs.AddRecoverable(ctx, err)
d.progress.errs.AddRecoverable(ctx, err)
logger.CtxErr(ctx, err).Error("getting full item path")
@@ -316,7 +354,7 @@ func collectionEntries(
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{
deetsEnt := &itemDetails{
infoer: ei,
repoPath: itemPath,
// Also use the current path as the previous path for this item. This
@@ -327,229 +365,271 @@ func collectionEntries(
// This all works out because cached item checks in kopia are direct
// path + metadata comparisons.
prevPath: itemPath,
locationPath: locationPath,
locationPath: d.locationPath,
modTime: &modTime,
}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
d.progress.put(
encodeAsPath(itemPath.PopFront().Elements()...),
deetsEnt)
}
entry := virtualfs.StreamingFileWithModTimeFromReader(
return virtualfs.StreamingFileWithModTimeFromReader(
encodedName,
modTime,
e.ToReader())
err = ctr(ctx, entry)
if err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
return seen, clues.Wrap(err, "executing callback").
WithClues(ctx).
With("item_path", itemPath)
}
e.ToReader()), nil
}
}
}
func streamBaseEntries(
func (d *corsoDirectoryIterator) nextBaseEnt(
ctx context.Context,
ctr func(context.Context, fs.Entry) error,
params snapshotParams,
locationPath *path.Builder,
encodedSeen map[string]struct{},
globalExcludeSet prefixmatcher.StringSetReader,
progress *corsoProgress,
) error {
if params.baseDir == nil {
return nil
}
) (fs.Entry, error) {
var (
longest string
excludeSet map[string]struct{}
entry fs.Entry
err error
)
if globalExcludeSet != nil {
longest, excludeSet, _ = globalExcludeSet.LongestPrefix(
params.currentPath.String())
}
ctx = clues.Add(
ctx,
"current_directory_path", params.currentPath,
"longest_prefix", path.LoggableDir(longest))
err := params.baseDir.IterateEntries(
ctx,
func(innerCtx context.Context, entry fs.Entry) error {
if err := innerCtx.Err(); err != nil {
return clues.Stack(err).WithClues(ctx)
}
entName, err := decodeElement(entry.Name())
if err != nil {
return clues.Wrap(err, "decoding entry name").
WithClues(ctx).
With("entry_name", entry.Name())
}
if d, ok := entry.(fs.Directory); ok {
// Don't walk subdirectories in this function.
if !params.streamBaseEnts {
return nil
}
// Do walk subdirectories. The previous and current path of the
// directory can be generated by appending the directory name onto the
// previous and current path of this directory. Since the directory has
// no BackupCollection associated with it (part of the criteria for
// allowing walking directories in this function) there shouldn't be any
// LocationPath information associated with the directory.
newP, err := params.currentPath.Append(false, entName)
if err != nil {
return clues.Wrap(err, "getting current directory path").
WithClues(ctx)
}
oldP, err := params.prevPath.Append(false, entName)
if err != nil {
return clues.Wrap(err, "getting previous directory path").
WithClues(ctx)
}
e := virtualfs.NewStreamingDirectory(
entry.Name(),
getStreamItemFunc(
snapshotParams{
currentPath: newP,
prevPath: oldP,
collection: nil,
baseDir: d,
streamBaseEnts: params.streamBaseEnts,
},
nil,
globalExcludeSet,
progress))
return clues.Wrap(ctr(ctx, e), "executing callback on subdirectory").
WithClues(ctx).
With("directory_path", newP).
OrNil()
}
// This entry was either updated or deleted. In either case, the external
// service notified us about it and it's already been handled so we should
// skip it here.
if _, ok := encodedSeen[entry.Name()]; ok {
return nil
}
// This entry was marked as deleted by a service that can't tell us the
// previous path of deleted items, only the item ID.
if _, ok := excludeSet[entName]; ok {
return nil
}
// For now assuming that item IDs don't need escaping.
itemPath, err := params.currentPath.AppendItem(entName)
if err != nil {
return clues.Wrap(err, "getting full item path for base entry").
WithClues(ctx)
}
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := params.prevPath.AppendItem(entName)
if err != nil {
return clues.Wrap(err, "getting previous full item path for base entry").
WithClues(ctx)
}
// Meta files aren't in backup details since it's the set of items the
// user sees.
//
// TODO(ashmrtn): We may eventually want to make this a function that is
// passed in so that we can more easily switch it between different
// external service provider implementations.
if !metadata.IsMetadataFile(itemPath) {
// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add
// the item to progress and having progress aggregate everything for
// later.
d := &itemDetails{
repoPath: itemPath,
prevPath: prevItemPath,
locationPath: locationPath,
modTime: ptr.To(entry.ModTime()),
}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
if err := ctr(ctx, entry); err != nil {
return clues.Wrap(err, "executing callback on item").
WithClues(ctx).
With("item_path", itemPath)
}
return nil
})
if err != nil {
return clues.Wrap(err, "traversing items in base snapshot directory").
WithClues(ctx)
}
return nil
}
// getStreamItemFunc returns a function that can be used by kopia's
// virtualfs.StreamingDirectory to iterate through directory entries and call
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
params snapshotParams,
staticEnts []fs.Entry,
globalExcludeSet prefixmatcher.StringSetReader,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error {
ctx, end := diagnostics.Span(ctx, "kopia:getStreamItemFunc")
defer end()
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := ctr(ctx, d); err != nil {
return clues.Wrap(err, "executing callback on static directory").
WithClues(ctx)
}
}
var locationPath *path.Builder
if lp, ok := params.collection.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
seen, err := collectionEntries(ctx, ctr, params.collection, progress)
for entry, err = d.baseDirIter.Next(ctx); entry != nil && err == nil; entry, err = d.baseDirIter.Next(ctx) {
entName, err := decodeElement(entry.Name())
if err != nil {
return clues.Wrap(err, "streaming collection entries")
err = clues.Wrap(err, "decoding entry name").
WithClues(ctx).
With("entry_name", clues.Hide(entry.Name()))
d.progress.errs.AddRecoverable(ctx, err)
continue
}
if err := streamBaseEntries(
ctx,
ctr,
params,
locationPath,
seen,
globalExcludeSet,
progress); err != nil {
return clues.Wrap(err, "streaming base snapshot entries")
ctx = clues.Add(ctx, "entry_name", clues.Hide(entName))
if dir, ok := entry.(fs.Directory); ok {
// Don't walk subdirectories in this function.
if !d.params.streamBaseEnts {
continue
}
// Do walk subdirectories. The previous and current path of the
// directory can be generated by appending the directory name onto the
// previous and current path of this directory. Since the directory has
// no BackupCollection associated with it (part of the criteria for
// allowing walking directories in this function) there shouldn't be any
// LocationPath information associated with the directory.
newP, err := d.params.currentPath.Append(false, entName)
if err != nil {
err = clues.Wrap(err, "getting current directory path").
WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
}
ctx = clues.Add(ctx, "child_directory_path", newP)
oldP, err := d.params.prevPath.Append(false, entName)
if err != nil {
err = clues.Wrap(err, "getting previous directory path").
WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
}
return virtualfs.NewStreamingDirectory(
entry.Name(),
&corsoDirectoryIterator{
params: snapshotParams{
currentPath: newP,
prevPath: oldP,
collection: nil,
baseDir: dir,
streamBaseEnts: d.params.streamBaseEnts,
},
globalExcludeSet: d.globalExcludeSet,
progress: d.progress,
}), nil
}
return nil
// This entry was either updated or deleted. In either case, the external
// service notified us about it and it's already been handled so we should
// skip it here.
if _, ok := d.seenEnts[entry.Name()]; ok {
continue
}
// This entry was marked as deleted by a service that can't tell us the
// previous path of deleted items, only the item ID.
if _, ok := d.excludeSet[entName]; ok {
continue
}
// This is a path used in Corso, not kopia, so it doesn't need to encode
// the item name.
itemPath, err := d.params.currentPath.AppendItem(entName)
if err != nil {
err = clues.Wrap(err, "getting full item path for base entry").
WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
}
ctx = clues.Add(ctx, "item_path", itemPath)
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := d.params.prevPath.AppendItem(entName)
if err != nil {
err = clues.Wrap(err, "getting previous full item path for base entry").
WithClues(ctx)
d.progress.errs.AddRecoverable(ctx, err)
continue
}
// Meta files aren't in backup details since it's the set of items the
// user sees.
//
// TODO(ashmrtn): We may eventually want to make this a function that is
// passed in so that we can more easily switch it between different
// external service provider implementations.
if !metadata.IsMetadataFile(itemPath) {
// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add
// the item to progress and have progress aggregate everything for
// later.
detailsEnt := &itemDetails{
repoPath: itemPath,
prevPath: prevItemPath,
locationPath: d.locationPath,
modTime: ptr.To(entry.ModTime()),
}
d.progress.put(
encodeAsPath(itemPath.PopFront().Elements()...),
detailsEnt)
}
return entry, nil
}
return nil, clues.Stack(err).OrNil()
}
func (d *corsoDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) {
// TODO(ashmrtn): Figure out the expected way to do error handling.
// Execute the state machine until either:
// * we get an entry to return
// * we get an error to return
// * we exhaust all underlying data sources (end of iteration)
//
// Multiple executions of the state machine may be required for things like
// setting up underlying data sources or finding that there are no more
// entries in the current data source and needing to switch to the next one.
//
// Returned entries and errors are handled with inline return statements.
for d.traversalPhase != terminationPhase {
switch d.traversalPhase {
case staticEntsPhase:
if d.staticEntsIdx < len(d.staticEnts) {
ent := d.staticEnts[d.staticEntsIdx]
d.staticEntsIdx++
return ent, nil
}
d.traversalPhase = preStreamEntsPhase
case preStreamEntsPhase:
if d.params.collection != nil {
if lp, ok := d.params.collection.(data.LocationPather); ok {
d.locationPath = lp.LocationPath()
}
d.streamItemsChan = d.params.collection.Items(ctx, d.progress.errs)
d.seenEnts = map[string]struct{}{}
d.traversalPhase = streamEntsPhase
} else {
d.traversalPhase = preBaseDirEntsPhase
}
case streamEntsPhase:
ent, err := d.nextStreamEnt(ctx)
if ent != nil {
return ent, nil
} else if err != nil {
// This assumes that once we hit an error we won't generate any more
// valid entries.
d.traversalPhase = preBaseDirEntsPhase
return nil, clues.Stack(err)
}
// Done iterating through stream entries; advance to the next phase.
d.traversalPhase = preBaseDirEntsPhase
case preBaseDirEntsPhase:
if d.params.baseDir != nil {
var err error
d.baseDirIter, err = d.params.baseDir.Iterate(ctx)
if err != nil {
// We have no iterator from which to pull entries, so switch to the next
// phase.
d.traversalPhase = postBaseDirEntsPhase
return nil, clues.Wrap(err, "getting base directory iterator")
}
if d.globalExcludeSet != nil {
longest, excludeSet, _ := d.globalExcludeSet.LongestPrefix(
d.params.currentPath.String())
d.excludeSet = excludeSet
logger.Ctx(ctx).Debugw("found exclude set", "set_prefix", longest)
}
d.traversalPhase = baseDirEntsPhase
} else {
// We have no iterator from which to pull entries, so switch to the next
// phase.
d.traversalPhase = postBaseDirEntsPhase
}
case baseDirEntsPhase:
ent, err := d.nextBaseEnt(ctx)
if ent != nil {
return ent, nil
} else if err != nil {
// This assumes that once we hit an error we won't generate any more
// valid entries.
d.traversalPhase = postBaseDirEntsPhase
return nil, clues.Stack(err)
}
// Done iterating through base entries; advance to the next phase.
d.traversalPhase = postBaseDirEntsPhase
case postBaseDirEntsPhase:
// This is kept as a separate phase so that adding more phases after it
// later is less error prone.
if d.baseDirIter != nil {
d.baseDirIter.Close()
d.baseDirIter = nil
}
d.seenEnts = nil
d.excludeSet = nil
d.traversalPhase = terminationPhase
}
}
return nil, nil
}
// Close releases any remaining resources the iterator may have at the end of
// iteration.
func (d *corsoDirectoryIterator) Close() {}
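To make the phase ordering concrete, here is how a consumer (kopia's uploader,
or a test) would be expected to drive the iterator. collectAll is a
hypothetical helper written for illustration only; it is not part of this
change and assumes the context and fs imports already used in this file.

// collectAll drains a directory iterator the way kopia is assumed to:
// repeated Next calls until (nil, nil), with Close at the end. The iterator
// above returns static entries first, then streamed collection items, then
// base-snapshot entries, before reporting end of iteration.
func collectAll(ctx context.Context, iter fs.DirectoryIterator) ([]fs.Entry, error) {
	defer iter.Close()

	var ents []fs.Entry

	for {
		ent, err := iter.Next(ctx)
		if err != nil {
			// corsoDirectoryIterator records recoverable errors itself, so
			// this sketch treats anything surfaced here as fatal.
			return ents, err
		}

		if ent == nil {
			// All phases exhausted.
			return ents, nil
		}

		ents = append(ents, ent)
	}
}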
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs(
@@ -586,11 +666,12 @@ func buildKopiaDirs
return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(
dir.snapshotParams,
childDirs,
globalExcludeSet,
progress)), nil
&corsoDirectoryIterator{
params: dir.snapshotParams,
staticEnts: childDirs,
globalExcludeSet: globalExcludeSet,
progress: progress,
}), nil
}
type snapshotParams struct {
@@ -958,23 +1039,26 @@ func traverseBaseDir(
var hasItems bool
if changed {
err = dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
dEntry, ok := entry.(fs.Directory)
if !ok {
hasItems = true
return nil
}
err = fs.IterateEntries(
ctx,
dir,
func(innerCtx context.Context, entry fs.Entry) error {
dEntry, ok := entry.(fs.Directory)
if !ok {
hasItems = true
return nil
}
return traverseBaseDir(
innerCtx,
depth+1,
updatedPaths,
oldDirPath,
currentPath,
dEntry,
roots,
stats)
})
return traverseBaseDir(
innerCtx,
depth+1,
updatedPaths,
oldDirPath,
currentPath,
dEntry,
roots,
stats)
})
if err != nil {
return clues.Wrap(err, "traversing base directory").WithClues(ctx)
}
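With the IterateEntries method removed from fs.Directory, traverseBaseDir
keeps its callback style by going through the package-level fs.IterateEntries
helper instead. A rough, hypothetical equivalent of what that helper
presumably does, expressed in terms of the Iterate/Next/Close contract
sketched earlier:

// iterateEntries is an illustrative stand-in for the assumed behavior of
// fs.IterateEntries: open an iterator, invoke the callback per entry, and
// close the iterator when done or on error.
func iterateEntries(
	ctx context.Context,
	dir fs.Directory,
	cb func(context.Context, fs.Entry) error,
) error {
	iter, err := dir.Iterate(ctx)
	if err != nil {
		return err
	}

	defer iter.Close()

	for {
		ent, err := iter.Next(ctx)
		if err != nil {
			return err
		}

		if ent == nil {
			// End of directory.
			return nil
		}

		if err := cb(ctx, ent); err != nil {
			return err
		}
	}
}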


@@ -2553,12 +2553,13 @@ type mockStaticDirectory struct {
iterateCount int
}
func (msd *mockStaticDirectory) IterateEntries(
func (msd *mockStaticDirectory) Iterate(
ctx context.Context,
callback func(context.Context, fs.Entry) error,
) error {
) (fs.DirectoryIterator, error) {
msd.iterateCount++
return msd.Directory.IterateEntries(ctx, callback)
iter, err := msd.Directory.Iterate(ctx)
return iter, clues.Stack(err).OrNil()
}
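The mock now counts calls to Iterate rather than callback invocations. If a
per-entry count were ever needed, one option (purely hypothetical, not part of
this change) would be to wrap the returned iterator and count Next calls:

// countingIterator wraps an fs.DirectoryIterator and tallies Next calls.
// Names here are illustrative only.
type countingIterator struct {
	fs.DirectoryIterator

	nextCount int
}

func (ci *countingIterator) Next(ctx context.Context) (fs.Entry, error) {
	ci.nextCount++
	return ci.DirectoryIterator.Next(ctx)
}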
func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() {