corso/src/internal/kopia/upload.go
ashmrtn a25948042a
Wire up LocationIRef stuff to details merge (#3110)
This still doesn't populate LocationRef for
OneDrive, but it puts much of the other logic
required for that in place.

Mostly it focuses on starting to use
LocationIDer for lookups when merging
details.

---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #2486

#### Test Plan

- [x] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
2023-04-14 00:48:12 +00:00

package kopia
import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"os"
"runtime/trace"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/alcionai/clues"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/graph/metadata"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
const maxInflateTraversalDepth = 500
var versionSize = int(unsafe.Sizeof(serializationVersion))
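// newBackupStreamReader returns a backupStreamReader that prepends the given
// version, encoded big-endian into a versionSize-byte prefix, to the bytes
// produced by reader.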
func newBackupStreamReader(version uint32, reader io.ReadCloser) *backupStreamReader {
buf := make([]byte, versionSize)
binary.BigEndian.PutUint32(buf, version)
bufReader := io.NopCloser(bytes.NewReader(buf))
return &backupStreamReader{
readers: []io.ReadCloser{bufReader, reader},
combined: io.NopCloser(io.MultiReader(bufReader, reader)),
}
}
// backupStreamReader is a wrapper around the io.Reader that other Corso
// components return when backing up information. It injects a version number at
// the start of the data stream. Future versions of Corso may not need this if
// they use more complex serialization logic as serialization/version injection
// will be handled by other components.
type backupStreamReader struct {
readers []io.ReadCloser
combined io.ReadCloser
}
func (rw *backupStreamReader) Read(p []byte) (n int, err error) {
if rw.combined == nil {
return 0, os.ErrClosed
}
return rw.combined.Read(p)
}
func (rw *backupStreamReader) Close() error {
if rw.combined == nil {
return nil
}
rw.combined = nil
for _, r := range rw.readers {
r.Close()
}
return nil
}
// restoreStreamReader is a wrapper around the io.Reader that kopia returns when
// reading data from an item. It examines and strips off the version number of
// the restored data. Future versions of Corso may not need this if they use
// more complex serialization logic as version checking/deserialization will be
// handled by other components. A reader that returns a version error is no
// longer valid and should not be used once the version error is returned.
type restoreStreamReader struct {
io.ReadCloser
expectedVersion uint32
readVersion bool
}
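// checkVersion reads the serialized version prefix from the underlying reader
// and returns an error if it doesn't match the expected version.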
func (rw *restoreStreamReader) checkVersion() error {
versionBuf := make([]byte, versionSize)
for newlyRead := 0; newlyRead < versionSize; {
n, err := rw.ReadCloser.Read(versionBuf[newlyRead:])
if err != nil {
return clues.Wrap(err, "reading data format version")
}
newlyRead += n
}
version := binary.BigEndian.Uint32(versionBuf)
if version != rw.expectedVersion {
return clues.New("unexpected data format").With("read_version", version)
}
return nil
}
func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
if !rw.readVersion {
rw.readVersion = true
if err := rw.checkVersion(); err != nil {
return 0, err
}
}
return rw.ReadCloser.Read(p)
}
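// itemDetails holds the information needed to add a single item to backup
// details once kopia reports it as finished. A nil info indicates the item was
// sourced from a base snapshot and its details entry must be merged from that
// backup later.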
type itemDetails struct {
info *details.ItemInfo
repoPath path.Path
prevPath path.Path
locationPath *path.Builder
cached bool
}
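// corsoProgress wraps kopia's snapshotfs.UploadProgress so Corso can collect
// backup details, track items whose details must be merged from a base
// backup, and surface upload errors.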
type corsoProgress struct {
snapshotfs.UploadProgress
pending map[string]*itemDetails
deets *details.Builder
// toMerge represents items that we don't have in-memory item info for. The
// item info for these items should be sourced from a base snapshot later on.
toMerge *mergeDetails
mu sync.RWMutex
totalBytes int64
errs *fault.Bus
// expectedIgnoredErrors is a count of error cases caught in the Error wrapper
// which are well known and actually ignorable. At the end of a run, if the
// manifest ignored error count is equal to this count, then everything is good.
expectedIgnoredErrors int
}
// mutexed wrapper around expectedIgnoredErrors++
func (cp *corsoProgress) incExpectedErrs() {
cp.mu.Lock()
defer cp.mu.Unlock()
cp.expectedIgnoredErrors++
}
// Kopia interface function used as a callback when kopia finishes processing a
// file.
func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
// Pass the call through as well so we don't break expected functionality.
defer cp.UploadProgress.FinishedFile(relativePath, err)
// Whether it succeeded or failed, remove the entry from our pending set so we
// don't leak references.
defer func() {
cp.mu.Lock()
defer cp.mu.Unlock()
delete(cp.pending, relativePath)
}()
if err != nil {
return
}
d := cp.get(relativePath)
if d == nil {
return
}
// These items were sourced from a base snapshot or were cached in kopia so we
// never had to materialize their details in-memory.
if d.info == nil {
if d.prevPath == nil {
cp.errs.AddRecoverable(clues.New("item sourced from previous backup with no previous path").
With(
"service", d.repoPath.Service().String(),
"category", d.repoPath.Category().String(),
).
Label(fault.LabelForceNoBackupCreation))
return
}
cp.mu.Lock()
defer cp.mu.Unlock()
err := cp.toMerge.addRepoRef(d.prevPath.ToBuilder(), d.repoPath, d.locationPath)
if err != nil {
cp.errs.AddRecoverable(clues.Wrap(err, "adding item to merge list").
With(
"service", d.repoPath.Service().String(),
"category", d.repoPath.Category().String(),
).
Label(fault.LabelForceNoBackupCreation))
}
return
}
var (
locationFolders string
parent = d.repoPath.ToBuilder().Dir()
)
if d.locationPath != nil {
locationFolders = d.locationPath.String()
}
err = cp.deets.Add(
d.repoPath.String(),
d.repoPath.ShortRef(),
parent.ShortRef(),
locationFolders,
!d.cached,
*d.info)
if err != nil {
cp.errs.AddRecoverable(clues.New("adding item to details").
With(
"service", d.repoPath.Service().String(),
"category", d.repoPath.Category().String(),
).
Label(fault.LabelForceNoBackupCreation))
return
}
folders := details.FolderEntriesForPath(parent, d.locationPath)
cp.deets.AddFoldersForItem(
folders,
*d.info,
!d.cached)
}
// Kopia interface function used as a callback when kopia finishes hashing a file.
func (cp *corsoProgress) FinishedHashingFile(fname string, bs int64) {
// Pass the call through as well so we don't break expected functionality.
defer cp.UploadProgress.FinishedHashingFile(fname, bs)
sl := strings.Split(fname, "/")
for i := range sl {
rdt, err := base64.StdEncoding.DecodeString(sl[i])
if err != nil {
// If an element doesn't decode, log it and keep the encoded form; the
// decoded path is only used for this debug log.
logger.Ctx(context.Background()).Debugw("unable to decode path element", "element", sl[i])
continue
}
sl[i] = string(rdt)
}
logger.Ctx(context.Background()).Debugw("finished hashing file", "path", sl[2:])
atomic.AddInt64(&cp.totalBytes, bs)
}
// Kopia interface function used as a callback when kopia detects a previously
// uploaded file that matches the current file and skips uploading the new
// (duplicate) version.
func (cp *corsoProgress) CachedFile(fname string, size int64) {
defer cp.UploadProgress.CachedFile(fname, size)
d := cp.get(fname)
if d == nil {
return
}
d.cached = true
}
// Kopia interface function used as a callback when kopia encounters an error
// during the upload process. This could be from reading a file or something
// else.
func (cp *corsoProgress) Error(relpath string, err error, isIgnored bool) {
// LabelsSkippable is set on malware items and not-found items.
// The malware case is an artifact of being unable to skip the
// item if we catch detection at a late enough stage in collection
// enumeration. The not-found case covers items deleted between a
// delta query and a fetch. This is our next point of error
// handling, where we can identify and skip over those cases.
if clues.HasLabel(err, graph.LabelsSkippable) {
cp.incExpectedErrs()
return
}
defer cp.UploadProgress.Error(relpath, err, isIgnored)
cp.errs.AddRecoverable(clues.Wrap(err, "kopia reported error").
With("is_ignored", isIgnored, "relative_path", relpath).
Label(fault.LabelForceNoBackupCreation))
}
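// put registers a pending item keyed by its encoded relative path so later
// callbacks like FinishedFile and CachedFile can look it up.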
func (cp *corsoProgress) put(k string, v *itemDetails) {
cp.mu.Lock()
defer cp.mu.Unlock()
cp.pending[k] = v
}
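// get returns the pending item for the given key, or nil if none exists.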
func (cp *corsoProgress) get(k string) *itemDetails {
cp.mu.RLock()
defer cp.mu.RUnlock()
return cp.pending[k]
}
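// collectionEntries streams the items in streamedEnts to the kopia callback
// cb. It returns the set of encoded item names it processed, including
// deleted items, so matching entries from the base snapshot can be skipped.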
func collectionEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
streamedEnts data.BackupCollection,
progress *corsoProgress,
) (map[string]struct{}, error) {
if streamedEnts == nil {
return nil, nil
}
var (
locationPath *path.Builder
// Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot.
seen = map[string]struct{}{}
items = streamedEnts.Items(ctx, progress.errs)
)
if lp, ok := streamedEnts.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
for {
select {
case <-ctx.Done():
return seen, clues.Stack(ctx.Err()).WithClues(ctx)
case e, ok := <-items:
if !ok {
return seen, nil
}
encodedName := encodeAsPath(e.UUID())
// Even if this item has been deleted and should not appear at all in
// the new snapshot we need to record that we've seen it here so we know
// to skip it if it's also present in the base snapshot.
//
// TODO(ashmrtn): Determine if we want to try to use the old version of
// the data (if it exists in the base) if we fail uploading the new
// version. If so, we should split this call into where we check for the
// item being deleted and then again after we do the kopia callback.
//
// TODO(ashmrtn): With a little more info, we could reduce the number of
// items we need to track. Namely, we can track the created time of the
// item and if it's after the base snapshot was finalized we can skip it
// because it's not possible for the base snapshot to contain that item.
seen[encodedName] = struct{}{}
// For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true)
if err != nil {
err = clues.Wrap(err, "getting full item path")
progress.errs.AddRecoverable(err)
logger.CtxErr(ctx, err).Error("getting full item path")
continue
}
trace.Log(ctx, "kopia:streamEntries:item", itemPath.String())
if e.Deleted() {
continue
}
// Not all items implement StreamInfo. For example, the metadata files
// do not because they don't contain information directly backed up or
// used for restore. If progress does not contain information about a
// finished file it just returns without an error so it's safe to skip
// adding something to it.
ei, ok := e.(data.StreamInfo)
if ok {
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
//
// TODO(ashmrtn): If we want to pull item info for cached item from a
// previous snapshot then we should populate prevPath here and leave
// info nil.
itemInfo := ei.Info()
d := &itemDetails{
info: &itemInfo,
repoPath: itemPath,
locationPath: locationPath,
}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
modTime := time.Now()
if smt, ok := e.(data.StreamModTime); ok {
modTime = smt.ModTime()
}
entry := virtualfs.StreamingFileWithModTimeFromReader(
encodedName,
modTime,
newBackupStreamReader(serializationVersion, e.ToReader()))
err = cb(ctx, entry)
if err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
return seen, clues.Wrap(err, "executing callback").WithClues(ctx).With("item_path", itemPath)
}
}
}
}
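// streamBaseEntries hands items from the base snapshot directory dir to the
// kopia callback cb. It skips subdirectories, items already streamed from the
// collection (encodedSeen), and items in the most specific matching exclude
// set. Non-metadata items are registered with progress so their details can
// be merged from the base backup.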
func streamBaseEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
curPath path.Path,
prevPath path.Path,
locationPath *path.Builder,
dir fs.Directory,
encodedSeen map[string]struct{},
globalExcludeSet map[string]map[string]struct{},
progress *corsoProgress,
) error {
if dir == nil {
return nil
}
var (
excludeSet map[string]struct{}
curPrefix string
)
ctx = clues.Add(ctx, "current_item_path", curPath)
for prefix, excludes := range globalExcludeSet {
// Select the set with the longest prefix to be most precise.
if strings.HasPrefix(curPath.String(), prefix) && len(prefix) >= len(curPrefix) {
excludeSet = excludes
curPrefix = prefix
}
}
err := dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
if err := innerCtx.Err(); err != nil {
return err
}
// Don't walk subdirectories in this function.
if _, ok := entry.(fs.Directory); ok {
return nil
}
// This entry was either updated or deleted. In either case, the external
// service notified us about it and it's already been handled so we should
// skip it here.
if _, ok := encodedSeen[entry.Name()]; ok {
return nil
}
entName, err := decodeElement(entry.Name())
if err != nil {
return clues.Wrap(err, "decoding entry name: "+entry.Name())
}
// This entry was marked as deleted by a service that can't tell us the
// previous path of deleted items, only the item ID.
if _, ok := excludeSet[entName]; ok {
return nil
}
// For now assuming that item IDs don't need escaping.
itemPath, err := curPath.Append(entName, true)
if err != nil {
return clues.Wrap(err, "getting full item path for base entry")
}
// We need the previous path so we can find this item in the base snapshot's
// backup details. If the item moved and we had only the new path, we'd be
// unable to find it in the old backup details because we wouldn't know what
// to look for.
prevItemPath, err := prevPath.Append(entName, true)
if err != nil {
return clues.Wrap(err, "getting previous full item path for base entry")
}
// Meta files aren't in backup details since it's the set of items the user
// sees.
//
// TODO(ashmrtn): We may eventually want to make this a function that is
// passed in so that we can more easily switch it between different external
// service provider implementations.
if !metadata.IsMetadataFile(itemPath) {
// All items have item info in the base backup. However, we need to make
// sure we have enough metadata to find those entries. To do that we add
// the item to progress and have progress aggregate everything for
// later.
d := &itemDetails{
info: nil,
repoPath: itemPath,
prevPath: prevItemPath,
locationPath: locationPath,
}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
if err := cb(ctx, entry); err != nil {
return clues.Wrap(err, "executing callback on item").With("item_path", itemPath)
}
return nil
})
if err != nil {
return clues.Wrap(err, "traversing items in base snapshot directory")
}
return nil
}
// getStreamItemFunc returns a function that can be used by kopia's
// virtualfs.StreamingDirectory to iterate through directory entries and call
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
curPath path.Path,
prevPath path.Path,
staticEnts []fs.Entry,
streamedEnts data.BackupCollection,
baseDir fs.Directory,
globalExcludeSet map[string]map[string]struct{},
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
ctx, end := diagnostics.Span(ctx, "kopia:getStreamItemFunc")
defer end()
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := cb(ctx, d); err != nil {
return clues.Wrap(err, "executing callback on static directory").WithClues(ctx)
}
}
var locationPath *path.Builder
if lp, ok := streamedEnts.(data.LocationPather); ok {
locationPath = lp.LocationPath()
}
seen, err := collectionEntries(ctx, cb, streamedEnts, progress)
if err != nil {
return clues.Wrap(err, "streaming collection entries")
}
if err := streamBaseEntries(
ctx,
cb,
curPath,
prevPath,
locationPath,
baseDir,
seen,
globalExcludeSet,
progress,
); err != nil {
return clues.Wrap(err, "streaming base snapshot entries")
}
return nil
}
}
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs(
dirName string,
dir *treeMap,
globalExcludeSet map[string]map[string]struct{},
progress *corsoProgress,
) (fs.Directory, error) {
// Need to build the directory tree from the leaves up because intermediate
// directories need to have all their entries at creation time.
var childDirs []fs.Entry
// TODO(ashmrtn): Reuse kopia directories directly if the subtree rooted at
// them is unchanged.
//
// This has a few restrictions though:
// * if we allow for moved folders, we need to make sure we update folder
// names properly
// * we need some way to know what items need to be pulled from the base
// backup's backup details
for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(childName, childDir, globalExcludeSet, progress)
if err != nil {
return nil, err
}
childDirs = append(childDirs, child)
}
return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(
dir.currentPath,
dir.prevPath,
childDirs,
dir.collection,
dir.baseDir,
globalExcludeSet,
progress,
),
), nil
}
type treeMap struct {
// path.Path representing the node's path. This is passed as a parameter to
// the stream item function so that even baseDir directories can properly
// generate the full path of items.
currentPath path.Path
// Previous path this directory may have resided at if it is sourced from a
// base snapshot.
prevPath path.Path
// Child directories of this directory.
childDirs map[string]*treeMap
// Reference to data pulled from the external service. Contains only items in
// this directory. Does not contain references to subdirectories.
collection data.BackupCollection
// Reference to directory in base snapshot. The referenced directory itself
// may contain files and subdirectories, but the subdirectories should
// eventually be added when walking the base snapshot to build the hierarchy,
// not when handing items to kopia for the new snapshot. Subdirectories should
// be added to childDirs while building the hierarchy. They will be ignored
// when iterating through the directory to hand items to kopia.
baseDir fs.Directory
}
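// newTreeMap returns an empty treeMap with an initialized childDirs map.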
func newTreeMap() *treeMap {
return &treeMap{
childDirs: map[string]*treeMap{},
}
}
// maybeGetTreeNode walks the tree(s) with roots roots and returns the node
// specified by pathElements if all nodes on the path exist. If pathElements is
// nil or empty then returns nil.
func maybeGetTreeNode(roots map[string]*treeMap, pathElements []string) *treeMap {
if len(pathElements) == 0 {
return nil
}
dir := roots[pathElements[0]]
for i := 1; i < len(pathElements); i++ {
if dir == nil {
return nil
}
p := pathElements[i]
dir = dir.childDirs[p]
}
return dir
}
// getTreeNode walks the tree(s) with roots roots and returns the node specified
// by pathElements. If pathElements is nil or empty then returns nil. Tree nodes
// are created for any path elements where a node is not already present.
func getTreeNode(roots map[string]*treeMap, pathElements []string) *treeMap {
if len(pathElements) == 0 {
return nil
}
dir, ok := roots[pathElements[0]]
if !ok {
dir = newTreeMap()
roots[pathElements[0]] = dir
}
// Use actual indices so this is automatically skipped if
// len(pathElements) == 1.
for i := 1; i < len(pathElements); i++ {
p := pathElements[i]
newDir := dir.childDirs[p]
if newDir == nil {
newDir = newTreeMap()
if dir.childDirs == nil {
dir.childDirs = map[string]*treeMap{}
}
dir.childDirs[p] = newDir
}
dir = newDir
}
return dir
}
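// addMergeLocation records the previous and current location of a moved
// collection in toMerge so locations for items merged from a base backup can
// be updated by prefix matching. Collections that don't implement
// data.PreviousLocationPather are ignored.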
func addMergeLocation(col data.BackupCollection, toMerge *mergeDetails) error {
lp, ok := col.(data.PreviousLocationPather)
if !ok {
return nil
}
prevLoc := lp.PreviousLocationPath()
newLoc := lp.LocationPath()
if prevLoc == nil {
return clues.New("moved collection with nil previous location")
} else if newLoc == nil {
return clues.New("moved collection with nil location")
}
if err := toMerge.addLocation(prevLoc, newLoc); err != nil {
return clues.Wrap(err, "building updated location set").
With(
"collection_previous_location", prevLoc,
"collection_location", newLoc)
}
return nil
}
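// inflateCollectionTree builds treeMap node(s) from the given collections. It
// returns the root nodes keyed by their root path element along with a map
// from previous path to new path for collections that were moved or deleted
// (a nil value marks a deletion).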
func inflateCollectionTree(
ctx context.Context,
collections []data.BackupCollection,
toMerge *mergeDetails,
) (map[string]*treeMap, map[string]path.Path, error) {
roots := make(map[string]*treeMap)
// Contains the old path for collections that have been moved or renamed.
// Allows resolving what the new path should be when walking the base
// snapshot(s)'s hierarchy. Nil represents a collection that was deleted.
updatedPaths := make(map[string]path.Path)
// Temporary variable just to track the things that have been marked as
// changed while keeping a reference to their path.
changedPaths := []path.Path{}
for _, s := range collections {
ictx := clues.Add(
ctx,
"collection_full_path", s.FullPath(),
"collection_previous_path", s.PreviousPath())
switch s.State() {
case data.DeletedState:
if s.PreviousPath() == nil {
return nil, nil, clues.New("nil previous path on deleted collection").WithClues(ictx)
}
changedPaths = append(changedPaths, s.PreviousPath())
if _, ok := updatedPaths[s.PreviousPath().String()]; ok {
return nil, nil, clues.New("multiple previous state changes to collection").
WithClues(ictx)
}
updatedPaths[s.PreviousPath().String()] = nil
continue
case data.MovedState:
changedPaths = append(changedPaths, s.PreviousPath())
if _, ok := updatedPaths[s.PreviousPath().String()]; ok {
return nil, nil, clues.New("multiple previous state changes to collection").
WithClues(ictx)
}
updatedPaths[s.PreviousPath().String()] = s.FullPath()
// Only safe when collections are moved since we only need prefix matching
// if a nested folder's path changed in some way that didn't generate a
// collection. For that to be the case, the nested folder's path must have
// changed via one of the ancestor folders being moved. This catches the
// ancestor folder move.
if err := addMergeLocation(s, toMerge); err != nil {
return nil, nil, clues.Wrap(err, "adding merge location").WithClues(ictx)
}
}
if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 {
return nil, nil, clues.New("no identifier for collection").WithClues(ictx)
}
node := getTreeNode(roots, s.FullPath().Elements())
if node == nil {
return nil, nil, clues.New("getting tree node").WithClues(ictx)
}
// Make sure there's only a single collection adding items for any given
// path in the new hierarchy.
if node.collection != nil {
return nil, nil, clues.New("multiple instances of collection").WithClues(ictx)
}
node.collection = s
node.currentPath = s.FullPath()
node.prevPath = s.PreviousPath()
}
// Check that each previous path has only one of the states of deleted, moved,
// or notmoved. Check at the end to avoid issues like seeing a notmoved state
// collection and then a deleted state collection.
for _, p := range changedPaths {
node := maybeGetTreeNode(roots, p.Elements())
if node == nil {
continue
}
if node.collection != nil && node.collection.State() == data.NotMovedState {
return nil, nil, clues.New("conflicting states for collection").With("changed_path", p)
}
}
return roots, updatedPaths, nil
}
// traverseBaseDir is an unoptimized function that reads items in a directory
// and traverses subdirectories in the given directory. oldDirPath is the path
// the directory would be at if the hierarchy was unchanged. expectedDirPath is the
// path the directory would be at if all changes from the root to this directory
// were taken into account. Both are needed to detect some changes like moving
// a parent directory and moving one of the child directories out of the parent.
// If a directory on the path was deleted, expectedDirPath is set to nil.
//
// TODO(ashmrtn): A potentially more memory efficient version of this would
// traverse only the directories that we know are present in the collections
// passed in. The other directories could be dynamically discovered when kopia
// was requesting items.
func traverseBaseDir(
ctx context.Context,
depth int,
updatedPaths map[string]path.Path,
oldDirPath *path.Builder,
expectedDirPath *path.Builder,
dir fs.Directory,
roots map[string]*treeMap,
) error {
ctx = clues.Add(ctx,
"old_dir_path", oldDirPath,
"expected_dir_path", expectedDirPath)
if depth >= maxInflateTraversalDepth {
return clues.New("base snapshot tree too tall")
}
// Wrapper base64 encodes all file and folder names to avoid issues with
// special characters. Since we're working directly with files and folders
// from kopia we need to do the decoding here.
dirName, err := decodeElement(dir.Name())
if err != nil {
return clues.Wrap(err, "decoding base directory name").With("dir_name", dir.Name())
}
// Form the path this directory would be at if the hierarchy remained the same
// as well as where it would be at if we take into account ancestor
// directories that may have had changes. The former is used to check if this
// directory specifically has been moved. The latter is used to handle
// deletions and moving subtrees in the hierarchy.
//
// Explicit movement of directories should have the final say though so we
// override any subtree movement with what's in updatedPaths if an entry
// exists.
oldDirPath = oldDirPath.Append(dirName)
currentPath := expectedDirPath
if currentPath != nil {
currentPath = currentPath.Append(dirName)
}
if upb, ok := updatedPaths[oldDirPath.String()]; ok {
// This directory was deleted.
if upb == nil {
currentPath = nil
} else {
// This directory was moved/renamed and the new location is in upb.
currentPath = upb.ToBuilder()
}
}
ctx = clues.Add(ctx, "new_path", currentPath)
// TODO(ashmrtn): If we can do prefix matching on elements in updatedPaths and
// we know that the tree node for this directory has no collection reference
// and no child nodes then we can skip traversing this directory. This will
// only work if we know what directory deleted items used to belong in (e.g.
// it won't work for OneDrive because we only know the ID of the deleted
// item).
var hasItems bool
err = dir.IterateEntries(ctx, func(innerCtx context.Context, entry fs.Entry) error {
dEntry, ok := entry.(fs.Directory)
if !ok {
hasItems = true
return nil
}
return traverseBaseDir(
innerCtx,
depth+1,
updatedPaths,
oldDirPath,
currentPath,
dEntry,
roots,
)
})
if err != nil {
return clues.Wrap(err, "traversing base directory")
}
// We only need to add this base directory to the tree we're building if it
// has items in it. The traversal of the directory here just finds
// subdirectories. This optimization will not be valid if we dynamically
// determine the subdirectories this directory has when handing items to
// kopia.
if currentPath != nil && hasItems {
// Having this in the if-block has the effect of removing empty directories
// from backups that have a base snapshot. If we'd like to preserve empty
// directories across incremental backups, move getting the node outside of
// the if-block. That will be sufficient to create a StreamingDirectory that
// kopia will pick up on. Assigning the baseDir of the node should remain
// in the if-block though as that is an optimization.
node := getTreeNode(roots, currentPath.Elements())
if node == nil {
return clues.New("getting tree node")
}
// Now that we have the node we need to check if there is a collection
// marked DoNotMerge. If there is, skip adding a reference to this base dir
// in the node. That allows us to propagate subtree operations (e.g. move)
// while selectively skipping merging old and new versions for some
// directories. The expected use case for this is delta token expiry in M365.
if node.collection != nil &&
(node.collection.DoNotMergeItems() || node.collection.State() == data.NewState) {
return nil
}
curP, err := path.FromDataLayerPath(currentPath.String(), false)
if err != nil {
return clues.New("converting current path to path.Path")
}
oldP, err := path.FromDataLayerPath(oldDirPath.String(), false)
if err != nil {
return clues.New("converting old path to path.Path")
}
node.baseDir = dir
node.currentPath = curP
node.prevPath = oldP
}
return nil
}
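// inflateBaseTree merges the directory hierarchy of the base snapshot snap
// into roots. Incomplete snapshots are skipped, and directory paths are
// adjusted according to updatedPaths during traversal.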
func inflateBaseTree(
ctx context.Context,
loader snapshotLoader,
snap IncrementalBase,
updatedPaths map[string]path.Path,
roots map[string]*treeMap,
) error {
// Only complete snapshots should be used to source base information.
// Snapshots for checkpoints will rely on kopia-assisted dedupe to efficiently
// handle items that were completely uploaded before Corso crashed.
if len(snap.IncompleteReason) > 0 {
return nil
}
ctx = clues.Add(ctx, "snapshot_base_id", snap.ID)
root, err := loader.SnapshotRoot(snap.Manifest)
if err != nil {
return clues.Wrap(err, "getting snapshot root directory").WithClues(ctx)
}
dir, ok := root.(fs.Directory)
if !ok {
return clues.New("snapshot root is not a directory").WithClues(ctx)
}
// For each subtree corresponding to the tuple
// (resource owner, service, category) merge the directories in the base with
// what has been reported in the collections we got.
for _, subtreePath := range snap.SubtreePaths {
// We're starting from the root directory so don't need it in the path.
pathElems := encodeElements(subtreePath.PopFront().Elements()...)
ictx := clues.Add(ctx, "subtree_path", subtreePath)
ent, err := snapshotfs.GetNestedEntry(ictx, dir, pathElems)
if err != nil {
if isErrEntryNotFound(err) {
logger.CtxErr(ictx, err).Infow("base snapshot missing subtree")
continue
}
return clues.Wrap(err, "getting subtree root").WithClues(ictx)
}
subtreeDir, ok := ent.(fs.Directory)
if !ok {
return clues.Wrap(err, "subtree root is not directory").WithClues(ictx)
}
// We're assuming here that the prefix for the path has not changed (i.e.
// all of tenant, service, resource owner, and category are the same in the
// old snapshot (snap) and the snapshot we're currently trying to make).
if err = traverseBaseDir(
ictx,
0,
updatedPaths,
subtreePath.Dir(),
subtreePath.Dir(),
subtreeDir,
roots,
); err != nil {
return clues.Wrap(err, "traversing base snapshot").WithClues(ictx)
}
}
return nil
}
// inflateDirTree returns an fs.Directory tree rooted at the oldest common
// ancestor of the streams. All nodes are virtualfs.StreamingDirectory with
// the given DataCollections if there is one for that node.
//
// globalExcludeSet represents a set of items, represented with file names, to
// exclude from base directories when uploading the snapshot. As items in *all*
// base directories will be checked for in every base directory, this assumes
// that items in the bases are unique. Deletions of directories or subtrees
// should be represented as changes in the status of a BackupCollection, not an
// entry in the globalExcludeSet.
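//
// A hypothetical sketch of its shape (the prefix and file name below are
// illustrative only):
//
//	map[string]map[string]struct{}{
//		"tenant/service/owner/category/folder": {"deletedItemID": {}},
//	}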
func inflateDirTree(
ctx context.Context,
loader snapshotLoader,
baseSnaps []IncrementalBase,
collections []data.BackupCollection,
globalExcludeSet map[string]map[string]struct{},
progress *corsoProgress,
) (fs.Directory, error) {
roots, updatedPaths, err := inflateCollectionTree(ctx, collections, progress.toMerge)
if err != nil {
return nil, clues.Wrap(err, "inflating collection tree")
}
baseIDs := make([]manifest.ID, 0, len(baseSnaps))
for _, snap := range baseSnaps {
baseIDs = append(baseIDs, snap.ID)
}
ctx = clues.Add(ctx, "len_base_snapshots", len(baseSnaps), "base_snapshot_ids", baseIDs)
if len(baseIDs) > 0 {
logger.Ctx(ctx).Info("merging hierarchies from base snapshots")
} else {
logger.Ctx(ctx).Info("no base snapshots to merge")
}
for _, snap := range baseSnaps {
if err = inflateBaseTree(ctx, loader, snap, updatedPaths, roots); err != nil {
return nil, clues.Wrap(err, "inflating base snapshot tree(s)")
}
}
if len(roots) > 1 {
return nil, clues.New("multiple root directories")
}
var res fs.Directory
for dirName, dir := range roots {
tmp, err := buildKopiaDirs(dirName, dir, globalExcludeSet, progress)
if err != nil {
return nil, err
}
res = tmp
}
return res, nil
}