clean up operation run-do funcs (#2391)

## Description

The operation run-do pattern is currently spaghetti.
The do() call includes a deferred persistence step
that runs before the Run call concludes, which
forces us to keep panic handlers at multiple levels.

This change normalizes the interaction: do() now
contains only the behavior needed to process the
backup or restore. Run() contains all of the setup
and teardown surrounding it.

The general pattern looks like this (a runnable sketch follows the outline):

```
Run()
    0. defer panic recovery
    1. create state builders/recorder vars/clients
    2. call do()
    3. persist results of do(), even in case of error

do()
    process the backup or restore step by step
    update builders/recorders along the way
    exit immediately on any error
```
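
A minimal, self-contained sketch of that shape, using hypothetical stand-in names (`run`, `do`, `persist`, `stats`) rather than the actual operation types in this PR:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stats stands in for the builders/recorder vars that do() updates.
type stats struct{ itemsRead int }

// run owns panic recovery, setup, and persistence; do owns only the work.
func run(ctx context.Context) (err error) {
	// 0. defer panic recovery at the top level only
	defer func() {
		if r := recover(); r != nil {
			if re, ok := r.(error); ok {
				err = fmt.Errorf("panic recovery: %w", re)
			} else {
				err = fmt.Errorf("panic recovery: %v", r)
			}
		}
	}()

	// 1. create state builders/recorder vars/clients
	var (
		st    stats
		start = time.Now()
	)

	// 2. call do(); record a failure, but keep going
	doErr := do(ctx, &st)

	// 3. persist results of do(), even in case of error
	if perr := persist(start, &st); perr != nil {
		return perr
	}

	return doErr
}

// do processes the operation step by step, updates the recorders,
// and exits immediately on any error.
func do(ctx context.Context, st *stats) error {
	if err := ctx.Err(); err != nil {
		return err
	}

	st.itemsRead++

	return nil
}

// persist writes the recorded results, regardless of do()'s outcome.
func persist(start time.Time, st *stats) error {
	fmt.Printf("elapsed=%s items=%d\n", time.Since(start), st.itemsRead)
	return nil
}

func main() {
	if err := run(context.Background()); err != nil {
		fmt.Println("run failed:", err)
	}
}
```

The key property is that persistence always runs on the straight-line path in run(), so do() no longer needs its own deferred recovery.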

## Does this PR need a docs update or release note?

- [x] No

## Type of change

- [x] 🧹 Tech Debt/Cleanup

## Test Plan

- [x] Unit test
- [x] 💚 E2E

2 changed files with 192 additions and 157 deletions

**File 1 of 2: backup operation**

```diff
@@ -2,6 +2,7 @@ package operations
 import (
 	"context"
+	"fmt"
 	"runtime/debug"
 	"time"
@@ -110,7 +111,21 @@ type detailsWriter interface {
 func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
+			var rerr error
+			if re, ok := r.(error); ok {
+				rerr = re
+			} else if re, ok := r.(string); ok {
+				rerr = clues.New(re)
+			} else {
+				rerr = clues.New(fmt.Sprintf("%v", r))
+			}
+
+			err = clues.Wrap(rerr, "panic recovery").
+				WithClues(ctx).
+				With("stacktrace", string(debug.Stack()))
+			logger.Ctx(ctx).
+				With("err", err).
+				Errorw("backup panic", clues.InErr(err).Slice()...)
 		}
 	}()
@@ -121,6 +136,18 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		observe.Complete()
 	}()
 
+	// -----
+	// Setup
+	// -----
+
+	var (
+		opStats      backupStats
+		startTime    = time.Now()
+		detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
+	)
+
+	op.Results.BackupID = model.StableID(uuid.NewString())
+
 	ctx = clues.AddAll(
 		ctx,
 		"tenant_id", op.account.ID(), // TODO: pii
@@ -129,32 +156,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		"service", op.Selectors.Service,
 		"incremental", op.incremental)
 
-	if err := op.do(ctx); err != nil {
-		logger.Ctx(ctx).
-			With("err", err).
-			Errorw("backup operation", clues.InErr(err).Slice()...)
-
-		return err
-	}
-
-	logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
-
-	return nil
-}
-
-func (op *BackupOperation) do(ctx context.Context) (err error) {
-	var (
-		opStats       backupStats
-		backupDetails *details.Builder
-		toMerge       map[string]path.Path
-		tenantID      = op.account.ID()
-		startTime     = time.Now()
-		detailsStore  = streamstore.New(op.kopia, tenantID, op.Selectors.PathService())
-		reasons       = selectorToReasons(op.Selectors)
-	)
-
-	op.Results.BackupID = model.StableID(uuid.NewString())
-
 	op.bus.Event(
 		ctx,
 		events.BackupStart,
@@ -162,122 +163,128 @@ func (op *BackupOperation) do(ctx context.Context) (err error) {
 			events.StartTime: startTime,
 			events.Service:   op.Selectors.Service.String(),
 			events.BackupID:  op.Results.BackupID,
-		},
-	)
-
-	// persist operation results to the model store on exit
-	defer func() {
-		// panic recovery here prevents additional errors in op.persistResults()
-		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
-			return
-		}
-
-		err = op.persistResults(startTime, &opStats)
-		if err != nil {
-			op.Errors.Fail(errors.Wrap(err, "persisting backup results"))
-			return
-		}
-
-		err = op.createBackupModels(
-			ctx,
-			detailsStore,
-			opStats.k.SnapshotID,
-			backupDetails.Details())
-		if err != nil {
-			op.Errors.Fail(errors.Wrap(err, "persisting backup"))
-			opStats.writeErr = op.Errors.Err()
-		}
-	}()
+		})
+
+	// -----
+	// Execution
+	// -----
+
+	deets, err := op.do(
+		ctx,
+		&opStats,
+		detailsStore,
+		op.Results.BackupID)
+	if err != nil {
+		// No return here! We continue down to persistResults, even in case of failure.
+		logger.Ctx(ctx).
+			With("err", err).
+			Errorw("doing backup", clues.InErr(err).Slice()...)
+		op.Errors.Fail(errors.Wrap(err, "doing backup"))
+		opStats.readErr = op.Errors.Err()
+	}
+
+	// -----
+	// Persistence
+	// -----
+
+	err = op.persistResults(startTime, &opStats)
+	if err != nil {
+		op.Errors.Fail(errors.Wrap(err, "persisting backup results"))
+		opStats.writeErr = op.Errors.Err()
+
+		return op.Errors.Err()
+	}
+
+	err = op.createBackupModels(
+		ctx,
+		detailsStore,
+		opStats.k.SnapshotID,
+		op.Results.BackupID,
+		deets.Details())
+	if err != nil {
+		op.Errors.Fail(errors.Wrap(err, "persisting backup"))
+		opStats.writeErr = op.Errors.Err()
+
+		return op.Errors.Err()
+	}
+
+	logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
+
+	return nil
+}
+
+// do is purely the action of running a backup. All pre/post behavior
+// is found in Run().
+func (op *BackupOperation) do(
+	ctx context.Context,
+	opStats *backupStats,
+	detailsStore detailsReader,
+	backupID model.StableID,
+) (*details.Builder, error) {
+	reasons := selectorToReasons(op.Selectors)
+
+	// should always be 1, since backups are 1:1 with resourceOwners.
+	opStats.resourceCount = 1
 
 	mans, mdColls, canUseMetaData, err := produceManifestsAndMetadata(
 		ctx,
 		op.kopia,
 		op.store,
 		reasons,
-		tenantID,
+		op.account.ID(),
 		op.incremental,
 		op.Errors)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "collecting manifest heuristics"))
-		opStats.readErr = op.Errors.Err()
-		logger.Ctx(ctx).With("err", err).Errorw("producing manifests and metadata", clues.InErr(err).Slice()...)
-		return opStats.readErr
+		return nil, errors.Wrap(err, "producing manifests and metadata")
 	}
 
 	gc, err := connectToM365(ctx, op.Selectors, op.account)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "connecting to m365"))
-		opStats.readErr = op.Errors.Err()
-		logger.Ctx(ctx).With("err", err).Errorw("connectng to m365", clues.InErr(err).Slice()...)
-		return opStats.readErr
+		return nil, errors.Wrap(err, "connectng to m365")
 	}
 
 	cs, err := produceBackupDataCollections(ctx, gc, op.Selectors, mdColls, op.Options)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "retrieving data to backup"))
-		opStats.readErr = op.Errors.Err()
-		logger.Ctx(ctx).With("err", err).Errorw("producing backup data collections", clues.InErr(err).Slice()...)
-		return opStats.readErr
+		return nil, errors.Wrap(err, "producing backup data collections")
 	}
 
 	ctx = clues.Add(ctx, "coll_count", len(cs))
 
-	opStats.k, backupDetails, toMerge, err = consumeBackupDataCollections(
+	writeStats, deets, toMerge, err := consumeBackupDataCollections(
 		ctx,
 		op.kopia,
-		tenantID,
+		op.account.ID(),
 		reasons,
 		mans,
 		cs,
-		op.Results.BackupID,
+		backupID,
 		op.incremental && canUseMetaData)
 	if err != nil {
-		op.Errors.Fail(errors.Wrap(err, "backing up service data"))
-		opStats.writeErr = op.Errors.Err()
-		logger.Ctx(ctx).With("err", err).Errorw("persisting collection backups", clues.InErr(err).Slice()...)
-		return opStats.writeErr
+		return nil, errors.Wrap(err, "persisting collection backups")
 	}
 
-	if err = mergeDetails(
+	opStats.k = writeStats
+
+	err = mergeDetails(
 		ctx,
 		op.store,
 		detailsStore,
 		mans,
 		toMerge,
-		backupDetails,
-	); err != nil {
-		op.Errors.Fail(errors.Wrap(err, "merging backup details"))
-		opStats.writeErr = op.Errors.Err()
-		logger.Ctx(ctx).With("err", err).Errorw("merging details", clues.InErr(err).Slice()...)
-		return opStats.writeErr
+		deets)
+	if err != nil {
+		return nil, errors.Wrap(err, "merging details")
 	}
 
 	opStats.gc = gc.AwaitStatus()
 
 	// TODO(keepers): remove when fault.Errors handles all iterable error aggregation.
 	if opStats.gc.ErrorCount > 0 {
-		merr := multierror.Append(opStats.readErr, errors.Wrap(opStats.gc.Err, "retrieving data"))
-		opStats.readErr = merr.ErrorOrNil()
-		// Need to exit before we set started to true else we'll report no errors.
-		return opStats.readErr
+		return nil, opStats.gc.Err
 	}
 
-	// should always be 1, since backups are 1:1 with resourceOwners.
-	opStats.resourceCount = 1
+	logger.Ctx(ctx).Debug(gc.PrintableStatus())
 
-	return err
+	return deets, nil
 }
@@ -520,8 +527,7 @@ func mergeDetails(
 		ctx,
 		model.StableID(bID),
 		ms,
-		detailsStore,
-	)
+		detailsStore)
 	if err != nil {
 		return clues.New("fetching base details for backup").WithClues(mctx)
 	}
@@ -566,8 +572,7 @@ func mergeDetails(
 			newPath.ShortRef(),
 			newPath.ToBuilder().Dir().ShortRef(),
 			itemUpdated,
-			item,
-		)
+			item)
 
 		folders := details.FolderEntriesForPath(newPath.ToBuilder().Dir())
 		deets.AddFoldersForItem(folders, item, itemUpdated)
@@ -617,10 +622,10 @@ func (op *BackupOperation) persistResults(
 	if opStats.gc == nil {
 		op.Status = Failed
-		return errors.New("data population never completed")
+		return errors.New("backup population never completed")
 	}
 
-	if opStats.readErr == nil && opStats.writeErr == nil && opStats.gc.Successful == 0 {
+	if opStats.gc.Successful == 0 {
 		op.Status = NoData
 	}
@@ -634,6 +639,7 @@ func (op *BackupOperation) createBackupModels(
 	ctx context.Context,
 	detailsStore detailsWriter,
 	snapID string,
+	backupID model.StableID,
 	backupDetails *details.Details,
 ) error {
 	ctx = clues.Add(ctx, "snapshot_id", snapID)
@@ -650,7 +656,7 @@ func (op *BackupOperation) createBackupModels(
 	ctx = clues.Add(ctx, "details_id", detailsID)
 
 	b := backup.New(
 		snapID, detailsID, op.Status.String(),
-		op.Results.BackupID,
+		backupID,
 		op.Selectors,
 		op.Results.ReadWrites,
 		op.Results.StartAndEndTime,
```

**File 2 of 2: restore operation**

```diff
@@ -110,10 +110,37 @@ type restorer interface {
 func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.Details, err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
+			var rerr error
+			if re, ok := r.(error); ok {
+				rerr = re
+			} else if re, ok := r.(string); ok {
+				rerr = clues.New(re)
+			} else {
+				rerr = clues.New(fmt.Sprintf("%v", r))
+			}
+
+			err = clues.Wrap(rerr, "panic recovery").
+				WithClues(ctx).
+				With("stacktrace", string(debug.Stack()))
+			logger.Ctx(ctx).
+				With("err", err).
+				Errorw("backup panic", clues.InErr(err).Slice()...)
 		}
 	}()
 
+	var (
+		opStats = restoreStats{
+			bytesRead: &stats.ByteCounter{},
+			restoreID: uuid.NewString(),
+		}
+		start        = time.Now()
+		detailsStore = streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
+	)
+
+	// -----
+	// Setup
+	// -----
+
 	ctx, end := D.Span(ctx, "operations:restore:run")
 	defer func() {
 		end()
@@ -127,13 +154,30 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
 		"backup_id", op.BackupID,
 		"service", op.Selectors.Service)
 
-	deets, err := op.do(ctx)
+	// -----
+	// Execution
+	// -----
+
+	deets, err := op.do(ctx, &opStats, detailsStore, start)
 	if err != nil {
+		// No return here! We continue down to persistResults, even in case of failure.
 		logger.Ctx(ctx).
 			With("err", err).
-			Errorw("restore operation", clues.InErr(err).Slice()...)
+			Errorw("doing restore", clues.InErr(err).Slice()...)
+		op.Errors.Fail(errors.Wrap(err, "doing restore"))
+		opStats.readErr = op.Errors.Err()
+	}
 
-		return nil, err
+	// -----
+	// Persistence
+	// -----
+
+	err = op.persistResults(ctx, start, &opStats)
+	if err != nil {
+		op.Errors.Fail(errors.Wrap(err, "persisting restore results"))
+		opStats.writeErr = op.Errors.Err()
+
+		return nil, op.Errors.Err()
 	}
 
 	logger.Ctx(ctx).Infow("completed restore", "results", op.Results)
@@ -141,30 +185,12 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
 	return deets, nil
 }
 
-func (op *RestoreOperation) do(ctx context.Context) (restoreDetails *details.Details, err error) {
-	var (
-		opStats = restoreStats{
-			bytesRead: &stats.ByteCounter{},
-			restoreID: uuid.NewString(),
-		}
-		startTime = time.Now()
-	)
-
-	defer func() {
-		// panic recovery here prevents additional errors in op.persistResults()
-		if r := recover(); r != nil {
-			err = clues.Wrap(r.(error), "panic recovery").WithClues(ctx).With("stacktrace", debug.Stack())
-			return
-		}
-
-		err = op.persistResults(ctx, startTime, &opStats)
-		if err != nil {
-			return
-		}
-	}()
-
-	detailsStore := streamstore.New(op.kopia, op.account.ID(), op.Selectors.PathService())
-
+func (op *RestoreOperation) do(
+	ctx context.Context,
+	opStats *restoreStats,
+	detailsStore detailsReader,
+	start time.Time,
+) (*details.Details, error) {
 	bup, deets, err := getBackupAndDetailsFromID(
 		ctx,
 		op.BackupID,
@@ -172,30 +198,29 @@ func (op *RestoreOperation) do(ctx context.Context) (restoreDetails *details.Det
 		detailsStore,
 	)
 	if err != nil {
-		opStats.readErr = errors.Wrap(err, "restore")
-		return nil, opStats.readErr
+		return nil, errors.Wrap(err, "getting backup and details")
 	}
 
-	ctx = clues.Add(ctx, "resource_owner", bup.Selector.DiscreteOwner)
+	paths, err := formatDetailsForRestoration(ctx, op.Selectors, deets)
+	if err != nil {
+		return nil, errors.Wrap(err, "formatting paths from details")
+	}
+
+	ctx = clues.AddAll(
+		ctx,
+		"resource_owner", bup.Selector.DiscreteOwner,
+		"details_paths", len(paths))
 
 	op.bus.Event(
 		ctx,
 		events.RestoreStart,
 		map[string]any{
-			events.StartTime:        startTime,
+			events.StartTime:        start,
 			events.BackupID:         op.BackupID,
 			events.BackupCreateTime: bup.CreationTime,
 			events.RestoreID:        opStats.restoreID,
-		},
-	)
-
-	paths, err := formatDetailsForRestoration(ctx, op.Selectors, deets)
-	if err != nil {
-		opStats.readErr = err
-		return nil, err
-	}
-
-	ctx = clues.Add(ctx, "details_paths", len(paths))
+		})
 
 	observe.Message(ctx, observe.Safe(fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)))
 
 	kopiaComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Enumerating items in repository"))
@@ -204,41 +229,45 @@ func (op *RestoreOperation) do(ctx context.Context) (restoreDetails *details.Det
 	dcs, err := op.kopia.RestoreMultipleItems(ctx, bup.SnapshotID, paths, opStats.bytesRead)
 	if err != nil {
-		opStats.readErr = errors.Wrap(err, "retrieving service data")
-		return nil, opStats.readErr
+		return nil, errors.Wrap(err, "retrieving collections from repository")
 	}
 
 	kopiaComplete <- struct{}{}
 
 	ctx = clues.Add(ctx, "coll_count", len(dcs))
 
+	// should always be 1, since backups are 1:1 with resourceOwners.
+	opStats.resourceCount = 1
 	opStats.cs = dcs
-	opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
 
 	gc, err := connectToM365(ctx, op.Selectors, op.account)
 	if err != nil {
-		opStats.readErr = errors.Wrap(err, "connecting to M365")
-		return nil, opStats.readErr
+		return nil, errors.Wrap(err, "connecting to M365")
 	}
 
 	restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data"))
 	defer closer()
 	defer close(restoreComplete)
 
-	restoreDetails, err = gc.RestoreDataCollections(
+	restoreDetails, err := gc.RestoreDataCollections(
 		ctx,
 		bup.Version,
 		op.account,
 		op.Selectors,
 		op.Destination,
 		op.Options,
-		dcs,
-	)
+		dcs)
 	if err != nil {
-		opStats.writeErr = errors.Wrap(err, "restoring service data")
-		return nil, opStats.writeErr
+		return nil, errors.Wrap(err, "restoring collections")
 	}
 
 	restoreComplete <- struct{}{}
 	opStats.gc = gc.AwaitStatus()
 
+	// TODO(keepers): remove when fault.Errors handles all iterable error aggregation.
+	if opStats.gc.ErrorCount > 0 {
+		return nil, opStats.gc.Err
+	}
+
 	logger.Ctx(ctx).Debug(gc.PrintableStatus())
@@ -273,10 +302,10 @@ func (op *RestoreOperation) persistResults(
 	if opStats.gc == nil {
 		op.Status = Failed
-		return errors.New("data restoration never completed")
+		return errors.New("restoration never completed")
 	}
 
-	if opStats.readErr == nil && opStats.writeErr == nil && opStats.gc.Successful == 0 {
+	if opStats.gc.Successful == 0 {
 		op.Status = NoData
 	}
```