corso/src/internal/kopia/wrapper.go
ashmrtn 9e66f197c0
Create and populate folder backup details entries during backup (#872)
## Description

Other components may need to rebuild the directory hierarchy of items. As the paths Corso deals with can be hard to properly parse at times, store that information in the Corso backup details. The hierarchy can be rebuilt by following the `ParentRef` fields of items. The item at the root of the hierarchy has an empty `ParentRef` field.

Also hide these folders from end-users: they are not displayed in backup listings, nor are they eligible as restore targets.

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* closes #862
* closes #861
* closes #818 

merge after:
* #869 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] Unit test
- [x] 💚 E2E
2022-09-16 01:42:01 +00:00

637 lines
16 KiB
Go

package kopia
import (
"context"
"sync"
"github.com/hashicorp/go-multierror"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/path"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
// corsoHost and corsoUser form the kopia snapshot.SourceInfo attached to the
// snapshots created by makeSnapshotWithRoot.
//
// TODO(ashmrtnz): These should be some values from upper layer corso,
// possibly corresponding to who is making the backup.
const corsoHost = "corso-host"

const corsoUser = "corso"
// errNotConnected is returned when a Wrapper is used after its kopia
// connection has been cleared (or was never set).
var errNotConnected = errors.New("not connected to repo")

// errNoRestorePath is returned when a restore is requested without any item
// path.
var errNoRestorePath = errors.New("no restore path given")
// BackupStats summarizes the kopia snapshot produced by a backup. Values are
// copied from the snapshot manifest by manifestToStats.
type BackupStats struct {
	// SnapshotID is the ID of the kopia snapshot manifest.
	SnapshotID string

	// Counts copied from the manifest's Stats.
	TotalFileCount      int
	TotalDirectoryCount int
	IgnoredErrorCount   int
	ErrorCount          int

	// Incomplete is true exactly when IncompleteReason is non-empty.
	Incomplete bool
	// IncompleteReason is the manifest's IncompleteReason, verbatim.
	IncompleteReason string
}
// manifestToStats copies the identifying and summary fields of a kopia
// snapshot manifest into a BackupStats. The result is marked Incomplete
// exactly when the manifest carries a non-empty IncompleteReason.
func manifestToStats(man *snapshot.Manifest) BackupStats {
	st := &man.Stats

	var out BackupStats

	out.SnapshotID = string(man.ID)
	out.TotalFileCount = int(st.TotalFileCount)
	out.TotalDirectoryCount = int(st.TotalDirectoryCount)
	out.IgnoredErrorCount = int(st.IgnoredErrorCount)
	out.ErrorCount = int(st.ErrorCount)
	out.IncompleteReason = man.IncompleteReason
	out.Incomplete = out.IncompleteReason != ""

	return out
}
// itemDetails is what Corso remembers about a streamed item while kopia
// uploads it, so that a backup details entry can be written once the upload
// finishes (see corsoProgress.FinishedFile).
type itemDetails struct {
	// info is the item's details.ItemInfo, taken from data.StreamInfo.Info().
	info details.ItemInfo
	// repoPath is the item's full Corso path, including the first (root)
	// element.
	repoPath path.Path
}
// corsoProgress wraps kopia's snapshotfs.UploadProgress so Corso can build
// backup details as kopia finishes uploading each file. Methods it does not
// define are served by the embedded UploadProgress.
type corsoProgress struct {
	snapshotfs.UploadProgress
	// pending maps the kopia-relative path of each item handed to kopia (the
	// Corso path without its first element) to that item's details. Entries
	// are removed in FinishedFile.
	pending map[string]*itemDetails
	// deets accumulates the item and folder entries for this backup.
	deets *details.Details
	// mu guards pending.
	mu sync.RWMutex
}
// FinishedFile is the kopia callback invoked once kopia is done processing a
// file. If the upload succeeded and a pending entry exists for relativePath, a
// details entry is added for the item. A folder entry is also added for every
// ancestor directory of the item, each pointing at its own parent's ShortRef.
// The pending entry is always dropped. The call is always forwarded to the
// embedded kopia UploadProgress.
func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
	// Forward to kopia's progress tracker so its bookkeeping still happens.
	defer cp.UploadProgress.FinishedFile(relativePath, err)

	// Success or failure, forget the item so finished entries aren't retained.
	defer func() {
		cp.mu.Lock()
		defer cp.mu.Unlock()

		delete(cp.pending, relativePath)
	}()

	if err != nil {
		return
	}

	item := cp.get(relativePath)
	if item == nil {
		// No pending entry for this path; nothing to record.
		return
	}

	dir := item.repoPath.ToBuilder().Dir()

	cp.deets.Add(
		item.repoPath.String(),
		item.repoPath.ShortRef(),
		dir.ShortRef(),
		item.info,
	)

	// Climb from the item's directory to the root, one folder entry per level.
	folders := []details.FolderEntry{}

	for cur := dir; len(cur.Elements()) > 0; {
		up := cur.Dir()
		elems := cur.Elements()

		folders = append(folders, details.FolderEntry{
			RepoRef:   cur.String(),
			ShortRef:  cur.ShortRef(),
			ParentRef: up.ShortRef(),
			Info: details.ItemInfo{
				Folder: &details.FolderInfo{
					DisplayName: elems[len(elems)-1],
				},
			},
		})

		cur = up
	}

	cp.deets.AddFolders(folders)
}
// put stores v as the pending details for the kopia-relative path k while
// holding the write lock.
func (cp *corsoProgress) put(k string, v *itemDetails) {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	cp.pending[k] = v
}

// get returns the pending details stored for k, or nil if there are none. It
// takes only the read lock, so concurrent lookups don't block one another.
func (cp *corsoProgress) get(k string) *itemDetails {
	cp.mu.RLock()
	defer cp.mu.RUnlock()

	if v, found := cp.pending[k]; found {
		return v
	}

	return nil
}
// NewWrapper calls c.wrap() and, if that succeeds, returns a Wrapper around c.
// A wrap failure is returned with context and no Wrapper is created.
func NewWrapper(c *conn) (*Wrapper, error) {
	err := c.wrap()
	if err != nil {
		return nil, errors.Wrap(err, "creating Wrapper")
	}

	w := &Wrapper{c: c}

	return w, nil
}
// Wrapper exposes Corso's backup, restore, and snapshot-deletion operations on
// top of a kopia repository connection.
type Wrapper struct {
	// c is the repository connection; nil once Close has been called.
	c *conn
}
// Close closes the underlying kopia connection and clears it from the
// Wrapper, even if closing fails. Calling Close again afterwards is a no-op
// that returns nil.
func (w *Wrapper) Close(ctx context.Context) error {
	c := w.c
	if c == nil {
		return nil
	}

	closeErr := c.Close(ctx)
	w.c = nil

	return errors.Wrap(closeErr, "closing Wrapper")
}
// getStreamItemFunc builds the iteration callback for a kopia
// virtualfs.StreamingDirectory. When kopia invokes the returned function it
// first passes every entry in staticEnts (child directories) to cb. Then, if
// streamedEnts is non-nil, it turns each item received from
// streamedEnts.Items() into a streaming kopia file and passes that to cb too.
//
// Each streamed file is registered with progress before it reaches cb, so
// corsoProgress.FinishedFile can always find its details.
//
// Per-item problems (an unbuildable path, or an item that doesn't implement
// data.StreamInfo) are logged and collected without stopping iteration. These
// end iteration immediately:
//   - an error from cb;
//   - cancellation of ctx.
func getStreamItemFunc(
	staticEnts []fs.Entry,
	streamedEnts data.Collection,
	progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
	return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
		// Child directories come first; any failure here aborts this directory.
		for _, dirEnt := range staticEnts {
			if cbErr := cb(ctx, dirEnt); cbErr != nil {
				return errors.Wrap(cbErr, "executing callback on static directory")
			}
		}

		if streamedEnts == nil {
			return nil
		}

		// Accumulate per-item failures so one bad item doesn't end iteration
		// for the rest of this directory.
		var errs *multierror.Error

		items := streamedEnts.Items()

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()

			case item, ok := <-items:
				if !ok {
					return errs.ErrorOrNil()
				}

				// For now assuming that item IDs don't need escaping.
				fullPath, pathErr := streamedEnts.FullPath().Append(item.UUID(), true)
				if pathErr != nil {
					pathErr = errors.Wrap(pathErr, "getting full item path")
					errs = multierror.Append(errs, pathErr)
					logger.Ctx(ctx).Error(pathErr)

					continue
				}

				info, hasInfo := item.(data.StreamInfo)
				if !hasInfo {
					errs = multierror.Append(
						errs,
						errors.Errorf("item %q does not implement DataStreamInfo", fullPath),
					)
					logger.Ctx(ctx).Errorw(
						"item does not implement DataStreamInfo; skipping",
						"path", fullPath,
					)

					continue
				}

				// Paths kopia reports back lack the root element, so key the
				// pending entry without it. Register before calling cb so the
				// entry exists by the time FinishedFile runs.
				progress.put(
					fullPath.PopFront().String(),
					&itemDetails{info: info.Info(), repoPath: fullPath},
				)

				fileEnt := virtualfs.StreamingFileFromReader(item.UUID(), item.ToReader())

				if cbErr := cb(ctx, fileEnt); cbErr != nil {
					// kopia's uploader swallows most errors, so one surfacing
					// here is likely serious; stop iterating.
					errs = multierror.Append(errs, errors.Wrapf(cbErr, "executing callback on %q", fullPath))

					return errs.ErrorOrNil()
				}
			}
		}
	}
}
// buildKopiaDirs recursively converts dir and every directory beneath it into
// kopia virtualfs.StreamingDirectory entries; the returned directory is named
// dirName.
//
// Construction is post-order (leaves first) because a streaming directory's
// child-directory entries must all exist when the directory itself is created.
// Each resulting directory yields its child directories first and then the
// items of dir.collection, if it has one (see getStreamItemFunc). progress is
// passed through so streamed items are registered for details tracking.
func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) {
	childDirs := make([]fs.Entry, 0, len(dir.childDirs))

	for childName, childDir := range dir.childDirs {
		child, err := buildKopiaDirs(childName, childDir, progress)
		if err != nil {
			return nil, err
		}

		childDirs = append(childDirs, child)
	}

	return virtualfs.NewStreamingDirectory(
		dirName,
		getStreamItemFunc(childDirs, dir.collection, progress),
	), nil
}
// treeMap is an in-memory directory tree that inflateDirTree uses to group
// collections by path before kopia directories are built from it.
type treeMap struct {
	// childDirs maps each child directory's name to its subtree.
	childDirs map[string]*treeMap
	// collection holds the items that live directly in this directory, or is
	// nil when the directory only exists as an ancestor of other collections.
	collection data.Collection
}
// newTreeMap returns an empty treeMap node whose childDirs map is already
// allocated, so children can be inserted without a nil check.
func newTreeMap() *treeMap {
	node := new(treeMap)
	node.childDirs = make(map[string]*treeMap)

	return node
}
// inflateDirTree arranges collections into one directory hierarchy based on
// each collection's FullPath. It returns the kopia directory for the hierarchy's
// root, which is the first path element shared by every collection.
//
// Every directory in the returned tree, internal or leaf, is a
// virtualfs.StreamingDirectory built by buildKopiaDirs. Directories that hold
// a collection also stream that collection's items.
//
// An error is returned in any of these cases:
//   - a collection has a nil or empty path;
//   - the collections don't share a single root element;
//   - collections is empty. There is nothing to snapshot, and a nil directory
//     would otherwise make the caller panic when it asks for the root's name.
//
// ctx is currently unused. progress is handed to the directory iteration
// callbacks so item details can be recorded during upload.
func inflateDirTree(
	ctx context.Context,
	collections []data.Collection,
	progress *corsoProgress,
) (fs.Directory, error) {
	roots := make(map[string]*treeMap)

	for _, s := range collections {
		fullPath := s.FullPath()
		if fullPath == nil {
			return nil, errors.New("no identifier for collection")
		}

		itemPath := fullPath.Elements()
		if len(itemPath) == 0 {
			return nil, errors.New("no identifier for collection")
		}

		dir, ok := roots[itemPath[0]]
		if !ok {
			dir = newTreeMap()
			roots[itemPath[0]] = dir
		}

		// Single DataCollection with no ancestors.
		if len(itemPath) == 1 {
			dir.collection = s
			continue
		}

		// Walk (creating as needed) the intermediate directories between the
		// root and this collection's own directory. Every node comes from
		// newTreeMap, so childDirs is never nil.
		for _, p := range itemPath[1 : len(itemPath)-1] {
			newDir := dir.childDirs[p]
			if newDir == nil {
				newDir = newTreeMap()
				dir.childDirs[p] = newDir
			}

			dir = newDir
		}

		// `dir` is now the parent of this collection's directory. Reuse an
		// existing node (e.g. one created as an ancestor of another collection)
		// so its children are kept.
		end := len(itemPath) - 1

		tmpDir := dir.childDirs[itemPath[end]]
		if tmpDir == nil {
			tmpDir = newTreeMap()
			dir.childDirs[itemPath[end]] = tmpDir
		}

		// NOTE(review): if two collections share the same FullPath, the later
		// one replaces the earlier one here.
		tmpDir.collection = s
	}

	switch {
	case len(roots) == 0:
		return nil, errors.New("no collections to back up")
	case len(roots) > 1:
		return nil, errors.New("multiple root directories")
	}

	var res fs.Directory

	for dirName, dir := range roots {
		tmp, err := buildKopiaDirs(dirName, dir, progress)
		if err != nil {
			return nil, err
		}

		res = tmp
	}

	return res, nil
}
// BackupCollections snapshots the items in collections into the kopia
// repository. It returns stats for the resulting snapshot plus the Corso
// backup details recorded as kopia finished each item. errNotConnected is
// returned if the Wrapper has no connection.
func (w Wrapper) BackupCollections(
	ctx context.Context,
	collections []data.Collection,
) (*BackupStats, *details.Details, error) {
	if w.c == nil {
		return nil, nil, errNotConnected
	}

	// progress intercepts kopia's per-file callbacks to build up the details.
	progress := &corsoProgress{
		pending: make(map[string]*itemDetails),
		deets:   new(details.Details),
	}

	root, err := inflateDirTree(ctx, collections, progress)
	if err != nil {
		return nil, nil, errors.Wrap(err, "building kopia directories")
	}

	stats, err := w.makeSnapshotWithRoot(ctx, root, progress)
	if err != nil {
		return nil, nil, err
	}

	return stats, progress.deets, nil
}
// makeSnapshotWithRoot uploads the directory tree rooted at root to the kopia
// repository in a single write session and saves the resulting snapshot
// manifest. progress is installed as the uploader's progress callback, wrapping
// kopia's default one, so Corso can record details for each uploaded file.
//
// The upload policy sets IgnoreFileErrors and IgnoreDirectoryErrors. Kopia's
// error counts are surfaced through the returned BackupStats. Failures are
// logged inside the session as well as returned, because a failing flush
// (FlushOnFailure is set) may replace the original error.
func (w Wrapper) makeSnapshotWithRoot(
	ctx context.Context,
	root fs.Directory,
	progress *corsoProgress,
) (*BackupStats, error) {
	// root.Name() below would panic on a nil directory.
	if root == nil {
		return nil, errors.New("no root directory to snapshot")
	}

	var man *snapshot.Manifest

	err := repo.WriteSession(
		ctx,
		w.c,
		repo.WriteSessionOptions{
			Purpose: "KopiaWrapperBackup",
			// Always flush so we don't leak write sessions. Still uses reachability
			// for consistency.
			FlushOnFailure: true,
		},
		func(innerCtx context.Context, rw repo.RepositoryWriter) error {
			si := snapshot.SourceInfo{
				Host:     corsoHost,
				UserName: corsoUser,
				// TODO(ashmrtnz): will this be something useful for snapshot lookups later?
				Path: root.Name(),
			}

			trueVal := policy.OptionalBool(true)
			errPolicy := &policy.Policy{
				ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
					IgnoreFileErrors:      &trueVal,
					IgnoreDirectoryErrors: &trueVal,
				},
			}

			policyTree, err := policy.TreeForSourceWithOverride(innerCtx, w.c, si, errPolicy)
			if err != nil {
				err = errors.Wrap(err, "get policy tree")
				logger.Ctx(innerCtx).Errorw("kopia backup", "error", err)

				return err
			}

			// By default Uploader is best-attempt.
			u := snapshotfs.NewUploader(rw)
			// Put Corso's progress handler in front of kopia's default one.
			progress.UploadProgress = u.Progress
			u.Progress = progress

			man, err = u.Upload(innerCtx, root, policyTree, si)
			if err != nil {
				err = errors.Wrap(err, "uploading data")
				logger.Ctx(innerCtx).Errorw("kopia backup", "error", err)

				return err
			}

			if _, err := snapshot.SaveSnapshot(innerCtx, rw, man); err != nil {
				err = errors.Wrap(err, "saving snapshot")
				logger.Ctx(innerCtx).Errorw("kopia backup", "error", err)

				return err
			}

			return nil
		},
	)
	// Telling kopia to always flush may hide other errors if it fails while
	// flushing the write session (hence logging above).
	if err != nil {
		return nil, errors.Wrap(err, "kopia backup")
	}

	res := manifestToStats(man)

	return &res, nil
}
// getEntry returns the item that the restore operation is rooted at. For
// single-item restores, this is the kopia file the data is sourced from. For
// restores of directories or subtrees it is the directory at the root of the
// subtree.
//
// itemPath is the item's full Corso path. Its first element names the
// snapshot's root directory, so it is dropped before the lookup.
//
// An error is returned in any of these cases:
//   - itemPath is nil;
//   - the Wrapper is not connected;
//   - the snapshot can't be loaded;
//   - no entry exists at itemPath.
func (w Wrapper) getEntry(
	ctx context.Context,
	snapshotID string,
	itemPath path.Path,
) (fs.Entry, error) {
	if itemPath == nil {
		return nil, errors.WithStack(errNoRestorePath)
	}

	// Match BackupCollections: don't hand a nil connection to kopia.
	if w.c == nil {
		return nil, errors.WithStack(errNotConnected)
	}

	man, err := snapshot.LoadSnapshot(ctx, w.c, manifest.ID(snapshotID))
	if err != nil {
		return nil, errors.Wrap(err, "getting snapshot handle")
	}

	rootDirEntry, err := snapshotfs.SnapshotRoot(w.c, man)
	if err != nil {
		return nil, errors.Wrap(err, "getting root directory")
	}

	// GetNestedEntry handles nil properly.
	e, err := snapshotfs.GetNestedEntry(
		ctx,
		rootDirEntry,
		itemPath.PopFront().Elements(),
	)
	if err != nil {
		return nil, errors.Wrap(err, "getting nested object handle")
	}

	return e, nil
}
// collectItems pulls the single file at itemPath out of the kopia snapshot
// with ID snapshotID. It returns a one-element slice holding a DataCollection
// whose only item is that file and whose FullPath() is itemPath's parent
// directory. Directory and subtree restores are not supported.
//
// An error is returned if any of these hold:
//   - itemPath is nil;
//   - its parent directory can't be computed;
//   - the entry can't be found;
//   - the entry is not a file.
func (w Wrapper) collectItems(
	ctx context.Context,
	snapshotID string,
	itemPath path.Path,
) ([]data.Collection, error) {
	if itemPath == nil {
		return nil, errors.WithStack(errNoRestorePath)
	}

	parentDir, err := itemPath.Dir()
	if err != nil {
		return nil, errors.Wrap(err, "getting parent directory from path")
	}

	e, err := w.getEntry(ctx, snapshotID, itemPath)
	if err != nil {
		return nil, err
	}

	f, ok := e.(fs.File)
	if !ok {
		return nil, errors.New("requested object is not a file")
	}

	c, err := restoreSingleItem(ctx, f, parentDir)
	if err != nil {
		return nil, err
	}

	return []data.Collection{c}, nil
}
// RestoreSingleItem fetches the item at itemPath out of the kopia snapshot
// with ID snapshotID; itemPath is the item's full path from the root.
//
// On success it returns a DataCollection with these properties:
//   - the item is its only element;
//   - its FullPath() is itemPath's parent directory;
//   - the returned DataStream's UUID is the name of the kopia file the data is
//     sourced from.
//
// An error is returned if the item doesn't exist or is not a file.
func (w Wrapper) RestoreSingleItem(
	ctx context.Context,
	snapshotID string,
	itemPath path.Path,
) (data.Collection, error) {
	cols, err := w.collectItems(ctx, snapshotID, itemPath)
	if err != nil {
		return nil, err
	}

	// collectItems yields exactly one collection for a single item.
	first := cols[0]

	return first, nil
}
// restoreSingleItem opens the kopia file f and wraps it as the sole item of a
// DataCollection whose FullPath() is itemDir. The item's UUID is f's name, i.e.
// the name of the kopia file the data is sourced from. An error is returned if
// f can't be opened.
func restoreSingleItem(
	ctx context.Context,
	f fs.File,
	itemDir path.Path,
) (data.Collection, error) {
	r, err := f.Open(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "opening file")
	}

	return &kopiaDataCollection{
		streams: []data.Stream{
			&kopiaDataStream{
				uuid:   f.Name(),
				reader: r,
			},
		},
		path: itemDir,
	}, nil
}
// RestoreMultipleItems restores each entry of paths from the kopia snapshot
// with ID snapshotID. Each path is assumed to name an item, not a directory,
// and to be the item's full path from the root.
//
// It returns one single-item DataCollection per successfully restored item,
// with DataCollection.FullPath() set to that item's parent directory. The UUID
// of each returned DataStream is the name of the kopia file the data is
// sourced from.
//
// An item that fails to restore (e.g. missing, or not a file) doesn't stop the
// others: the errors are aggregated and returned alongside the collections that
// did succeed. An empty paths slice is rejected immediately.
func (w Wrapper) RestoreMultipleItems(
	ctx context.Context,
	snapshotID string,
	paths []path.Path,
) ([]data.Collection, error) {
	if len(paths) == 0 {
		return nil, errors.WithStack(errNoRestorePath)
	}

	var (
		dcs  = make([]data.Collection, 0, len(paths))
		errs *multierror.Error
	)

	for _, itemPath := range paths {
		dc, err := w.RestoreSingleItem(ctx, snapshotID, itemPath)
		if err != nil {
			errs = multierror.Append(errs, err)
			continue
		}

		dcs = append(dcs, dc)
	}

	return dcs, errs.ErrorOrNil()
}
// DeleteSnapshot removes the snapshot manifest with ID snapshotID from the
// kopia repository.
//
// An error is returned if any of these hold:
//   - the ID is empty;
//   - the Wrapper is not connected;
//   - kopia fails to delete the manifest.
func (w Wrapper) DeleteSnapshot(
	ctx context.Context,
	snapshotID string,
) error {
	mid := manifest.ID(snapshotID)
	if len(mid) == 0 {
		return errors.New("attempt to delete unidentified snapshot")
	}

	// Match BackupCollections: don't hand a nil connection to kopia.
	if w.c == nil {
		return errors.WithStack(errNotConnected)
	}

	err := repo.WriteSession(
		ctx,
		w.c,
		repo.WriteSessionOptions{Purpose: "KopiaWrapperBackupDeletion"},
		func(innerCtx context.Context, rw repo.RepositoryWriter) error {
			// Work done through rw uses the session's context, as in
			// makeSnapshotWithRoot.
			if err := rw.DeleteManifest(innerCtx, mid); err != nil {
				return errors.Wrap(err, "deleting snapshot")
			}

			return nil
		},
	)
	if err != nil {
		return errors.Wrap(err, "kopia deleting backup manifest")
	}

	return nil
}