From 4e1b1d8fc6cd41184bb66ca984e9d4c5dd9849ee Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Thu, 10 Aug 2023 08:46:35 +0530 Subject: [PATCH] Persist backup models for assist backups (#3905) This PR adds changes to persist backup models for assist backups. Primary reason for doing this is to ensure we don't lose corso extension data(deets) for items which were downloaded and processed by kopia during this backup op. Note: This is 1/n PRs, and will not be merged into main until all n are reviewed, given we are touching state here. **Changes in this PR:** 1. Error handling changes to ensure assist backups get persisted 2. Introduce tags for assist/merge backups. These tags will be used for filtering during base selection process. 3. Don't persist backups if they are not assist backup, and they have non zero errors. 4. Remove `LabelForceNoBackupCreation` criteria for not persisting a backup. More details are in [PR comment](https://github.com/alcionai/corso/pull/3905/files#r1278030140). 5. Switch kopia failure from non-recoverable to recoverable. More details are in [PR comment](https://github.com/alcionai/corso/pull/3905/files#r1278019992). **What are assist backups?** Below conditions must be true for a backup operation to qualify as an assist backup. 1. new deets were produced during this backup operation 2. we have a valid snapshot ID 3. we don't have any non-recoverable errors 4. we have recoverable errors **A merge backup satisfies the following conditions:** 1. Is not an assist backup 2. No primary or recoverable errors occurred during the backup operation. Tests will be added in later PRs. --- #### Does this PR need a docs update or release note? 
- [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * internal #### Test Plan - [x] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/operations/backup.go | 132 ++++++++++++++---- src/internal/operations/helpers.go | 4 +- src/pkg/backup/backup.go | 7 +- src/pkg/backup/details/builder.go | 7 + .../repository/repository_unexported_test.go | 7 +- 5 files changed, 124 insertions(+), 33 deletions(-) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index f5f73f5d9..a2531bc30 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -122,9 +122,64 @@ func (op BackupOperation) validate() error { // pointer wrapping the values, while those values // get populated asynchronously. type backupStats struct { - k *kopia.BackupStats - ctrl *data.CollectionStats - resourceCount int + k *kopia.BackupStats + ctrl *data.CollectionStats + resourceCount int + hasNewDetailEntries bool +} + +// An assist backup must meet the following criteria: +// 1. new detail entries were produced +// 2. valid details ssid & item snapshot ID +// 3. no non-recoverable errors +// 4. we observed recoverable errors +// 5. not running in best effort mode. Reason being that there is +// no way to distinguish assist backups from merge backups in best effort mode. +// +// Primary reason for persisting assist backup models is to ensure we don't +// lose corso extension data(deets) in the event of recoverable failures. +// +// Note: kopia.DetailsMergeInfoer doesn't impact decision making for creating +// assist backups. 
It may be empty if it’s the very first backup so there is no +// merge base to source base details from, or non-empty, if there was a merge +// base. In summary, if there are no new deets, no new extension data was produced +// and hence no need to persist an assist backup model. +func isAssistBackup( + newDeetsProduced bool, + snapID, ssid string, + failurePolicy control.FailurePolicy, + err *fault.Bus, +) bool { + return newDeetsProduced && + len(snapID) > 0 && + len(ssid) > 0 && + failurePolicy != control.BestEffort && + err.Failure() == nil && + len(err.Recovered()) > 0 +} + +// A merge backup must meet the following criteria: +// 1. valid details ssid & item snapshot ID +// 2. no non-recoverable errors +// 3. no recoverable errors if not running in best effort mode +func isMergeBackup( + snapID, ssid string, + failurePolicy control.FailurePolicy, + err *fault.Bus, +) bool { + if len(snapID) == 0 || len(ssid) == 0 { + return false + } + + if err.Failure() != nil { + return false + } + + if failurePolicy == control.BestEffort { + return true + } + + return len(err.Recovered()) == 0 } // --------------------------------------------------------------------------- @@ -230,7 +285,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { op.Errors.Fail(clues.Wrap(err, "running backup")) } - finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup") LogFaultErrors(ctx, op.Errors.Errors(), "running backup") // ----- @@ -243,35 +297,25 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { return op.Errors.Failure() } - // force exit without backup in certain cases. - // see: https://github.com/alcionai/corso/pull/2510#discussion_r1113532530 - for _, e := range op.Errors.Recovered() { - if clues.HasLabel(e, fault.LabelForceNoBackupCreation) { - logger.Ctx(ctx). - With("error", e). - With(clues.InErr(err).Slice()...).
- Infow("completed backup; conditional error forcing exit without model persistence", - "results", op.Results) - - return op.Errors.Fail(clues.Wrap(e, "forced backup")).Failure() - } - } - err = op.createBackupModels( ctx, sstore, - opStats.k.SnapshotID, + opStats, op.Results.BackupID, op.BackupVersion, deets.Details()) if err != nil { - op.Errors.Fail(clues.Wrap(err, "persisting backup")) + op.Errors.Fail(clues.Wrap(err, "persisting backup models")) return op.Errors.Failure() } - logger.Ctx(ctx).Infow("completed backup", "results", op.Results) + finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup") - return nil + if op.Errors.Failure() == nil { + logger.Ctx(ctx).Infow("completed backup", "results", op.Results) + } + + return op.Errors.Failure() } // do is purely the action of running a backup. All pre/post behavior @@ -356,6 +400,8 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "persisting collection backups") } + opStats.hasNewDetailEntries = (deets != nil && !deets.Empty()) || + (toMerge != nil && toMerge.ItemsToMerge() > 0) opStats.k = writeStats err = mergeDetails( @@ -506,7 +552,7 @@ func consumeBackupCollections( if kopiaStats.ErrorCount > 0 { err = clues.New("building kopia snapshot").WithClues(ctx) } else if kopiaStats.IgnoredErrorCount > kopiaStats.ExpectedIgnoredErrorCount { - err = clues.New("downloading items for persistence").WithClues(ctx) + logger.Ctx(ctx).Info("recoverable errors were seen during backup") } return kopiaStats, deets, itemsSourcedFromBase, err @@ -810,6 +856,8 @@ func (op *BackupOperation) persistResults( op.Status = Completed + // Non recoverable errors always result in a failed backup. + // This holds true for all FailurePolicy. if op.Errors.Failure() != nil { op.Status = Failed } @@ -837,6 +885,7 @@ func (op *BackupOperation) persistResults( op.Results.ItemsRead = opStats.ctrl.Successes + // Only return non-recoverable errors at this point. 
return op.Errors.Failure() } @@ -844,12 +893,16 @@ func (op *BackupOperation) persistResults( func (op *BackupOperation) createBackupModels( ctx context.Context, sscw streamstore.CollectorWriter, - snapID string, + opStats backupStats, backupID model.StableID, backupVersion int, deets *details.Details, ) error { - ctx = clues.Add(ctx, "snapshot_id", snapID, "backup_id", backupID) + snapID := opStats.k.SnapshotID + ctx = clues.Add(ctx, + "snapshot_id", snapID, + "backup_id", backupID) + // generate a new fault bus so that we can maintain clean // separation between the errors we serialize and those that // are generated during the serialization process. @@ -878,6 +931,32 @@ func (op *BackupOperation) createBackupModels( ctx = clues.Add(ctx, "streamstore_snapshot_id", ssid) + tags := map[string]string{ + model.ServiceTag: op.Selectors.PathService().String(), + } + + // Add tags to mark this backup as either assist or merge. This is used to: + // 1. Filter assist backups by tag during base selection process + // 2. 
Differentiate assist backups from merge backups + if isMergeBackup( + snapID, + ssid, + op.Options.FailureHandling, + op.Errors) { + tags[model.BackupTypeTag] = model.MergeBackup + } else if isAssistBackup( + opStats.hasNewDetailEntries, + snapID, + ssid, + op.Options.FailureHandling, + op.Errors) { + tags[model.BackupTypeTag] = model.AssistBackup + } else { + return clues.New("backup is neither assist nor merge").WithClues(ctx) + } + + ctx = clues.Add(ctx, model.BackupTypeTag, tags[model.BackupTypeTag]) + b := backup.New( snapID, ssid, op.Status.String(), @@ -888,7 +967,8 @@ func (op *BackupOperation) createBackupModels( op.ResourceOwner.Name(), op.Results.ReadWrites, op.Results.StartAndEndTime, - op.Errors.Errors()) + op.Errors.Errors(), + tags) logger.Ctx(ctx).Info("creating new backup") diff --git a/src/internal/operations/helpers.go b/src/internal/operations/helpers.go index 0c5c9c049..cdce0fdec 100644 --- a/src/internal/operations/helpers.go +++ b/src/internal/operations/helpers.go @@ -11,8 +11,8 @@ import ( "github.com/alcionai/corso/src/pkg/logger" ) -// finalizeErrorHandling ensures the operation follow the options -// failure behavior requirements. +// finalizeErrorHandling ensures the operation follows the +// failure policy requirements. 
func finalizeErrorHandling( ctx context.Context, opts control.Options, diff --git a/src/pkg/backup/backup.go b/src/pkg/backup/backup.go index 39e317513..f9bee844b 100644 --- a/src/pkg/backup/backup.go +++ b/src/pkg/backup/backup.go @@ -79,6 +79,7 @@ func New( rw stats.ReadWrites, se stats.StartAndEndTime, fe *fault.Errors, + tags map[string]string, ) *Backup { if fe == nil { fe = &fault.Errors{} @@ -113,10 +114,8 @@ func New( return &Backup{ BaseModel: model.BaseModel{ - ID: id, - Tags: map[string]string{ - model.ServiceTag: selector.PathService().String(), - }, + ID: id, + Tags: tags, }, ResourceOwnerID: ownerID, diff --git a/src/pkg/backup/details/builder.go b/src/pkg/backup/details/builder.go index cb5014435..41ce3c60f 100644 --- a/src/pkg/backup/details/builder.go +++ b/src/pkg/backup/details/builder.go @@ -16,6 +16,13 @@ type Builder struct { knownFolders map[string]Entry `json:"-"` } +func (b *Builder) Empty() bool { + b.mu.Lock() + defer b.mu.Unlock() + + return len(b.d.Entries) == 0 +} + func (b *Builder) Add( repoRef path.Path, locationRef *path.Builder, diff --git a/src/pkg/repository/repository_unexported_test.go b/src/pkg/repository/repository_unexported_test.go index 344567f74..e24f2e1d6 100644 --- a/src/pkg/repository/repository_unexported_test.go +++ b/src/pkg/repository/repository_unexported_test.go @@ -559,6 +559,10 @@ func writeBackup( ssid, err := sstore.Write(ctx, errs) require.NoError(t, err, "writing to streamstore") + tags := map[string]string{ + model.ServiceTag: sel.PathService().String(), + } + b := backup.New( snapID, ssid, operations.Completed.String(), @@ -568,7 +572,8 @@ func writeBackup( ownerID, ownerName, stats.ReadWrites{}, stats.StartAndEndTime{}, - fe) + fe, + tags) err = sw.Put(ctx, model.BackupSchema, b) require.NoError(t, err)