Rewrite Corso's kopia snapshot tie-ins to use directory iterators (#4466)

kopia/kopia#3365 switched kopia to use iterators
for traversing directory entries instead of
callbacks.

This PR adds a new iterator struct that uses a
state machine to traverse the different data
sources we use to back a single directory passed
to kopia. While the state machine execution
logic is new, much of the logic for returning
individual entries is adapted from existing code.

Recommend reviewing commit-by-commit and
possibly ignoring whitespace changes.

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [x] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #4457

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
ashmrtn 2023-11-28 15:44:41 -08:00 committed by GitHub
parent 6aff258c8b
commit e6477a67a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 395 additions and 256 deletions

View File

@ -2,7 +2,7 @@ module github.com/alcionai/corso/src
go 1.21
replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3
replace github.com/kopia/kopia => github.com/alcionai/kopia v0.12.2-0.20231128180219-e763f7c81f23
require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0

View File

@ -17,8 +17,8 @@ github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpH
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alcionai/clues v0.0.0-20231115004051-523cbddac8e8 h1:kdpkhcKWWxnZFteGjglHvFTTPCPsvjhKDDY9J1Od6Sg=
github.com/alcionai/clues v0.0.0-20231115004051-523cbddac8e8/go.mod h1:hGnRqQtV7YoojQSNMtqFK0TvfcwAljGZhUEQwx1lw34=
github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3 h1:6YjRGjEZr/Bmux1XkS13Re1m1LI7VAcbFsA3PiqO2BI=
github.com/alcionai/kopia v0.12.2-0.20230822191057-17d4deff94a3/go.mod h1:u5wAx1XN07PJsO1BLBkGicwSrbmAC1biONnumSCA210=
github.com/alcionai/kopia v0.12.2-0.20231128180219-e763f7c81f23 h1:ojlHvw65O21oFTC6kQldVDO69VQ6JEzMdD6aq9aOrY4=
github.com/alcionai/kopia v0.12.2-0.20231128180219-e763f7c81f23/go.mod h1:f4PligAuyEicX+lfTlZltc69nM0eMoXX2nE5sCBdo6Y=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -108,16 +108,16 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/h2non/gock v1.2.0 h1:K6ol8rfrRkUOefooBC8elXoaNGYkpp7y2qcxGG6BzUE=
github.com/h2non/gock v1.2.0/go.mod h1:tNhoxHYW2W42cYkYb1WqzdbYIieALC99kpYr7rH/BQk=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hanwen/go-fuse/v2 v2.3.0 h1:t5ivNIH2PK+zw4OBul/iJjsoG9K6kXo4nMDoBpciC8A=
github.com/hanwen/go-fuse/v2 v2.3.0/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs=
github.com/hanwen/go-fuse/v2 v2.4.0 h1:12OhD7CkXXQdvxG2osIdBQLdXh+nmLXY9unkUIe/xaU=
github.com/hanwen/go-fuse/v2 v2.4.0/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs=
github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A=
github.com/hashicorp/cronexpr v1.1.2/go.mod h1:P4wA0KBl9C5q2hABiMO7cp6jcIg96CDh1Efb3g1PWA4=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
@ -158,8 +158,8 @@ github.com/klauspost/pgzip v1.2.6/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQ
github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY=
github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kopia/htmluibuild v0.0.0-20230716183504-d78b44b3a9bd h1:Vskpc00T65HkkDSWbkiXOG5yYsgWg5LN48daUfGZ+u0=
github.com/kopia/htmluibuild v0.0.0-20230716183504-d78b44b3a9bd/go.mod h1:eWer4rx9P8lJo2eKc+Q7AZ1dE1x1hJNdkbDFPzMu1Hw=
github.com/kopia/htmluibuild v0.0.1-0.20231019063300-75c2a788c7d0 h1:TvupyyfbUZzsO4DQJpQhKZnUa61xERcJ+ejCbHWG2NY=
github.com/kopia/htmluibuild v0.0.1-0.20231019063300-75c2a788c7d0/go.mod h1:cSImbrlwvv2phvj5RfScL2v08ghX6xli0PcK6f+t8S0=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=

View File

