diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go
index e6b8c767a..ec47bae1c 100644
--- a/src/internal/operations/backup.go
+++ b/src/internal/operations/backup.go
@@ -2,6 +2,7 @@ package operations
 
 import (
 	"context"
+	"fmt"
 	"runtime/debug"
 	"time"
 
@@ -110,7 +111,21 @@ type detailsWriter interface {
 func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
+			var rerr error
+			if re, ok := r.(error); ok {
+				rerr = re
+			} else if re, ok := r.(string); ok {
+				rerr = clues.New(re)
+			} else {
+				rerr = clues.New(fmt.Sprintf("%v", r))
+			}
+
+			err = clues.Wrap(rerr, "panic recovery").
+				WithClues(ctx).
+				With("stacktrace", string(debug.Stack()))
+			logger.Ctx(ctx).
+				With("err", err).
+				Errorw("backup panic", clues.InErr(err).Slice()...)
 		}
 	}()
 
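The rewritten recovery block normalizes whatever `recover()` returns (an error, a string, or any other value) into an error before wrapping it; the old `r.(error)` assertion would itself panic on a non-error value. A minimal standalone sketch of that flow, using stdlib `errors`/`fmt` in place of the `clues` package; `normalizePanic` is a hypothetical helper name:

```go
package main

import (
	"errors"
	"fmt"
)

// normalizePanic converts whatever recover() returned into an error.
func normalizePanic(r any) error {
	switch v := r.(type) {
	case error:
		return v
	case string:
		return errors.New(v)
	default:
		return fmt.Errorf("%v", v)
	}
}

// run recovers from any panic and reports it as a normal error.
func run() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic recovery: %w", normalizePanic(r))
		}
	}()

	panic("mid-operation failure") // any value works: error, string, int, ...
}

func main() {
	fmt.Println(run()) // panic recovery: mid-operation failure
}
```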
@@ -121,6 +136,18 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		observe.Complete()
 	}()
 
+	// -----
+	// Setup
+	// -----
+
+	var (
+		opStats      backupStats
+		startTime    = time.Now()
+		detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
+	)
+
+	op.Results.BackupID = model.StableID(uuid.NewString())
+
 	ctx = clues.AddAll(
 		ctx,
 		"tenant_id", op.account.ID(), // TODO: pii
@@ -129,32 +156,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		"service", op.Selectors.Service,
 		"incremental", op.incremental)
 
-	if err := op.do(ctx); err != nil {
-		logger.Ctx(ctx).
-			With("err", err).
-			Errorw("backup operation", clues.InErr(err).Slice()...)
-
-		return err
-	}
-
-	logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
-
-	return nil
-}
-
-func (op *BackupOperation) do(ctx context.Context) (err error) {
-	var (
-		opStats       backupStats
-		backupDetails *details.Builder
-		toMerge       map[string]path.Path
-		tenantID      = op.account.ID()
-		startTime     = time.Now()
-		detailsStore  = streamstore.New(op.kopia, tenantID, op.Selectors.PathService())
-		reasons       = selectorToReasons(op.Selectors)
-	)
-
-	op.Results.BackupID = model.StableID(uuid.NewString())
-
 	op.bus.Event(
 		ctx,
 		events.BackupStart,
@@ -162,122 +163,128 @@ func (op *BackupOperation) do(ctx context.Context) (err error) {
 			events.StartTime: startTime,
 			events.Service:   op.Selectors.Service.String(),
 			events.BackupID:  op.Results.BackupID,
-		},
-	)
+		})
 
-	// persist operation results to the model store on exit
-	defer func() {
-		// panic recovery here prevents additional errors in op.persistResults()
-		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
-			return
-		}
+	// -----
+	// Execution
+	// -----
 
-		err = op.persistResults(startTime, &opStats)
-		if err != nil {
-			op.Errors.Fail(errors.Wrap(err, "persisting backup results"))
-			return
-		}
+	deets, err := op.do(
+		ctx,
+		&opStats,
+		detailsStore,
+		op.Results.BackupID)
+	if err != nil {
+		// No return here! We continue down to persistResults, even in case of failure.
+		logger.Ctx(ctx).
+			With("err", err).
+			Errorw("doing backup", clues.InErr(err).Slice()...)
+		op.Errors.Fail(errors.Wrap(err, "doing backup"))
+		opStats.readErr = op.Errors.Err()
+	}
 
-		err = op.createBackupModels(
-			ctx,
-			detailsStore,
-			opStats.k.SnapshotID,
-			backupDetails.Details())
-		if err != nil {
-			op.Errors.Fail(errors.Wrap(err, "persisting backup"))
-			opStats.writeErr = op.Errors.Err()
-		}
-	}()
+	// -----
+	// Persistence
+	// -----
+
+	err = op.persistResults(startTime, &opStats)
+	if err != nil {
+		op.Errors.Fail(errors.Wrap(err, "persisting backup results"))
+		opStats.writeErr = op.Errors.Err()
+
+		return op.Errors.Err()
+	}
+
+	err = op.createBackupModels(
+		ctx,
+		detailsStore,
+		opStats.k.SnapshotID,
+		op.Results.BackupID,
+		deets.Details())
+	if err != nil {
+		op.Errors.Fail(errors.Wrap(err, "persisting backup"))
+		opStats.writeErr = op.Errors.Err()
+
+		return op.Errors.Err()
+	}
+
+	logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
+
+	return nil
+}
+
+// do is purely the action of running a backup. All pre/post behavior
+// is found in Run().
+func (op *BackupOperation) do(
+	ctx context.Context,
+	opStats *backupStats,
+	detailsStore detailsReader,
+	backupID model.StableID,
+) (*details.Builder, error) {
+	reasons := selectorToReasons(op.Selectors)
+
+	// should always be 1, since backups are 1:1 with resourceOwners.
+	opStats.resourceCount = 1
 
 	mans, mdColls, canUseMetaData, err := produceManifestsAndMetadata(
 		ctx,
 		op.kopia,
 		op.store,
 		reasons,
-		tenantID,
+		op.account.ID(),
 		op.incremental,
 		op.Errors)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "collecting manifest heuristics"))
-		opStats.readErr = op.Errors.Err()
-
-		logger.Ctx(ctx).With("err", err).Errorw("producing manifests and metadata", clues.InErr(err).Slice()...)
-
-		return opStats.readErr
+		return nil, errors.Wrap(err, "producing manifests and metadata")
 	}
 
 	gc, err := connectToM365(ctx, op.Selectors, op.account)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "connecting to m365"))
-		opStats.readErr = op.Errors.Err()
-
-		logger.Ctx(ctx).With("err", err).Errorw("connectng to m365", clues.InErr(err).Slice()...)
-
-		return opStats.readErr
+		return nil, errors.Wrap(err, "connecting to m365")
 	}
 
 	cs, err := produceBackupDataCollections(ctx, gc, op.Selectors, mdColls, op.Options)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "retrieving data to backup"))
-		opStats.readErr = op.Errors.Err()
-
-		logger.Ctx(ctx).With("err", err).Errorw("producing backup data collections", clues.InErr(err).Slice()...)
-
-		return opStats.readErr
+		return nil, errors.Wrap(err, "producing backup data collections")
 	}
 
 	ctx = clues.Add(ctx, "coll_count", len(cs))
 
-	opStats.k, backupDetails, toMerge, err = consumeBackupDataCollections(
+	writeStats, deets, toMerge, err := consumeBackupDataCollections(
 		ctx,
 		op.kopia,
-		tenantID,
+		op.account.ID(),
 		reasons,
 		mans,
 		cs,
-		op.Results.BackupID,
+		backupID,
 		op.incremental && canUseMetaData)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "backing up service data"))
-		opStats.writeErr = op.Errors.Err()
-
-		logger.Ctx(ctx).With("err", err).Errorw("persisting collection backups", clues.InErr(err).Slice()...)
-
-		return opStats.writeErr
+		return nil, errors.Wrap(err, "persisting collection backups")
 	}
 
-	if err = mergeDetails(
+	opStats.k = writeStats
+
+	err = mergeDetails(
 		ctx,
 		op.store,
 		detailsStore,
 		mans,
 		toMerge,
-		backupDetails,
-	); err != nil {
-		op.Errors.Fail(errors.Wrap(err, "merging backup details"))
-		opStats.writeErr = op.Errors.Err()
-
-		logger.Ctx(ctx).With("err", err).Errorw("merging details", clues.InErr(err).Slice()...)
-
-		return opStats.writeErr
+		deets)
+	if err != nil {
+		return nil, errors.Wrap(err, "merging details")
 	}
 
 	opStats.gc = gc.AwaitStatus()
 
-	// TODO(keepers): remove when fault.Errors handles all iterable error aggregation.
 	if opStats.gc.ErrorCount > 0 {
-		merr := multierror.Append(opStats.readErr, errors.Wrap(opStats.gc.Err, "retrieving data"))
-		opStats.readErr = merr.ErrorOrNil()
-
-		// Need to exit before we set started to true else we'll report no errors.
-		return opStats.readErr
+		return nil, opStats.gc.Err
 	}
 
-	// should always be 1, since backups are 1:1 with resourceOwners.
-	opStats.resourceCount = 1
+	logger.Ctx(ctx).Debug(gc.PrintableStatus())
 
-	return err
+	return deets, nil
 }
 
 // checker to see if conditions are correct for incremental backup behavior such as
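With `do()` reduced to returning wrapped errors, fault tracking and logging collapse into single sites in `Run()`, and persistence is guaranteed to execute even when execution fails. A sketch of that control flow, with illustrative names standing in for the operation types; in the real code the recorded execution error surfaces through `op.Errors` rather than the return value:

```go
package main

import (
	"errors"
	"fmt"
)

type backupStats struct{ readErr, writeErr error }

// do stands in for the execution path: it only wraps and returns errors,
// and never logs or touches persistence.
func do() error {
	return errors.New("producing backup data collections: boom")
}

func persistResults(s *backupStats) error { return nil }

// run mirrors the new shape: an execution failure is recorded once, but
// persistence still runs before the operation reports out.
func run() (*backupStats, error) {
	s := &backupStats{}

	if err := do(); err != nil {
		// No return here: results are still persisted below.
		s.readErr = fmt.Errorf("doing backup: %w", err)
	}

	if err := persistResults(s); err != nil {
		s.writeErr = fmt.Errorf("persisting backup results: %w", err)
		return s, s.writeErr
	}

	// Execution failures surface through the recorded stats (op.Errors in
	// the real code), not through run's return value.
	return s, nil
}

func main() {
	s, err := run()
	fmt.Println(err, "|", s.readErr)
}
```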
@@ -520,8 +527,7 @@ func mergeDetails(
 		ctx,
 		model.StableID(bID),
 		ms,
-		detailsStore,
-	)
+		detailsStore)
 	if err != nil {
 		return clues.New("fetching base details for backup").WithClues(mctx)
 	}
@@ -566,8 +572,7 @@ func mergeDetails(
 			newPath.ShortRef(),
 			newPath.ToBuilder().Dir().ShortRef(),
 			itemUpdated,
-			item,
-		)
+			item)
 
 		folders := details.FolderEntriesForPath(newPath.ToBuilder().Dir())
 		deets.AddFoldersForItem(folders, item, itemUpdated)
@@ -617,10 +622,10 @@ func (op *BackupOperation) persistResults(
 	if opStats.gc == nil {
 		op.Status = Failed
 
-		return errors.New("data population never completed")
+		return errors.New("backup population never completed")
 	}
 
-	if opStats.readErr == nil && opStats.writeErr == nil && opStats.gc.Successful == 0 {
+	if opStats.gc.Successful == 0 {
 		op.Status = NoData
 	}
 
@@ -634,6 +639,7 @@ func (op *BackupOperation) createBackupModels(
 	ctx context.Context,
 	detailsStore detailsWriter,
 	snapID string,
+	backupID model.StableID,
 	backupDetails *details.Details,
 ) error {
 	ctx = clues.Add(ctx, "snapshot_id", snapID)
@@ -650,7 +656,7 @@ func (op *BackupOperation) createBackupModels(
 	ctx = clues.Add(ctx, "details_id", detailsID)
 	b := backup.New(
 		snapID, detailsID, op.Status.String(),
-		op.Results.BackupID,
+		backupID,
 		op.Selectors,
 		op.Results.ReadWrites,
 		op.Results.StartAndEndTime,
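Note the `persistResults` change above: with read/write errors now funneled through the fault bus, the `NoData` decision rests solely on the connector status. A compact sketch of the resulting status selection, with illustrative types in place of the package's own:

```go
package main

import "fmt"

type opStatus string

const (
	completed opStatus = "Completed"
	failed    opStatus = "Failed"
	noData    opStatus = "NoData"
)

// gcStatus mirrors just the fields persistResults consults.
type gcStatus struct{ Successful, ErrorCount int }

// statusFor mirrors the simplified check: "no data" depends only on the
// connector status, not on separately tracked read/write errors.
func statusFor(gc *gcStatus) opStatus {
	if gc == nil {
		return failed // "backup population never completed"
	}

	if gc.Successful == 0 {
		return noData
	}

	return completed
}

func main() {
	fmt.Println(statusFor(nil))                       // Failed
	fmt.Println(statusFor(&gcStatus{Successful: 0}))  // NoData
	fmt.Println(statusFor(&gcStatus{Successful: 12})) // Completed
}
```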
diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go
index f90e3c3c8..dafb8670e 100644
--- a/src/internal/operations/restore.go
+++ b/src/internal/operations/restore.go
@@ -110,10 +110,37 @@ type restorer interface {
 func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.Details, err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
+			var rerr error
+			if re, ok := r.(error); ok {
+				rerr = re
+			} else if re, ok := r.(string); ok {
+				rerr = clues.New(re)
+			} else {
+				rerr = clues.New(fmt.Sprintf("%v", r))
+			}
+
+			err = clues.Wrap(rerr, "panic recovery").
+				WithClues(ctx).
+				With("stacktrace", string(debug.Stack()))
+			logger.Ctx(ctx).
+				With("err", err).
+				Errorw("restore panic", clues.InErr(err).Slice()...)
 		}
 	}()
 
+	var (
+		opStats = restoreStats{
+			bytesRead: &stats.ByteCounter{},
+			restoreID: uuid.NewString(),
+		}
+		start        = time.Now()
+		detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
+	)
+
+	// -----
+	// Setup
+	// -----
+
 	ctx, end := D.Span(ctx, "operations:restore:run")
 	defer func() {
 		end()
@@ -127,13 +154,30 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
 		"backup_id", op.BackupID,
 		"service", op.Selectors.Service)
 
-	deets, err := op.do(ctx)
+	// -----
+	// Execution
+	// -----
+
+	deets, err := op.do(ctx, &opStats, detailsStore, start)
 	if err != nil {
+		// No return here! We continue down to persistResults, even in case of failure.
 		logger.Ctx(ctx).
 			With("err", err).
-			Errorw("restore operation", clues.InErr(err).Slice()...)
+			Errorw("doing restore", clues.InErr(err).Slice()...)
+		op.Errors.Fail(errors.Wrap(err, "doing restore"))
+		opStats.readErr = op.Errors.Err()
+	}
 
-		return nil, err
+	// -----
+	// Persistence
+	// -----
+
+	err = op.persistResults(ctx, start, &opStats)
+	if err != nil {
+		op.Errors.Fail(errors.Wrap(err, "persisting restore results"))
+		opStats.writeErr = op.Errors.Err()
+
+		return nil, op.Errors.Err()
 	}
 
 	logger.Ctx(ctx).Infow("completed restore", "results", op.Results)
@@ -141,30 +185,12 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
 	return deets, nil
 }
 
-func (op *RestoreOperation) do(ctx context.Context) (restoreDetails *details.Details, err error) {
-	var (
-		opStats = restoreStats{
-			bytesRead: &stats.ByteCounter{},
-			restoreID: uuid.NewString(),
-		}
-		startTime = time.Now()
-	)
-
-	defer func() {
-		// panic recovery here prevents additional errors in op.persistResults()
-		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
-			return
-		}
-
-		err = op.persistResults(ctx, startTime, &opStats)
-		if err != nil {
-			return
-		}
-	}()
-
-	detailsStore := streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
-
+func (op *RestoreOperation) do(
+	ctx context.Context,
+	opStats *restoreStats,
+	detailsStore detailsReader,
+	start time.Time,
+) (*details.Details, error) {
 	bup, deets, err := getBackupAndDetailsFromID(
 		ctx,
 		op.BackupID,
@@ -172,30 +198,29 @@ func (op *RestoreOperation) do(ctx context.Context) (restoreDetails *details.Det
 		op.store,
 		detailsStore,
 	)
 	if err != nil {
-		opStats.readErr = errors.Wrap(err, "restore")
-		return nil, opStats.readErr
+		return nil, errors.Wrap(err, "getting backup and details")
 	}
 
-	ctx = clues.Add(ctx, "resource_owner", bup.Selector.DiscreteOwner)
+	paths, err := formatDetailsForRestoration(ctx, op.Selectors, deets)
+	if err != nil {
+		return nil, errors.Wrap(err, "formatting paths from details")
+	}
+
+	ctx = clues.AddAll(
+		ctx,
+		"resource_owner", bup.Selector.DiscreteOwner,
+		"details_paths", len(paths))
 
 	op.bus.Event(
 		ctx,
 		events.RestoreStart,
 		map[string]any{
-			events.StartTime:        startTime,
+			events.StartTime:        start,
 			events.BackupID:         op.BackupID,
 			events.BackupCreateTime: bup.CreationTime,
 			events.RestoreID:        opStats.restoreID,
-		},
-	)
+		})
 
-	paths, err := formatDetailsForRestoration(ctx, op.Selectors, deets)
-	if err != nil {
-		opStats.readErr = err
-		return nil, err
-	}
-
-	ctx = clues.Add(ctx, "details_paths", len(paths))
 	observe.Message(ctx, observe.Safe(fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)))
 	kopiaComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Enumerating items in repository"))
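Threading `opStats`, the details reader, and the start time through `do()`'s signature leaves it as a pure execution path that tests can drive with fakes. A sketch assuming a one-method `detailsReader`; the real interface and its signatures live elsewhere in the package:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type restoreStats struct{ restoreID string }

// detailsReader is a stand-in for the package's reader interface; the
// single method here is illustrative, not the actual signature.
type detailsReader interface {
	ReadDetails(ctx context.Context, backupID string) (string, error)
}

type fakeReader struct{}

func (fakeReader) ReadDetails(context.Context, string) (string, error) {
	return "deets", nil
}

// do receives everything it needs as parameters, so tests can hand it a
// fake reader and a canned stats struct without touching persistence.
func do(ctx context.Context, opStats *restoreStats, dr detailsReader, start time.Time) (string, error) {
	deets, err := dr.ReadDetails(ctx, opStats.restoreID)
	if err != nil {
		return "", fmt.Errorf("getting backup and details: %w", err)
	}

	return deets, nil
}

func main() {
	out, err := do(context.Background(), &restoreStats{restoreID: "r1"}, fakeReader{}, time.Now())
	fmt.Println(out, err)
}
```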
@@ -204,41 +229,45 @@ func (op *RestoreOperation) do(ctx context.Context) (restoreDetails *details.Det
 
 	dcs, err := op.kopia.RestoreMultipleItems(ctx, bup.SnapshotID, paths, opStats.bytesRead)
 	if err != nil {
-		opStats.readErr = errors.Wrap(err, "retrieving service data")
-		return nil, opStats.readErr
+		return nil, errors.Wrap(err, "retrieving collections from repository")
 	}
+
 	kopiaComplete <- struct{}{}
 
 	ctx = clues.Add(ctx, "coll_count", len(dcs))
+
+	// should always be 1, since backups are 1:1 with resourceOwners.
+	opStats.resourceCount = 1
 	opStats.cs = dcs
-	opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
 
 	gc, err := connectToM365(ctx, op.Selectors, op.account)
 	if err != nil {
-		opStats.readErr = errors.Wrap(err, "connecting to M365")
-		return nil, opStats.readErr
+		return nil, errors.Wrap(err, "connecting to M365")
 	}
 
 	restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data"))
 	defer closer()
 	defer close(restoreComplete)
 
-	restoreDetails, err = gc.RestoreDataCollections(
+	restoreDetails, err := gc.RestoreDataCollections(
 		ctx,
 		bup.Version,
 		op.account,
 		op.Selectors,
 		op.Destination,
 		op.Options,
-		dcs,
-	)
+		dcs)
 	if err != nil {
-		opStats.writeErr = errors.Wrap(err, "restoring service data")
-		return nil, opStats.writeErr
+		return nil, errors.Wrap(err, "restoring collections")
 	}
+
 	restoreComplete <- struct{}{}
 
 	opStats.gc = gc.AwaitStatus()
+	// TODO(keepers): remove when fault.Errors handles all iterable error aggregation.
+	if opStats.gc.ErrorCount > 0 {
+		return nil, opStats.gc.Err
+	}
 
 	logger.Ctx(ctx).Debug(gc.PrintableStatus())
 
@@ -273,10 +302,10 @@ func (op *RestoreOperation) persistResults(
 	if opStats.gc == nil {
 		op.Status = Failed
 
-		return errors.New("data restoration never completed")
+		return errors.New("restoration never completed")
 	}
 
-	if opStats.readErr == nil && opStats.writeErr == nil && opStats.gc.Successful == 0 {
+	if opStats.gc.Successful == 0 {
 		op.Status = NoData
 	}
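Both `kopiaComplete` and `restoreComplete` follow the same signal-on-success pattern around `observe.MessageWithCompletion`: send once to resolve the progress message, or let the deferred `close` mark abandonment on an early error return. A standalone sketch, with `messageWithCompletion` as a stand-in that mirrors only the usage shape seen in the diff:

```go
package main

import (
	"fmt"
	"sync"
)

// messageWithCompletion resolves a progress message when the caller signals
// success; the returned closer waits for the message to finish rendering.
func messageWithCompletion(msg string) (chan<- struct{}, func()) {
	done := make(chan struct{}, 1)

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()

		if _, ok := <-done; ok {
			fmt.Println(msg + ": done")
			return
		}

		fmt.Println(msg + ": abandoned") // channel closed without a signal
	}()

	return done, func() { wg.Wait() }
}

func main() {
	complete, closer := messageWithCompletion("Enumerating items in repository")
	defer closer()
	defer close(complete)

	// ... long-running work ...

	complete <- struct{}{} // signal success; an early return skips this send
}
```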