Keepers fbb867a08d
add clues.Hide to various logs (#2994)
Begins hiding PII by injecting clues.Hide into our structured logging aggregators. This is a non-comprehensive change, and improvements here will be ongoing.
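
For context, here is a minimal sketch of the injection pattern this change applies (the package and helper below are hypothetical and only mirror what call sites such as `BackupOperation.Run` do for `tenant_id` and `resource_owner`): sensitive values get wrapped with `clues.Hide` before they are attached to the logging context, so downstream structured logs carry a concealed form instead of the raw value.

```go
package example

import (
	"context"

	"github.com/alcionai/clues"

	"github.com/alcionai/corso/src/pkg/logger"
)

// logBackupStart is an illustrative helper, not part of this PR: PII such as
// tenant and owner identifiers is wrapped with clues.Hide before being added
// to the context, so the structured logger emits a concealed value for those
// keys while non-PII values (like the backup ID) are logged as-is.
func logBackupStart(ctx context.Context, tenantID, ownerName, backupID string) {
	ctx = clues.Add(
		ctx,
		"tenant_id", clues.Hide(tenantID),       // concealed in aggregated logs
		"resource_owner", clues.Hide(ownerName), // concealed in aggregated logs
		"backup_id", backupID)                   // not PII; logged as-is

	logger.Ctx(ctx).Info("starting backup")
}
```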

---

#### Does this PR need a docs update or release note?

- [x] 🕐 Yes, but in a later PR

#### Type of change

- [x] 🤖 Supportability/Tests

#### Issue(s)

* #2024

#### Test Plan

- [x] 💪 Manual
- [x] 💚 E2E
2023-04-04 16:59:31 +00:00

739 lines · 19 KiB · Go

package operations

import (
	"context"
	"time"

	"github.com/alcionai/clues"
	"github.com/google/uuid"

	"github.com/alcionai/corso/src/internal/common"
	"github.com/alcionai/corso/src/internal/common/crash"
	"github.com/alcionai/corso/src/internal/data"
	"github.com/alcionai/corso/src/internal/diagnostics"
	"github.com/alcionai/corso/src/internal/events"
	"github.com/alcionai/corso/src/internal/kopia"
	"github.com/alcionai/corso/src/internal/model"
	"github.com/alcionai/corso/src/internal/observe"
	"github.com/alcionai/corso/src/internal/operations/inject"
	"github.com/alcionai/corso/src/internal/stats"
	"github.com/alcionai/corso/src/internal/streamstore"
	"github.com/alcionai/corso/src/pkg/account"
	"github.com/alcionai/corso/src/pkg/backup"
	"github.com/alcionai/corso/src/pkg/backup/details"
	"github.com/alcionai/corso/src/pkg/control"
	"github.com/alcionai/corso/src/pkg/fault"
	"github.com/alcionai/corso/src/pkg/logger"
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/selectors"
	"github.com/alcionai/corso/src/pkg/store"
)

// BackupOperation wraps an operation with backup-specific props.
type BackupOperation struct {
	operation

	ResourceOwner common.IDNamer

	Results   BackupResults      `json:"results"`
	Selectors selectors.Selector `json:"selectors"`
	Version   string             `json:"version"`

	account account.Account
	bp      inject.BackupProducer

	// when true, this allows for incremental backups instead of full data pulls
	incremental bool
}

// BackupResults aggregate the details of the result of the operation.
type BackupResults struct {
	stats.ReadWrites
	stats.StartAndEndTime
	BackupID model.StableID `json:"backupID"`
}

// NewBackupOperation constructs and validates a backup operation.
func NewBackupOperation(
	ctx context.Context,
	opts control.Options,
	kw *kopia.Wrapper,
	sw *store.Wrapper,
	bp inject.BackupProducer,
	acct account.Account,
	selector selectors.Selector,
	owner common.IDNamer,
	bus events.Eventer,
) (BackupOperation, error) {
	op := BackupOperation{
		operation:     newOperation(opts, bus, kw, sw),
		ResourceOwner: owner,
		Selectors:     selector,
		Version:       "v0",
		account:       acct,
		incremental:   useIncrementalBackup(selector, opts),
		bp:            bp,
	}

	if err := op.validate(); err != nil {
		return BackupOperation{}, err
	}

	return op, nil
}

func (op BackupOperation) validate() error {
	if op.ResourceOwner == nil {
		return clues.New("backup requires a resource owner")
	}

	if len(op.ResourceOwner.ID()) == 0 {
		return clues.New("backup requires a resource owner with a populated ID")
	}

	if op.bp == nil {
		return clues.New("missing backup producer")
	}

	return op.operation.validate()
}

// aggregates stats from the backup.Run().
// primarily used so that the defer can take in a
// pointer wrapping the values, while those values
// get populated asynchronously.
type backupStats struct {
	k             *kopia.BackupStats
	gc            *data.CollectionStats
	resourceCount int
}

// ---------------------------------------------------------------------------
// Primary Controller
// ---------------------------------------------------------------------------

// Run begins a synchronous backup operation.
func (op *BackupOperation) Run(ctx context.Context) (err error) {
	defer func() {
		if crErr := crash.Recovery(ctx, recover()); crErr != nil {
			err = crErr
		}
	}()

	ctx, end := diagnostics.Span(ctx, "operations:backup:run")
	defer func() {
		end()
		// wait for the progress display to clean up
		observe.Complete()
	}()

	ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
	defer flushMetrics()

	// -----
	// Setup
	// -----

	var (
		opStats   backupStats
		startTime = time.Now()
		sstore    = streamstore.NewStreamer(op.kopia, op.account.ID(), op.Selectors.PathService())
	)

	op.Results.BackupID = model.StableID(uuid.NewString())

	ctx = clues.Add(
		ctx,
		"tenant_id", clues.Hide(op.account.ID()),
		"resource_owner", clues.Hide(op.ResourceOwner),
		"backup_id", op.Results.BackupID,
		"service", op.Selectors.Service,
		"incremental", op.incremental)

	op.bus.Event(
		ctx,
		events.BackupStart,
		map[string]any{
			events.StartTime: startTime,
			events.Service:   op.Selectors.Service.String(),
			events.BackupID:  op.Results.BackupID,
		})

	// -----
	// Execution
	// -----

	observe.Message(ctx, "Backing Up", observe.Bullet, clues.Hide(op.ResourceOwner.Name()))

	deets, err := op.do(
		ctx,
		&opStats,
		sstore,
		op.Results.BackupID)
	if err != nil {
		// No return here! We continue down to persistResults, even in case of failure.
		logger.Ctx(ctx).
			With("err", err).
			Errorw("running backup", clues.InErr(err).Slice()...)
		op.Errors.Fail(clues.Wrap(err, "running backup"))
	}

	finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup")
	LogFaultErrors(ctx, op.Errors.Errors(), "running backup")

	// -----
	// Persistence
	// -----

	err = op.persistResults(startTime, &opStats)
	if err != nil {
		op.Errors.Fail(clues.Wrap(err, "persisting backup results"))
		return op.Errors.Failure()
	}

	// force exit without backup in certain cases.
	// see: https://github.com/alcionai/corso/pull/2510#discussion_r1113532530
	for _, e := range op.Errors.Recovered() {
		if clues.HasLabel(e, fault.LabelForceNoBackupCreation) {
			logger.Ctx(ctx).
				With("error", e).
				With(clues.InErr(err).Slice()...).
				Infow("completed backup; conditional error forcing exit without model persistence",
					"results", op.Results)

			return op.Errors.Fail(clues.Wrap(e, "forced backup")).Failure()
		}
	}

	err = op.createBackupModels(
		ctx,
		sstore,
		opStats.k.SnapshotID,
		op.Results.BackupID,
		deets.Details())
	if err != nil {
		op.Errors.Fail(clues.Wrap(err, "persisting backup"))
		return op.Errors.Failure()
	}

	logger.Ctx(ctx).Infow("completed backup", "results", op.Results)

	return nil
}

// do is purely the action of running a backup. All pre/post behavior
// is found in Run().
func (op *BackupOperation) do(
	ctx context.Context,
	opStats *backupStats,
	detailsStore streamstore.Streamer,
	backupID model.StableID,
) (*details.Builder, error) {
	reasons := selectorToReasons(op.Selectors)

	logger.Ctx(ctx).With("selectors", op.Selectors).Info("backing up selection")

	// should always be 1, since backups are 1:1 with resourceOwners.
	opStats.resourceCount = 1

	mans, mdColls, canUseMetaData, err := produceManifestsAndMetadata(
		ctx,
		op.kopia,
		op.store,
		reasons,
		op.account.ID(),
		op.incremental,
		op.Errors)
	if err != nil {
		return nil, clues.Wrap(err, "producing manifests and metadata")
	}

	cs, excludes, err := produceBackupDataCollections(
		ctx,
		op.bp,
		op.ResourceOwner,
		op.Selectors,
		mdColls,
		op.Options,
		op.Errors)
	if err != nil {
		return nil, clues.Wrap(err, "producing backup data collections")
	}

	ctx = clues.Add(ctx, "coll_count", len(cs))

	writeStats, deets, toMerge, err := consumeBackupCollections(
		ctx,
		op.kopia,
		op.account.ID(),
		reasons,
		mans,
		cs,
		excludes,
		backupID,
		op.incremental && canUseMetaData,
		op.Errors)
	if err != nil {
		return nil, clues.Wrap(err, "persisting collection backups")
	}

	opStats.k = writeStats

	err = mergeDetails(
		ctx,
		op.store,
		detailsStore,
		mans,
		toMerge,
		deets,
		op.Errors)
	if err != nil {
		return nil, clues.Wrap(err, "merging details")
	}

	opStats.gc = op.bp.Wait()

	logger.Ctx(ctx).Debug(opStats.gc)

	return deets, nil
}

// checker to see if conditions are correct for incremental backup behavior such as
// retrieving metadata like delta tokens and previous paths.
func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool {
	enabled := !opts.ToggleFeatures.DisableIncrementals

	if sel.Service == selectors.ServiceExchange ||
		sel.Service == selectors.ServiceOneDrive {
		return enabled
	}

	return false
}

// ---------------------------------------------------------------------------
// Producer funcs
// ---------------------------------------------------------------------------

// calls the producer to generate collections of data to backup
func produceBackupDataCollections(
	ctx context.Context,
	bp inject.BackupProducer,
	resourceOwner common.IDNamer,
	sel selectors.Selector,
	metadata []data.RestoreCollection,
	ctrlOpts control.Options,
	errs *fault.Bus,
) ([]data.BackupCollection, map[string]map[string]struct{}, error) {
	complete, closer := observe.MessageWithCompletion(ctx, "Discovering items to backup")
	defer func() {
		complete <- struct{}{}
		close(complete)
		closer()
	}()

	return bp.ProduceBackupCollections(ctx, resourceOwner, sel, metadata, ctrlOpts, errs)
}

// ---------------------------------------------------------------------------
// Consumer funcs
// ---------------------------------------------------------------------------

func selectorToReasons(sel selectors.Selector) []kopia.Reason {
	service := sel.PathService()
	reasons := []kopia.Reason{}

	pcs, err := sel.PathCategories()
	if err != nil {
		// This is technically safe, it's just that the resulting backup won't be
		// usable as a base for future incremental backups.
		return nil
	}

	for _, sl := range [][]path.CategoryType{pcs.Includes, pcs.Filters} {
		for _, cat := range sl {
			reasons = append(reasons, kopia.Reason{
				ResourceOwner: sel.DiscreteOwner,
				Service:       service,
				Category:      cat,
			})
		}
	}

	return reasons
}

func builderFromReason(ctx context.Context, tenant string, r kopia.Reason) (*path.Builder, error) {
	ctx = clues.Add(ctx, "category", r.Category.String())

	// This is hacky, but we want the path package to format the path the right
	// way (e.g. proper order for service, category, etc.), but we don't care about
	// the folders after the prefix.
	p, err := path.Build(
		tenant,
		r.ResourceOwner,
		r.Service,
		r.Category,
		false,
		"tmp")
	if err != nil {
		return nil, clues.Wrap(err, "building path").WithClues(ctx)
	}

	return p.ToBuilder().Dir(), nil
}

// calls kopia to backup the collections of data
func consumeBackupCollections(
	ctx context.Context,
	bc inject.BackupConsumer,
	tenantID string,
	reasons []kopia.Reason,
	mans []*kopia.ManifestEntry,
	cs []data.BackupCollection,
	excludes map[string]map[string]struct{},
	backupID model.StableID,
	isIncremental bool,
	errs *fault.Bus,
) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) {
	complete, closer := observe.MessageWithCompletion(ctx, "Backing up data")
	defer func() {
		complete <- struct{}{}
		close(complete)
		closer()
	}()

	tags := map[string]string{
		kopia.TagBackupID:       string(backupID),
		kopia.TagBackupCategory: "",
	}

	for _, reason := range reasons {
		for _, k := range reason.TagKeys() {
			tags[k] = ""
		}
	}

	bases := make([]kopia.IncrementalBase, 0, len(mans))

	for _, m := range mans {
		paths := make([]*path.Builder, 0, len(m.Reasons))
		services := map[string]struct{}{}
		categories := map[string]struct{}{}

		for _, reason := range m.Reasons {
			pb, err := builderFromReason(ctx, tenantID, reason)
			if err != nil {
				return nil, nil, nil, clues.Wrap(err, "getting subtree paths for bases")
			}

			paths = append(paths, pb)
			services[reason.Service.String()] = struct{}{}
			categories[reason.Category.String()] = struct{}{}
		}

		bases = append(bases, kopia.IncrementalBase{
			Manifest:     m.Manifest,
			SubtreePaths: paths,
		})

		svcs := make([]string, 0, len(services))
		for k := range services {
			svcs = append(svcs, k)
		}

		cats := make([]string, 0, len(categories))
		for k := range categories {
			cats = append(cats, k)
		}

		mbID, ok := m.GetTag(kopia.TagBackupID)
		if !ok {
			mbID = "no_backup_id_tag"
		}

		logger.Ctx(ctx).Infow(
			"using base for backup",
			"base_snapshot_id", m.ID,
			"services", svcs,
			"categories", cats,
			"base_backup_id", mbID)
	}

	kopiaStats, deets, itemsSourcedFromBase, err := bc.ConsumeBackupCollections(
		ctx,
		bases,
		cs,
		excludes,
		tags,
		isIncremental,
		errs)
	if err != nil {
		if kopiaStats == nil {
			return nil, nil, nil, err
		}

		return nil, nil, nil, clues.Stack(err).With(
			"kopia_errors", kopiaStats.ErrorCount,
			"kopia_ignored_errors", kopiaStats.IgnoredErrorCount)
	}

	if kopiaStats.ErrorCount > 0 ||
		(kopiaStats.IgnoredErrorCount > kopiaStats.ExpectedIgnoredErrorCount) {
		err = clues.New("building kopia snapshot").With(
			"kopia_errors", kopiaStats.ErrorCount,
			"kopia_ignored_errors", kopiaStats.IgnoredErrorCount)
	}

	return kopiaStats, deets, itemsSourcedFromBase, err
}

func matchesReason(reasons []kopia.Reason, p path.Path) bool {
	for _, reason := range reasons {
		if p.ResourceOwner() == reason.ResourceOwner &&
			p.Service() == reason.Service &&
			p.Category() == reason.Category {
			return true
		}
	}

	return false
}

func mergeDetails(
	ctx context.Context,
	ms *store.Wrapper,
	detailsStore streamstore.Streamer,
	mans []*kopia.ManifestEntry,
	shortRefsFromPrevBackup map[string]kopia.PrevRefs,
	deets *details.Builder,
	errs *fault.Bus,
) error {
	// Don't bother loading any of the base details if there's nothing we need to merge.
	if len(shortRefsFromPrevBackup) == 0 {
		return nil
	}

	var addedEntries int

	for _, man := range mans {
		var (
			mctx                 = clues.Add(ctx, "base_manifest_id", man.ID)
			manifestAddedEntries int
		)

		// For now skip snapshots that aren't complete. We will need to revisit this
		// when we tackle restartability.
		if len(man.IncompleteReason) > 0 {
			continue
		}

		bID, ok := man.GetTag(kopia.TagBackupID)
		if !ok {
			return clues.New("no backup ID in snapshot manifest").WithClues(mctx)
		}

		mctx = clues.Add(mctx, "base_manifest_backup_id", bID)

		_, baseDeets, err := getBackupAndDetailsFromID(
			mctx,
			model.StableID(bID),
			ms,
			detailsStore,
			errs)
		if err != nil {
			return clues.New("fetching base details for backup").WithClues(mctx)
		}

		for _, entry := range baseDeets.Items() {
			rr, err := path.FromDataLayerPath(entry.RepoRef, true)
			if err != nil {
				return clues.New("parsing base item info path").
					WithClues(mctx).
					With("repo_ref", entry.RepoRef) // todo: pii, path needs concealer compliance
			}

			// Although this base has an entry it may not be the most recent. Check
			// the reasons a snapshot was returned to ensure we only choose the recent
			// entries.
			//
			// TODO(ashmrtn): This logic will need to be expanded to cover entries from
			// checkpoints if we start doing kopia-assisted incrementals for those.
			if !matchesReason(man.Reasons, rr) {
				continue
			}

			prev, ok := shortRefsFromPrevBackup[rr.ShortRef()]
			if !ok {
				// This entry was not sourced from a base snapshot or cached from a
				// previous backup, skip it.
				continue
			}

			newPath := prev.Repo
			newLoc := prev.Location

			// Fixup paths in the item.
			item := entry.ItemInfo
			if err := details.UpdateItem(&item, newPath, newLoc); err != nil {
				return clues.New("updating item details").WithClues(mctx)
			}

			// TODO(ashmrtn): This may need to be updated if we start using this merge
			// strategy for items that were cached in kopia.
			var (
				itemUpdated = newPath.String() != rr.String()
				newLocStr   string
				locBuilder  *path.Builder
			)

			if newLoc != nil {
				locBuilder = newLoc.ToBuilder()
				newLocStr = newLoc.Folder(true)
				itemUpdated = itemUpdated || newLocStr != entry.LocationRef
			}

			err = deets.Add(
				newPath.String(),
				newPath.ShortRef(),
				newPath.ToBuilder().Dir().ShortRef(),
				newLocStr,
				itemUpdated,
				item)
			if err != nil {
				return clues.Wrap(err, "adding item to details")
			}

			folders := details.FolderEntriesForPath(newPath.ToBuilder().Dir(), locBuilder)
			deets.AddFoldersForItem(folders, item, itemUpdated)

			// Track how many entries we added so that we know if we got them all when
			// we're done.
			addedEntries++
			manifestAddedEntries++
		}

		logger.Ctx(mctx).Infow(
			"merged details with base manifest",
			"base_item_count_unfiltered", len(baseDeets.Items()),
			"base_item_count_added", manifestAddedEntries)
	}

	if addedEntries != len(shortRefsFromPrevBackup) {
		return clues.New("incomplete migration of backup details").
			WithClues(ctx).
			With("item_count", addedEntries, "expected_item_count", len(shortRefsFromPrevBackup))
	}

	return nil
}

// writes the results metrics to the operation results.
// later stored in the manifest using createBackupModels.
func (op *BackupOperation) persistResults(
	started time.Time,
	opStats *backupStats,
) error {
	op.Results.StartedAt = started
	op.Results.CompletedAt = time.Now()

	op.Status = Completed
	if op.Errors.Failure() != nil {
		op.Status = Failed
	}

	if opStats.k == nil {
		op.Status = Failed
		return clues.New("backup persistence never completed")
	}

	op.Results.BytesRead = opStats.k.TotalHashedBytes
	op.Results.BytesUploaded = opStats.k.TotalUploadedBytes
	op.Results.ItemsWritten = opStats.k.TotalFileCount
	op.Results.ResourceOwners = opStats.resourceCount

	if opStats.gc == nil {
		op.Status = Failed
		return clues.New("backup population never completed")
	}

	if op.Status != Failed && opStats.gc.IsZero() {
		op.Status = NoData
	}

	op.Results.ItemsRead = opStats.gc.Successes

	return op.Errors.Failure()
}

// stores the operation details, results, and selectors in the backup manifest.
func (op *BackupOperation) createBackupModels(
	ctx context.Context,
	sscw streamstore.CollectorWriter,
	snapID string,
	backupID model.StableID,
	deets *details.Details,
) error {
	ctx = clues.Add(ctx, "snapshot_id", snapID, "backup_id", backupID)

	// generate a new fault bus so that we can maintain clean
	// separation between the errors we serialize and those that
	// are generated during the serialization process.
	errs := fault.New(true)

	if deets == nil {
		return clues.New("no backup details to record").WithClues(ctx)
	}

	ctx = clues.Add(ctx, "details_entry_count", len(deets.Entries))

	err := sscw.Collect(ctx, streamstore.DetailsCollector(deets))
	if err != nil {
		return clues.Wrap(err, "collecting details for persistence").WithClues(ctx)
	}

	err = sscw.Collect(ctx, streamstore.FaultErrorsCollector(op.Errors.Errors()))
	if err != nil {
		return clues.Wrap(err, "collecting errors for persistence").WithClues(ctx)
	}

	ssid, err := sscw.Write(ctx, errs)
	if err != nil {
		return clues.Wrap(err, "persisting details and errors").WithClues(ctx)
	}

	ctx = clues.Add(ctx, "streamstore_snapshot_id", ssid)

	b := backup.New(
		snapID, ssid,
		op.Status.String(),
		backupID,
		op.Selectors,
		op.ResourceOwner.ID(),
		op.ResourceOwner.Name(),
		op.Results.ReadWrites,
		op.Results.StartAndEndTime,
		op.Errors.Errors())

	logger.Ctx(ctx).Info("creating new backup")

	if err = op.store.Put(ctx, model.BackupSchema, b); err != nil {
		return clues.Wrap(err, "creating backup model").WithClues(ctx)
	}

	op.bus.Event(
		ctx,
		events.BackupEnd,
		map[string]any{
			events.BackupID:   b.ID,
			events.DataStored: op.Results.BytesUploaded,
			events.Duration:   op.Results.CompletedAt.Sub(op.Results.StartedAt),
			events.EndTime:    common.FormatTime(op.Results.CompletedAt),
			events.Resources:  op.Results.ResourceOwners,
			events.Service:    op.Selectors.PathService().String(),
			events.StartTime:  common.FormatTime(op.Results.StartedAt),
			events.Status:     op.Status.String(),
		})

	return nil
}