corso/src/internal/kopia/wrapper.go
ashmrtn f71de7a021
Select specific metadata from the base snapshot based on the Reasons the base snapshot was picked (#1836)
## Description

Use the Reasons for which a snapshot was selected to retrieve only the metadata
corresponding to those Reasons. This avoids having multiple versions of
metadata for the same (resource owner, service, category) tuple, and it also
avoids pulling in more metadata than some backups require.

## Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No 

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* closes #1829 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
2022-12-17 01:12:42 +00:00

434 lines
11 KiB
Go

package kopia
import (
"context"
"strings"
"github.com/hashicorp/go-multierror"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/data"
D "github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
// Identity written into the SourceInfo of every snapshot this package creates.
//
// TODO(ashmrtnz): These should be some values from upper layer corso,
// possibly corresponding to who is making the backup.
const (
	corsoHost = "corso-host"
	corsoUser = "corso"
)

// serializationVersion is the version value restoreStreamReader is told to
// expect when items are read back out of a snapshot.
const serializationVersion uint32 = 1
// TagBackupID is a common manifest tag key, "backup-id".
const TagBackupID = "backup-id"

// TagBackupCategory is a common manifest tag key, "is-canon-backup".
const TagBackupCategory = "is-canon-backup"
// errNotConnected is returned when a Wrapper method is called after the
// Wrapper's connection has been released (its conn is nil).
var errNotConnected = errors.New("not connected to repo")

// errNoRestorePath is returned when a restore is requested with a nil item
// path or with no item paths at all.
var errNoRestorePath = errors.New("no restore path given")
// BackupStats summarizes a single kopia snapshot upload. manifestToStats shows
// where each field comes from.
type BackupStats struct {
	// SnapshotID is the ID of the snapshot manifest that was written.
	SnapshotID string

	// TotalHashedBytes is the byte total reported through upload progress;
	// TotalUploadedBytes is the total counted by the write session's upload
	// callback.
	TotalHashedBytes, TotalUploadedBytes int64

	// File and directory counts copied from the snapshot manifest's stats.
	TotalFileCount, CachedFileCount, UncachedFileCount, TotalDirectoryCount int

	// Error counts copied from the snapshot manifest's stats.
	IgnoredErrorCount, ErrorCount int

	// Incomplete is true when the manifest carries a non-empty incomplete
	// reason, which is recorded in IncompleteReason.
	Incomplete       bool
	IncompleteReason string
}
// manifestToStats combines a saved snapshot manifest, the progress tracker used
// during upload, and the uploaded-byte counter into a BackupStats value.
// Incomplete is derived from whether the manifest has an incomplete reason.
func manifestToStats(
	man *snapshot.Manifest,
	progress *corsoProgress,
	uploadCount *stats.ByteCounter,
) BackupStats {
	st := &man.Stats

	var out BackupStats

	out.SnapshotID = string(man.ID)
	out.TotalHashedBytes = progress.totalBytes
	out.TotalUploadedBytes = uploadCount.NumBytes

	out.TotalFileCount = int(st.TotalFileCount)
	out.CachedFileCount = int(st.CachedFiles)
	out.UncachedFileCount = int(st.NonCachedFiles)
	out.TotalDirectoryCount = int(st.TotalDirectoryCount)

	out.IgnoredErrorCount = int(st.IgnoredErrorCount)
	out.ErrorCount = int(st.ErrorCount)

	out.IncompleteReason = man.IncompleteReason
	out.Incomplete = out.IncompleteReason != ""

	return out
}
// Wrapper exposes backup, restore, and snapshot-management operations over a
// kopia repository connection. A Wrapper whose connection is nil (for example
// after Close) reports errNotConnected from methods that check for it.
type Wrapper struct {
	c *conn
}

// NewWrapper builds a Wrapper around c. It first calls c.wrap() and returns a
// wrapped error if that call fails.
func NewWrapper(c *conn) (*Wrapper, error) {
	wrapErr := c.wrap()
	if wrapErr != nil {
		return nil, errors.Wrap(wrapErr, "creating Wrapper")
	}

	return &Wrapper{c: c}, nil
}
// Close closes the underlying connection and clears it from the Wrapper, so a
// second Close is a no-op that returns nil. Any error from closing the
// connection is wrapped and returned.
func (w *Wrapper) Close(ctx context.Context) error {
	if c := w.c; c != nil {
		closeErr := c.Close(ctx)
		w.c = nil

		return errors.Wrap(closeErr, "closing Wrapper")
	}

	return nil
}
// BackupCollections writes the data held by collections into a new kopia
// snapshot. It returns stats for that snapshot together with the details
// collected while uploading.
//
// previousSnapshots is intended for incremental backups. It should hold the
// base snapshot that metadata is sourced from, plus any incomplete checkpoint
// snapshots that may contain data newer than the base. Without
// previousSnapshots a complete backup of all data is made.
//
// If collections is empty, no snapshot is written and empty stats and details
// are returned. errNotConnected is returned if the Wrapper has no connection.
//
// TODO(ashmrtn): Use previousSnapshots parameter.
func (w Wrapper) BackupCollections(
	ctx context.Context,
	previousSnapshots []*ManifestEntry,
	collections []data.Collection,
	service path.ServiceType,
	oc *OwnersCats,
	tags map[string]string,
) (*BackupStats, *details.Details, error) {
	if w.c == nil {
		return nil, nil, errNotConnected
	}

	ctx, end := D.Span(ctx, "kopia:backupCollections")
	defer end()

	// Nothing to back up, so skip building a tree and writing a snapshot.
	if len(collections) == 0 {
		return &BackupStats{}, &details.Details{}, nil
	}

	progress := &corsoProgress{
		pending: map[string]*itemDetails{},
		deets:   &details.Details{},
	}

	// NOTE(review): the nil argument presumably stands in for base snapshots
	// until previousSnapshots is wired through — confirm against
	// inflateDirTree.
	root, err := inflateDirTree(ctx, w.c, nil, collections, progress)
	if err != nil {
		return nil, nil, errors.Wrap(err, "building kopia directories")
	}

	backupStats, err := w.makeSnapshotWithRoot(ctx, root, oc, progress, tags)
	if err != nil {
		return nil, nil, err
	}

	return backupStats, progress.deets, nil
}
// makeSnapshotWithRoot uploads the directory tree rooted at root as a new kopia
// snapshot inside a single repository write session, then returns stats for it.
//
// Manifests returned by fetchPrevSnapshotManifests for oc are handed to the
// uploader as previous snapshots. File and directory errors are ignored by the
// upload policy. The saved manifest is tagged with tagsFromStrings(oc) plus
// addlTags. Each addlTags key is normalized through MakeTagKV, and an empty
// value is replaced by the default value MakeTagKV returns.
func (w Wrapper) makeSnapshotWithRoot(
	ctx context.Context,
	root fs.Directory,
	oc *OwnersCats,
	progress *corsoProgress,
	addlTags map[string]string,
) (*BackupStats, error) {
	baseEntries := fetchPrevSnapshotManifests(ctx, w.c, oc, nil)

	baseSnaps := make([]*snapshot.Manifest, len(baseEntries))
	for i, entry := range baseEntries {
		baseSnaps[i] = entry.Manifest
	}

	var (
		written  *snapshot.Manifest
		uploaded = &stats.ByteCounter{}
	)

	sessionOpts := repo.WriteSessionOptions{
		Purpose: "KopiaWrapperBackup",
		// Always flush so we don't leak write sessions. Still uses reachability
		// for consistency.
		FlushOnFailure: true,
		OnUpload:       uploaded.Count,
	}

	upload := func(innerCtx context.Context, rw repo.RepositoryWriter) error {
		// fail wraps and logs err before returning it. The flush-on-failure
		// session may otherwise obscure the original cause.
		fail := func(err error, msg string) error {
			err = errors.Wrap(err, msg)
			logger.Ctx(innerCtx).Errorw("kopia backup", err)

			return err
		}

		si := snapshot.SourceInfo{
			Host:     corsoHost,
			UserName: corsoUser,
			// TODO(ashmrtnz): will this be something useful for snapshot lookups later?
			Path: root.Name(),
		}

		ignore := policy.OptionalBool(true)
		override := &policy.Policy{
			ErrorHandlingPolicy: policy.ErrorHandlingPolicy{
				IgnoreFileErrors:      &ignore,
				IgnoreDirectoryErrors: &ignore,
			},
		}

		policyTree, err := policy.TreeForSourceWithOverride(innerCtx, w.c, si, override)
		if err != nil {
			return fail(err, "get policy tree")
		}

		// By default Uploader is best-attempt.
		uploader := snapshotfs.NewUploader(rw)
		progress.UploadProgress = uploader.Progress
		uploader.Progress = progress

		written, err = uploader.Upload(innerCtx, root, policyTree, si, baseSnaps...)
		if err != nil {
			return fail(err, "uploading data")
		}

		written.Tags = tagsFromStrings(oc)

		for rawKey, val := range addlTags {
			key, defaultVal := MakeTagKV(rawKey)
			if val == "" {
				val = defaultVal
			}

			written.Tags[key] = val
		}

		if _, err := snapshot.SaveSnapshot(innerCtx, rw, written); err != nil {
			return fail(err, "saving snapshot")
		}

		return nil
	}

	// Telling kopia to always flush may hide other errors if it fails while
	// flushing the write session (hence logging inside the callback).
	if err := repo.WriteSession(ctx, w.c, sessionOpts, upload); err != nil {
		return nil, errors.Wrap(err, "kopia backup")
	}

	res := manifestToStats(written, progress, uploaded)

	return &res, nil
}
// getSnapshotRoot loads the snapshot manifest whose ID is snapshotID and
// returns the root directory entry of that snapshot. On error the returned
// entry is always nil; callers must not rely on a partially built value.
func (w Wrapper) getSnapshotRoot(
	ctx context.Context,
	snapshotID string,
) (fs.Entry, error) {
	man, err := snapshot.LoadSnapshot(ctx, w.c, manifest.ID(snapshotID))
	if err != nil {
		return nil, errors.Wrap(err, "getting snapshot handle")
	}

	rootDirEntry, err := snapshotfs.SnapshotRoot(w.c, man)
	if err != nil {
		return nil, errors.Wrap(err, "getting root directory")
	}

	return rootDirEntry, nil
}
// getItemStream looks up the item at itemPath, starting from snapshotRoot. The
// lookup uses the encoded elements of itemPath.PopFront().
//
// If the item is a file in kopia, a data.Stream over its contents is returned.
// If the item does not exist in kopia or is not a file, an error is returned;
// when kopia reports "entry not found", the error wraps ErrNotFound. The UUID
// of the returned data.Stream is the decoded name of the kopia file the data is
// sourced from.
//
// If bcounter is non-nil, it is given the file's stored size before the file is
// read. errNoRestorePath is returned when itemPath is nil.
func getItemStream(
	ctx context.Context,
	itemPath path.Path,
	snapshotRoot fs.Entry,
	bcounter ByteCounter,
) (data.Stream, error) {
	if itemPath == nil {
		return nil, errors.WithStack(errNoRestorePath)
	}

	// GetNestedEntry handles nil properly.
	e, err := snapshotfs.GetNestedEntry(
		ctx,
		snapshotRoot,
		encodeElements(itemPath.PopFront().Elements()...),
	)
	if err != nil {
		// Not-found is detected from the error text returned by kopia.
		if strings.Contains(err.Error(), "entry not found") {
			err = errors.Wrap(ErrNotFound, err.Error())
		}

		return nil, errors.Wrap(err, "getting nested object handle")
	}

	f, ok := e.(fs.File)
	if !ok {
		return nil, errors.New("requested object is not a file")
	}

	if bcounter != nil {
		bcounter.Count(f.Size())
	}

	// Decode the name before opening the file. If decoding fails, no reader has
	// been opened yet, so nothing is leaked.
	decodedName, err := decodeElement(f.Name())
	if err != nil {
		return nil, errors.Wrap(err, "decoding file name")
	}

	r, err := f.Open(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "opening file")
	}

	return &kopiaDataStream{
		uuid: decodedName,
		reader: &restoreStreamReader{
			ReadCloser:      r,
			expectedVersion: serializationVersion,
		},
		// NOTE(review): versionSize is presumably the serialization-version
		// prefix consumed by restoreStreamReader — confirm.
		size: f.Size() - int64(versionSize),
	}, nil
}
// ByteCounter accumulates byte counts. getItemStream reports each restored
// file's stored size to it.
type ByteCounter interface {
	// Count adds n bytes to the running total.
	Count(n int64)
}
// RestoreMultipleItems looks up every path in paths in the snapshot with ID
// snapshotID. Each path is treated as an item, not a directory, and must be the
// full path of the item from the root.
//
// Items are grouped into one data.Collection per parent directory, keyed by
// the directory's ShortRef, so each collection's FullPath() is the shared
// parent path. The UUID of each returned data.Stream is the name of the kopia
// file the data is sourced from.
//
// Items that cannot be fetched (missing, not a file, or whose parent path
// cannot be built) are skipped. Their errors are accumulated and returned
// alongside the collections built from the remaining items.
//
// errNotConnected is returned if the Wrapper has no connection, and
// errNoRestorePath if paths is empty.
func (w Wrapper) RestoreMultipleItems(
	ctx context.Context,
	snapshotID string,
	paths []path.Path,
	bcounter ByteCounter,
) ([]data.Collection, error) {
	// Match BackupCollections/FetchPrevSnapshotManifests: never hand a nil
	// connection to kopia.
	if w.c == nil {
		return nil, errors.WithStack(errNotConnected)
	}

	ctx, end := D.Span(ctx, "kopia:restoreMultipleItems")
	defer end()

	if len(paths) == 0 {
		return nil, errors.WithStack(errNoRestorePath)
	}

	snapshotRoot, err := w.getSnapshotRoot(ctx, snapshotID)
	if err != nil {
		return nil, err
	}

	var (
		errs *multierror.Error
		// Maps short ID of parent path to data collection for that folder.
		cols = map[string]*kopiaDataCollection{}
	)

	for _, itemPath := range paths {
		ds, err := getItemStream(ctx, itemPath, snapshotRoot, bcounter)
		if err != nil {
			errs = multierror.Append(errs, err)
			continue
		}

		parentPath, err := itemPath.Dir()
		if err != nil {
			errs = multierror.Append(errs, errors.Wrap(err, "making directory collection"))
			continue
		}

		ref := parentPath.ShortRef()

		c, ok := cols[ref]
		if !ok {
			c = &kopiaDataCollection{path: parentPath}
			cols[ref] = c
		}

		c.streams = append(c.streams, ds)
	}

	res := make([]data.Collection, 0, len(cols))
	for _, c := range cols {
		res = append(res, c)
	}

	// ErrorOrNil is safe on a nil *multierror.Error and yields nil.
	return res, errs.ErrorOrNil()
}
// DeleteSnapshot removes the snapshot manifest with ID snapshotID from kopia,
// inside a repository write session.
//
// It returns errNotConnected if the Wrapper has no connection, and an error if
// snapshotID is empty. It also returns an error if deleting the manifest or
// running the write session fails.
func (w Wrapper) DeleteSnapshot(
	ctx context.Context,
	snapshotID string,
) error {
	if w.c == nil {
		return errors.WithStack(errNotConnected)
	}

	mid := manifest.ID(snapshotID)
	if len(mid) == 0 {
		return errors.New("attempt to delete unidentified snapshot")
	}

	err := repo.WriteSession(
		ctx,
		w.c,
		repo.WriteSessionOptions{Purpose: "KopiaWrapperBackupDeletion"},
		func(innerCtx context.Context, rw repo.RepositoryWriter) error {
			// Use the session-scoped context handed to the callback, not the
			// outer ctx.
			if err := rw.DeleteManifest(innerCtx, mid); err != nil {
				return errors.Wrap(err, "deleting snapshot")
			}

			return nil
		},
	)
	if err != nil {
		return errors.Wrap(err, "kopia deleting backup manifest")
	}

	return nil
}
// FetchPrevSnapshotManifests returns manifests of previous snapshots, both
// complete and possibly incomplete, for the (resource owner, service,
// category) tuples described by oc.
//
// Each tuple contributes at most two manifests: one complete and one
// incomplete. The incomplete manifest is only included if it is newer than the
// newest complete manifest for that tuple. A manifest matched by several tuples
// appears only once in the result.
//
// If tags are provided, a manifest must carry every k:v pair in tags (a
// superset match). Pass raw tag values; they are normalized internally with
// MakeTagKV.
//
// The only error returned is errNotConnected, when the Wrapper has no
// connection.
func (w Wrapper) FetchPrevSnapshotManifests(
	ctx context.Context,
	oc *OwnersCats,
	tags map[string]string,
) ([]*ManifestEntry, error) {
	conn := w.c
	if conn == nil {
		return nil, errors.WithStack(errNotConnected)
	}

	manifests := fetchPrevSnapshotManifests(ctx, conn, oc, tags)

	return manifests, nil
}