@ -244,38 +244,233 @@ func (cp *corsoProgress) get(k string) *itemDetails {
return cp.pending[k]
}
func collectionEntries(
// These define a small state machine describing which source to return an entry
// from next. Phases are traversed in the order defined unless the underlying
// data source isn't present. If an underlying data source is missing, the
// non-pre/post phase associated with that data source is skipped.
//
// Since some phases require initialization of the underlying data source we
// insert additional phases to allow that. Once initialization is completed the
// phase should be updated to the next phase.
//
// A similar tactic can be used to handle tearing down resources for underlying
// data sources if needed.
const (
	// iota assigns sequential values (0..8) in declaration order, so inserting
	// a new phase automatically renumbers later phases instead of requiring
	// manual edits to every constant below it.
	initPhase = iota
	staticEntsPhase
	preStreamEntsPhase
	streamEntsPhase
	postStreamEntsPhase
	preBaseDirEntsPhase
	baseDirEntsPhase
	postBaseDirEntsPhase
	terminationPhase
)
// corsoDirectoryIterator implements kopia's directory entry iteration for a
// single directory by running a small state machine (see the phase constants)
// over up to three data sources: static child entries, a streaming
// BackupCollection, and a base directory from a prior snapshot.
type corsoDirectoryIterator struct {
// ctx is captured at construction and used for all iteration work.
// NOTE(review): storing a context in a struct is normally discouraged in Go;
// presumably required here because kopia owns the iterator call sites —
// confirm against kopia's fs.DirectoryIterator contract.
ctx context.Context
// params holds the directory's current/previous paths plus its optional
// backing collection and optional base directory from hierarchy merging.
params snapshotParams
// staticEnts is the set of fs.StreamingDirectory child directories that we've
// generated based on the collections passed in. These entries may or may not
// contain an underlying data.BackupCollection (depending on what was passed
// in) and may or may not contain an fs.Directory (depending on hierarchy
// merging).
staticEnts []fs.Entry
// globalExcludeSet maps path prefixes to sets of excluded item names; the
// per-directory excludeSet below is derived from it during iteration.
globalExcludeSet prefixmatcher.StringSetReader
// progress receives backup-details entries for returned items and records
// recoverable errors encountered during iteration.
progress *corsoProgress
// endSpan is the callback to stop diagnostic span collection for iteration.
endSpan func()
// seenEnts contains the encoded names of entries that we've already streamed
// so we can skip returning them again when looking at base entries. Since
// this struct already deals with a single directory these can directly be
// file names instead of paths.
seenEnts map[string]struct{}
// locationPath contains the human-readable location of the underlying
// collection.
locationPath *path.Builder
// excludeSet is the individual exclude set to use for the longest path prefix
// for this iterator. We use path prefixes because some services (e.g.
// OneDrive) can only provide excluded items for the entire service backup not
// individual directories (e.g. OneDrive deleted items are reported as
// children of the root folder).
excludeSet map[string]struct{}
// currentPhase is the current state in the state machine.
currentPhase int
// streamItemsChan contains the channel for the backing collection if there is
// one. Once the backing collection has been traversed this is set to nil.
streamItemsChan <-chan data.Item
// staticEntsIdx contains the index in staticEnts of the next item to be
// returned. Once all static entries have been traversed this is set to
// len(staticEnts).
staticEntsIdx int
// baseDirIter contains the handle to the iterator for the base directory
// found during hierarchy merging. Once all base directory entries have been
// traversed this is set to nil.
baseDirIter fs.DirectoryIterator
}
// Close releases any remaining resources the iterator may have at the end of
// iteration.
func (d *corsoDirectoryIterator) Close() {
	end := d.endSpan
	if end == nil {
		return
	}

	end()
}
// Next returns the next entry in the directory, or (nil, nil) once every
// underlying data source has been exhausted — presumably kopia's signal for
// end of iteration (confirm against fs.DirectoryIterator docs). Errors from
// individual data sources are recorded on d.progress.errs rather than
// returned so kopia doesn't stop iterating the directory.
func (d *corsoDirectoryIterator) Next(ctx context.Context) (fs.Entry, error) {
// Execute the state machine until either:
// * we get an entry to return
// * we exhaust all underlying data sources (end of iteration)
//
// Multiple executions of the state machine may be required for things like
// setting up underlying data sources or finding that there are no more
// entries in the current data source and needing to switch to the next one.
//
// Returned entries are handled with inline return statements.
//
// When an error is encountered it's added to the fault.Bus. We can't return
// these errors since doing so will result in kopia stopping iteration of the
// directory. Since these errors are recorded we won't lose track of them at
// the end of the backup.
for d.currentPhase != terminationPhase {
switch d.currentPhase {
case initPhase:
// Start the diagnostic span; Close ends it via endSpan.
d.ctx, d.endSpan = diagnostics.Span(d.ctx, "kopia:DirectoryIterator")
d.currentPhase = staticEntsPhase
case staticEntsPhase:
// Return pre-built child directories one at a time before touching the
// streaming collection or base directory.
if d.staticEntsIdx < len(d.staticEnts) {
ent := d.staticEnts[d.staticEntsIdx]
d.staticEntsIdx++
return ent, nil
}
d.currentPhase = preStreamEntsPhase
case preStreamEntsPhase:
// No backing collection: skip directly past the stream phases.
if d.params.collection == nil {
d.currentPhase = preBaseDirEntsPhase
break
}
if lp, ok := d.params.collection.(data.LocationPather); ok {
d.locationPath = lp.LocationPath()
}
d.streamItemsChan = d.params.collection.Items(d.ctx, d.progress.errs)
d.seenEnts = map[string]struct{}{}
d.currentPhase = streamEntsPhase
case streamEntsPhase:
ent, err := d.nextStreamEnt(d.ctx)
if ent != nil {
return ent, nil
}
// This assumes that once we hit an error we won't generate any more valid
// entries. Record the error in progress but don't return it to kopia
// since doing so will terminate iteration.
if err != nil {
d.progress.errs.AddRecoverable(d.ctx, clues.Stack(err))
}
// Done iterating through stream entries, advance the state machine state.
d.currentPhase = postStreamEntsPhase
case postStreamEntsPhase:
// Drop the drained channel so it can't be read again.
d.streamItemsChan = nil
d.currentPhase = preBaseDirEntsPhase
case preBaseDirEntsPhase:
// We have no iterator from which to pull entries, switch to the next
// state machine state.
if d.params.baseDir == nil {
d.currentPhase = postBaseDirEntsPhase
break
}
var err error
d.baseDirIter, err = d.params.baseDir.Iterate(d.ctx)
if err != nil {
// We have no iterator from which to pull entries, switch to the next
// state machine state.
d.currentPhase = postBaseDirEntsPhase
d.progress.errs.AddRecoverable(
d.ctx,
clues.Wrap(err, "getting base directory iterator"))
break
}
if d.globalExcludeSet != nil {
// Per-directory exclusions come from the longest matching path prefix
// in the global set (see excludeSet field comment for why prefixes).
longest, excludeSet, _ := d.globalExcludeSet.LongestPrefix(
d.params.currentPath.String())
d.excludeSet = excludeSet
logger.Ctx(d.ctx).Debugw("found exclude set", "set_prefix", longest)
}
d.currentPhase = baseDirEntsPhase
case baseDirEntsPhase:
ent, err := d.nextBaseEnt(d.ctx)
if ent != nil {
return ent, nil
}
// This assumes that once we hit an error we won't generate any more valid
// entries. Record the error in progress but don't return it to kopia
// since doing so will terminate iteration.
if err != nil {
d.progress.errs.AddRecoverable(d.ctx, clues.Stack(err))
}
// Done iterating through base entries, advance the state machine state.
d.currentPhase = postBaseDirEntsPhase
case postBaseDirEntsPhase:
// Making a separate phase so adding additional phases after this one is
// less error prone if we ever need to do that.
if d.baseDirIter != nil {
d.baseDirIter.Close()
d.baseDirIter = nil
}
d.seenEnts = nil
d.excludeSet = nil
d.currentPhase = terminationPhase
}
}
// All phases exhausted: nil entry with nil error ends iteration.
return nil, nil
}
func (d *corsoDirectoryIterator) nextStreamEnt(
ctx context.Context,
ctr func(context.Context, fs.Entry) error,
streamedEnts data.BackupCollection,
progress *corsoProgress,
) (map[string]struct{}, error) {
if streamedEnts == nil {
return nil, nil
}
var (
locationPath *path.Builder
// Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot.
seen = map[string]struct{}{}
items = streamedEnts.Items(ctx, progress.errs)
)
if lp, ok := streamedEnts.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
) (fs.Entry, error) {
// Loop over results until we get something we can return. Required because we
// could see deleted items.
for {
select {
case <-ctx.Done():
return seen, clues.StackWC(ctx, ctx.Err())
return nil, clues.StackWC(ctx, ctx.Err())
case e, ok := <-items:
case e, ok := <-d.streamItemsChan:
// Channel was closed, no more entries to return.
if !ok {
return seen, nil
return nil, nil
}
// Got an entry to process, see if it's a deletion marker or something to
// return to kopia.
encodedName := encodeAsPath(e.ID())
// Even if this item has been deleted and should not appear at all in
@ -291,13 +486,13 @@ func collectionEntries(
// items we need to track. Namely, we can track the created time of the
// item and if it's after the base snapshot was finalized we can skip it
// because it's not possible for the base snapshot to contain that item.
seen[encodedName] = struct{}{}
d.seenEnts[encodedName] = struct{}{}
// For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().AppendItem(e.ID())
itemPath, err := d.params.currentPath.AppendItem(e.ID())
if err != nil {
err = clues.Wrap(err, "getting full item path")
progress.errs.AddRecoverable(ctx, err)
d.progress.errs.AddRecoverable(ctx, err)
logger.CtxErr(ctx, err).Error("getting full item path")
@ -323,9 +518,8 @@ func collectionEntries(
ei, ok := e.(data.ItemInfo)
if ok {
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{
// element.
deetsEnt := &itemDetails{
infoer: ei,
repoPath: itemPath,
// Also use the current path as the previous path for this item. This
@ -336,222 +530,160 @@ func collectionEntries(
// This all works out because cached item checks in kopia are direct
// path + metadata comparisons.
prevPath: itemPath,
locationPath: locationPath,
locationPath: d.locationPath,
modTime: &modTime,
}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
d.progress.put(
encodeAsPath(itemPath.PopFront().Elements()...),
deetsEnt)
}
entry := virtualfs.StreamingFileWithModTimeFromReader(
return virtualfs.StreamingFileWithModTimeFromReader(
encodedName,
modTime,
e.ToReader())
err = ctr(ctx, entry)
if err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
return seen, clues.WrapWC(ctx, err, "executing callback").
With("item_path", itemPath)
}
e.ToReader()), nil
}
}
}
func streamBaseEntries(
func (d *corsoDirectoryIterator) nextBaseEnt(
ctx context.Context,
ctr func(context.Context, fs.Entry) error,
params snapshotParams,
locationPath *path.Builder,
encodedSeen map[string]struct{},
globalExcludeSet prefixmatcher.StringSetReader,
progress *corsoProgress,
) error {
if params.baseDir == nil {
return nil
}
) (fs.Entry, error) {
var (
longest string
excludeSet map[string]struct{}
entry fs.Entry
err error
)
if globalExcludeSet != nil {
longest, excludeSet, _ = globalExcludeSet.LongestPrefix(
params.currentPath.String())
}
ctx = clues.Add(
ctx,
"current_directory_path", params.currentPath,
"longest_prefix", path.LoggableDir(longest))
err := params.baseDir.IterateEntries(
ctx,
func(innerCtx context.Context, entry fs.Entry) error {
if err := innerCtx.Err(); err != nil {
return clues.StackWC(ctx, err)
}
entName, err := decodeElement(entry.Name())
if err != nil {
return clues.WrapWC(ctx, err, "decoding entry name").
With("entry_name", entry.Name())
}
if d, ok := entry.(fs.Directory); ok {
// Don't walk subdirectories in this function.
if !params.streamBaseEnts {
return nil
}
// Do walk subdirectories. The previous and current path of the
// directory can be generated by appending the directory name onto the
// previous and current path of this directory. Since the directory has
// no BackupCollection associated with it (part of the criteria for
// allowing walking directories in this function) there shouldn't be any
// LocationPath information associated with the directory.
newP, err := params.currentPath.Append(false, entName)
if err != nil {
return clues.WrapWC(ctx, err, "getting current directory path")
}
oldP, err := params.prevPath.Append(false, entName)
if err != nil {
return clues.WrapWC(ctx, err, "getting previous directory path")
}
e := virtualfs.NewStreamingDirectory(
entry.Name(),
getStreamItemFunc(
snapshotParams{
currentPath: newP,
prevPath: oldP,
collection: nil,
baseDir: d,
streamBaseEnts: params.streamBaseEnts,
},
nil,
globalExcludeSet,
progress))
return clues.WrapWC(ctx, ctr(ctx, e), "executing callback on subdirectory").
With("directory_path", newP).
OrNil()
}
// This entry was either updated or deleted. In either case, the external
// service notified us about it and it's already been handled so we should
// skip it here.
if _, ok := encodedSeen[entry.Name()]; ok {
return nil
}
// This entry was marked as deleted by a service that can't tell us the
// previous path of deleted items, only the item ID.
if _, ok := excludeSet[entName]; ok {
return nil
}
// For now assuming that item IDs don't need escaping.
itemPath, err := params.currentPath.AppendItem(entName)
if err != nil {
return clues.WrapWC(ctx, err, "getting full item path for base entry")
}
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := params.prevPath.AppendItem(entName)
if err != nil {
return clues.WrapWC(ctx, err, "getting previous full item path for base entry")
}
// Meta files aren't in backup details since it's the set of items the
// user sees.
//
// TODO(ashmrtn): We may eventually want to make this a function that is
// passed in so that we can more easily switch it between different
// external service provider implementations.
if !metadata.IsMetadataFile(itemPath) {
// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add
// the item to progress and having progress aggregate everything for
// later.
d := &itemDetails{
repoPath: itemPath,
prevPath: prevItemPath,
locationPath: locationPath,
modTime: ptr.To(entry.ModTime()),
}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
if err := ctr(ctx, entry); err != nil {
return clues.WrapWC(ctx, err, "executing callback on item").
With("item_path", itemPath)
}
return nil
})
if err != nil {
return clues.WrapWC(ctx, err, "traversing items in base snapshot directory")
}
return nil
}
// getStreamItemFunc returns a function that can be used by kopia's
// virtualfs.StreamingDirectory to iterate through directory entries and call
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
params snapshotParams,
staticEnts []fs.Entry,
globalExcludeSet prefixmatcher.StringSetReader,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, ctr func(context.Context, fs.Entry) error) error {
ctx, end := diagnostics.Span(ctx, "kopia:getStreamItemFunc")
defer end()
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := ctr(ctx, d); err != nil {
return clues.WrapWC(ctx, err, "executing callback on static directory")
}
}
var locationPath *path.Builder
if lp, ok := params.collection.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
seen, err := collectionEntries(ctx, ctr, params.collection, progress)
for entry, err = d.baseDirIter.Next(ctx); entry != nil && err == nil; entry, err = d.baseDirIter.Next(ctx) {
entName, err := decodeElement(entry.Name())
if err != nil {
return clues.Wrap(err, "streaming collection entries")
err = clues.WrapWC(ctx, err, "decoding entry name").
With("entry_name", clues.Hide(entry.Name()))
d.progress.errs.AddRecoverable(ctx, err)
continue
}
if err := streamBaseEntries(
ctx,
ctr,
params,
locationPath,
seen,
globalExcludeSet,
progress); err != nil {
return clues.Wrap(err, "streaming base snapshot entries")
ctx = clues.Add(ctx, "entry_name", clues.Hide(entName))
if dir, ok := entry.(fs.Directory); ok {
// Don't walk subdirectories in this function.
if !d.params.streamBaseEnts {
continue
}
// Do walk subdirectories. The previous and current path of the
// directory can be generated by appending the directory name onto the
// previous and current path of this directory. Since the directory has
// no BackupCollection associated with it (part of the criteria for
// allowing walking directories in this function) there shouldn't be any
// LocationPath information associated with the directory.
newP, err := d.params.currentPath.Append(false, entName)
if err != nil {
err = clues.WrapWC(ctx, err, "getting current directory path")
d.progress.errs.AddRecoverable(ctx, err)
continue
}
ctx = clues.Add(ctx, "child_directory_path", newP)
oldP, err := d.params.prevPath.Append(false, entName)
if err != nil {
err = clues.WrapWC(ctx, err, "getting previous directory path")
d.progress.errs.AddRecoverable(ctx, err)
continue
}
return virtualfs.NewStreamingDirectory(
entry.Name(),
&corsoDirectoryIterator{
ctx: ctx,
params: snapshotParams{
currentPath: newP,
prevPath: oldP,
collection: nil,
baseDir: dir,
streamBaseEnts: d.params.streamBaseEnts,
},
globalExcludeSet: d.globalExcludeSet,
progress: d.progress,
}), nil
}
return nil
// This entry was either updated or deleted. In either case, the external
// service notified us about it and it's already been handled so we should
// skip it here.
if _, ok := d.seenEnts[entry.Name()]; ok {
continue
}
// This entry was marked as deleted by a service that can't tell us the
// previous path of deleted items, only the item ID.
if _, ok := d.excludeSet[entName]; ok {
continue
}
// This is a path used in corso not kopia so it doesn't need to encode the
// item name.
itemPath, err := d.params.currentPath.AppendItem(entName)
if err != nil {
err = clues.WrapWC(ctx, err, "getting full item path for base entry")
d.progress.errs.AddRecoverable(ctx, err)
continue
}
ctx = clues.Add(ctx, "item_path", itemPath)
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := d.params.prevPath.AppendItem(entName)
if err != nil {
err = clues.WrapWC(
ctx,
err,
"getting previous full item path for base entry")
d.progress.errs.AddRecoverable(ctx, err)
continue
}
// Meta files aren't in backup details since it's the set of items the
// user sees.
//
// TODO(ashmrtn): We may eventually want to make this a function that is
// passed in so that we can more easily switch it between different
// external service provider implementations.
if !metadata.IsMetadataFile(itemPath) {
// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add
// the item to progress and having progress aggregate everything for
// later.
detailsEnt := &itemDetails{
repoPath: itemPath,
prevPath: prevItemPath,
locationPath: d.locationPath,
modTime: ptr.To(entry.ModTime()),
}
d.progress.put(
encodeAsPath(itemPath.PopFront().Elements()...),
detailsEnt)
}
return entry, nil
}
return nil, clues.Stack(err).OrNil()
}
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs(
ctx context.Context,
dirName string,
dir *treeMap,
globalExcludeSet prefixmatcher.StringSetReader,
@ -572,6 +704,7 @@ func buildKopiaDirs(
for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(
ctx,
childName,
childDir,
globalExcludeSet,
@ -585,11 +718,13 @@ func buildKopiaDirs(
return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(
dir.snapshotParams,
childDirs,
globalExcludeSet,
progress)), nil
&corsoDirectoryIterator{
ctx: ctx,
params: dir.snapshotParams,
staticEnts: childDirs,
globalExcludeSet: globalExcludeSet,
progress: progress,
}), nil
}
type snapshotParams struct {
@ -987,23 +1122,26 @@ func traverseBaseDir(
var hasItems bool
if changed {
err = dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
dEntry, ok := entry.(fs.Directory)
if !ok {
hasItems = true
return nil
}
err = fs.IterateEntries(
ctx,
dir,
func(innerCtx context.Context, entry fs.Entry) error {
dEntry, ok := entry.(fs.Directory)
if !ok {
hasItems = true
return nil
}
return traverseBaseDir(
innerCtx,
depth+1,
updatedPaths,
oldDirPath,
currentPath,
dEntry,
roots,
stats)
})
return traverseBaseDir(
innerCtx,
depth+1,
updatedPaths,
oldDirPath,
currentPath,
dEntry,
roots,
stats)
})
if err != nil {
return clues.WrapWC(ctx, err, "traversing base directory")
}
@ -1260,7 +1398,7 @@ func inflateDirTree(
var res fs.Directory
for dirName, dir := range roots {
tmp, err := buildKopiaDirs(dirName, dir, globalExcludeSet, progress)
tmp, err := buildKopiaDirs(ctx, dirName, dir, globalExcludeSet, progress)
if err != nil {
return nil, err
}

View File

@ -2556,12 +2556,13 @@ type mockStaticDirectory struct {
iterateCount int
}
func (msd *mockStaticDirectory) IterateEntries(
func (msd *mockStaticDirectory) Iterate(
ctx context.Context,
callback func(context.Context, fs.Entry) error,
) error {
) (fs.DirectoryIterator, error) {
msd.iterateCount++
return msd.Directory.IterateEntries(ctx, callback)
iter, err := msd.Directory.Iterate(ctx)
return iter, clues.Stack(err).OrNil()
}
func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreePruning() {