ashmrtn eb95b43010
Remove updated field from details (#4004)
No longer required by SDK users, and not exposed directly to CLI users unless they're looking at the JSON output from details. The field also isn't documented anywhere, so there are no guarantees we're technically required to uphold for it.
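
For context, a rough sketch of the kind of change involved (illustrative only; the field names and JSON tags below are assumptions rather than the exact definition in `pkg/backup/details`):

```go
// Illustrative approximation of a details entry, showing the field this PR
// removes. Not the actual struct definition.
type entry struct {
	RepoRef     string `json:"repoRef"`
	LocationRef string `json:"locationRef,omitempty"`

	// Removed: this was only ever surfaced through the raw JSON output of
	// backup details, and was never documented.
	Updated bool `json:"updated,omitempty"`
}
```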

Manually tested:
* restoring from a backup that had this field
* making a fresh backup without this field when the merge base did have this field

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* closes #3986

#### Test Plan

- [x] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
2023-08-11 16:32:21 +00:00

package operations
import (
"context"
"time"
"github.com/alcionai/clues"
"github.com/google/uuid"
"github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
kinject "github.com/alcionai/corso/src/internal/kopia/inject"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/internal/streamstore"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store"
)
// BackupOperation wraps an operation with backup-specific props.
type BackupOperation struct {
operation
ResourceOwner idname.Provider
Results BackupResults `json:"results"`
Selectors selectors.Selector `json:"selectors"`
Version string `json:"version"`
// BackupVersion ONLY controls the value that gets persisted to the
// backup model after operation. It does NOT modify the operation behavior
// to match the version. Its inclusion here is, unfortunately, purely to
// facilitate integration testing that requires a certain backup version, and
// should be removed when we have a more controlled workaround.
BackupVersion int
account account.Account
bp inject.BackupProducer
// when true, this allows for incremental backups instead of full data pulls
incremental bool
// When true, disables kopia-assisted incremental backups. This forces
// downloading and hashing all item data for items not in the merge base(s).
disableAssistBackup bool
}
// BackupResults aggregates the details of the result of the operation.
type BackupResults struct {
stats.ReadWrites
stats.StartAndEndTime
BackupID model.StableID `json:"backupID"`
}
// NewBackupOperation constructs and validates a backup operation.
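// A sketch of typical wiring (illustrative, not from this file):
//
//	op, err := NewBackupOperation(ctx, opts, kw, sw, bp, acct, sel, owner, bus)
//	if err != nil {
//		return err
//	}
//	err = op.Run(ctx)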
func NewBackupOperation(
ctx context.Context,
opts control.Options,
kw *kopia.Wrapper,
sw store.BackupStorer,
bp inject.BackupProducer,
acct account.Account,
selector selectors.Selector,
owner idname.Provider,
bus events.Eventer,
) (BackupOperation, error) {
op := BackupOperation{
operation: newOperation(opts, bus, count.New(), kw, sw),
ResourceOwner: owner,
Selectors: selector,
Version: "v0",
BackupVersion: version.Backup,
account: acct,
incremental: useIncrementalBackup(selector, opts),
disableAssistBackup: opts.ToggleFeatures.ForceItemDataDownload,
bp: bp,
}
if err := op.validate(); err != nil {
return BackupOperation{}, err
}
return op, nil
}
func (op BackupOperation) validate() error {
if op.ResourceOwner == nil {
return clues.New("backup requires a resource owner")
}
if len(op.ResourceOwner.ID()) == 0 {
return clues.New("backup requires a resource owner with a populated ID")
}
if op.bp == nil {
return clues.New("missing backup producer")
}
return op.operation.validate()
}
// aggregates stats from the backup.Run().
// primarily used so that the defer can take in a
// pointer wrapping the values, while those values
// get populated asynchronously.
type backupStats struct {
k *kopia.BackupStats
ctrl *data.CollectionStats
resourceCount int
hasNewDetailEntries bool
}
// An assist backup must meet the following criteria:
// 1. new detail entries were produced
// 2. valid details ssid & item snapshot ID
// 3. no non-recoverable errors
// 4. we observed recoverable errors
// 5. not running in best effort mode. Reason being that there is
// no way to distinguish assist backups from merge backups in best effort mode.
//
// Primary reason for persisting assist backup models is to ensure we don't
// lose corso extension data (deets) in the event of recoverable failures.
//
// Note: kopia.DetailsMergeInfoer doesn't impact decision making for creating
// assist backups. It may be empty if it's the very first backup, in which case
// there is no merge base to source base details from, or non-empty if there was
// a merge base. In summary, if there are no new deets, no new extension data
// was produced and hence there's no need to persist an assist backup model.
func isAssistBackup(
newDeetsProduced bool,
snapID, ssid string,
failurePolicy control.FailurePolicy,
err *fault.Bus,
) bool {
return newDeetsProduced &&
len(snapID) > 0 &&
len(ssid) > 0 &&
failurePolicy != control.BestEffort &&
err.Failure() == nil &&
len(err.Recovered()) > 0
}
// A merge backup must meet the following criteria:
// 1. valid details ssid & item snapshot ID
// 2. zero non-recoverable errors
// 3. no recoverable errors if not running in best effort mode
func isMergeBackup(
snapID, ssid string,
failurePolicy control.FailurePolicy,
err *fault.Bus,
) bool {
if len(snapID) == 0 || len(ssid) == 0 {
return false
}
if err.Failure() != nil {
return false
}
if failurePolicy == control.BestEffort {
return true
}
return len(err.Recovered()) == 0
}
// ---------------------------------------------------------------------------
// Primary Controller
// ---------------------------------------------------------------------------
// Run begins a synchronous backup operation.
func (op *BackupOperation) Run(ctx context.Context) (err error) {
defer func() {
if crErr := crash.Recovery(ctx, recover(), "backup"); crErr != nil {
err = crErr
}
}()
ctx, end := diagnostics.Span(ctx, "operations:backup:run")
defer func() {
end()
}()
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
defer flushMetrics()
var runnable bool
// IsBackupRunnable checks if the user has services enabled to run a backup.
// It also checks for conditions like a full mailbox.
runnable, err = op.bp.IsBackupRunnable(ctx, op.Selectors.PathService(), op.ResourceOwner.ID())
if err != nil {
logger.CtxErr(ctx, err).Error("verifying backup is runnable")
op.Errors.Fail(clues.Wrap(err, "verifying backup is runnable"))
return
}
if !runnable {
err = clues.Wrap(graph.ErrServiceNotEnabled, "checking if backup is enabled")
logger.CtxErr(ctx, err).Error("checking if backup is enabled")
op.Errors.Fail(err)
return
}
// -----
// Setup
// -----
var (
opStats backupStats
startTime = time.Now()
sstore = streamstore.NewStreamer(op.kopia, op.account.ID(), op.Selectors.PathService())
)
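// Mint the backup ID up front so it can be attached to the context, telemetry
// events, and logs before any snapshot data is written.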
op.Results.BackupID = model.StableID(uuid.NewString())
ctx = clues.Add(
ctx,
"tenant_id", clues.Hide(op.account.ID()),
"resource_owner_id", op.ResourceOwner.ID(),
"resource_owner_name", clues.Hide(op.ResourceOwner.Name()),
"backup_id", op.Results.BackupID,
"service", op.Selectors.Service,
"incremental", op.incremental,
"disable_assist_backup", op.disableAssistBackup)
op.bus.Event(
ctx,
events.BackupStart,
map[string]any{
events.StartTime: startTime,
events.Service: op.Selectors.Service.String(),
events.BackupID: op.Results.BackupID,
})
defer func() {
op.bus.Event(
ctx,
events.BackupEnd,
map[string]any{
events.BackupID: op.Results.BackupID,
events.DataStored: op.Results.BytesUploaded,
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
events.EndTime: dttm.Format(op.Results.CompletedAt),
events.Resources: op.Results.ResourceOwners,
events.Service: op.Selectors.PathService().String(),
events.StartTime: dttm.Format(op.Results.StartedAt),
events.Status: op.Status.String(),
})
}()
// -----
// Execution
// -----
observe.Message(ctx, "Backing Up", observe.Bullet, clues.Hide(op.ResourceOwner.Name()))
deets, err := op.do(
ctx,
&opStats,
sstore,
op.Results.BackupID)
if err != nil {
// No return here! We continue down to persistResults, even in case of failure.
logger.CtxErr(ctx, err).Error("running backup")
op.Errors.Fail(clues.Wrap(err, "running backup"))
}
LogFaultErrors(ctx, op.Errors.Errors(), "running backup")
// -----
// Persistence
// -----
err = op.persistResults(startTime, &opStats)
if err != nil {
op.Errors.Fail(clues.Wrap(err, "persisting backup results"))
return op.Errors.Failure()
}
err = op.createBackupModels(
ctx,
sstore,
opStats,
op.Results.BackupID,
op.BackupVersion,
deets.Details())
if err != nil {
op.Errors.Fail(clues.Wrap(err, "persisting backup models"))
return op.Errors.Failure()
}
finalizeErrorHandling(ctx, op.Options, op.Errors, "running backup")
if op.Errors.Failure() == nil {
logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
}
return op.Errors.Failure()
}
// do is purely the action of running a backup. All pre/post behavior
// is found in Run().
func (op *BackupOperation) do(
ctx context.Context,
opStats *backupStats,
detailsStore streamstore.Streamer,
backupID model.StableID,
) (*details.Builder, error) {
var (
reasons = selectorToReasons(op.account.ID(), op.Selectors, false)
fallbackReasons = makeFallbackReasons(op.account.ID(), op.Selectors)
lastBackupVersion = version.NoBackup
)
logger.Ctx(ctx).With(
"control_options", op.Options,
"selectors", op.Selectors).
Info("backing up selection")
// should always be 1, since backups are 1:1 with resourceOwners.
opStats.resourceCount = 1
kbf, err := op.kopia.NewBaseFinder(op.store)
if err != nil {
return nil, clues.Stack(err)
}
mans, mdColls, canUseMetadata, err := produceManifestsAndMetadata(
ctx,
kbf,
op.kopia,
reasons, fallbackReasons,
op.account.ID(),
op.incremental,
op.disableAssistBackup)
if err != nil {
return nil, clues.Wrap(err, "producing manifests and metadata")
}
ctx = clues.Add(ctx, "can_use_metadata", canUseMetadata)
if canUseMetadata {
lastBackupVersion = mans.MinBackupVersion()
}
// TODO(ashmrtn): This should probably just return a collection that deletes
// the entire subtree instead of returning an additional bool. That way base
// selection is controlled completely by flags and merging is controlled
// completely by collections.
cs, ssmb, canUsePreviousBackup, err := produceBackupDataCollections(
ctx,
op.bp,
op.ResourceOwner,
op.Selectors,
mdColls,
lastBackupVersion,
op.Options,
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "producing backup data collections")
}
ctx = clues.Add(
ctx,
"can_use_previous_backup", canUsePreviousBackup,
"collection_count", len(cs))
writeStats, deets, toMerge, err := consumeBackupCollections(
ctx,
op.kopia,
op.account.ID(),
reasons,
mans,
cs,
ssmb,
backupID,
op.incremental && canUseMetadata && canUsePreviousBackup,
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "persisting collection backups")
}
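// New detail entries exist if this run produced entries itself or if entries
// will be merged in from a base; used later when deciding whether this run
// qualifies as an assist backup.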
opStats.hasNewDetailEntries = (deets != nil && !deets.Empty()) ||
(toMerge != nil && toMerge.ItemsToMerge() > 0)
opStats.k = writeStats
err = mergeDetails(
ctx,
detailsStore,
mans,
toMerge,
deets,
writeStats,
op.Selectors.PathService(),
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "merging details")
}
opStats.ctrl = op.bp.Wait()
logger.Ctx(ctx).Debug(opStats.ctrl)
return deets, nil
}
func makeFallbackReasons(tenant string, sel selectors.Selector) []identity.Reasoner {
if sel.PathService() != path.SharePointService &&
sel.DiscreteOwner != sel.DiscreteOwnerName {
return selectorToReasons(tenant, sel, true)
}
return nil
}
// useIncrementalBackup reports whether incremental backup behavior, such as
// retrieving metadata like delta tokens and previous paths, should be used.
// Currently this is gated solely by the DisableIncrementals feature toggle.
func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool {
return !opts.ToggleFeatures.DisableIncrementals
}
// ---------------------------------------------------------------------------
// Producer funcs
// ---------------------------------------------------------------------------
// calls the producer to generate collections of data to back up
func produceBackupDataCollections(
ctx context.Context,
bp inject.BackupProducer,
protectedResource idname.Provider,
sel selectors.Selector,
metadata []data.RestoreCollection,
lastBackupVersion int,
ctrlOpts control.Options,
errs *fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) {
progressBar := observe.MessageWithCompletion(ctx, "Discovering items to backup")
defer close(progressBar)
bpc := inject.BackupProducerConfig{
LastBackupVersion: lastBackupVersion,
MetadataCollections: metadata,
Options: ctrlOpts,
ProtectedResource: protectedResource,
Selector: sel,
}
return bp.ProduceBackupCollections(ctx, bpc, errs)
}
// ---------------------------------------------------------------------------
// Consumer funcs
// ---------------------------------------------------------------------------
func selectorToReasons(
tenant string,
sel selectors.Selector,
useOwnerNameForID bool,
) []identity.Reasoner {
service := sel.PathService()
reasons := []identity.Reasoner{}
pcs, err := sel.PathCategories()
if err != nil {
// This is technically safe, it's just that the resulting backup won't be
// usable as a base for future incremental backups.
return nil
}
owner := sel.DiscreteOwner
if useOwnerNameForID {
owner = sel.DiscreteOwnerName
}
for _, sl := range [][]path.CategoryType{pcs.Includes, pcs.Filters} {
for _, cat := range sl {
reasons = append(reasons, kopia.NewReason(tenant, owner, service, cat))
}
}
return reasons
}
// calls kopia to back up the collections of data
func consumeBackupCollections(
ctx context.Context,
bc kinject.BackupConsumer,
tenantID string,
reasons []identity.Reasoner,
bbs kopia.BackupBases,
cs []data.BackupCollection,
pmr prefixmatcher.StringSetReader,
backupID model.StableID,
isIncremental bool,
errs *fault.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) {
ctx = clues.Add(ctx, "collection_source", "operations")
progressBar := observe.MessageWithCompletion(ctx, "Backing up data")
defer close(progressBar)
tags := map[string]string{
kopia.TagBackupID: string(backupID),
kopia.TagBackupCategory: "",
}
kopiaStats, deets, itemsSourcedFromBase, err := bc.ConsumeBackupCollections(
ctx,
reasons,
bbs,
cs,
pmr,
tags,
isIncremental,
errs)
if err != nil {
if kopiaStats == nil {
return nil, nil, nil, clues.Stack(err)
}
return nil, nil, nil, clues.Stack(err).With(
"kopia_errors", kopiaStats.ErrorCount,
"kopia_ignored_errors", kopiaStats.IgnoredErrorCount)
}
ctx = clues.Add(
ctx,
"kopia_errors", kopiaStats.ErrorCount,
"kopia_ignored_errors", kopiaStats.IgnoredErrorCount,
"kopia_expected_ignored_errors", kopiaStats.ExpectedIgnoredErrorCount)
if kopiaStats.ErrorCount > 0 {
err = clues.New("building kopia snapshot").WithClues(ctx)
} else if kopiaStats.IgnoredErrorCount > kopiaStats.ExpectedIgnoredErrorCount {
logger.Ctx(ctx).Info("recoverable errors were seen during backup")
}
return kopiaStats, deets, itemsSourcedFromBase, err
}
func matchesReason(reasons []identity.Reasoner, p path.Path) bool {
for _, reason := range reasons {
if p.ResourceOwner() == reason.ProtectedResource() &&
p.Service() == reason.Service() &&
p.Category() == reason.Category() {
return true
}
}
return false
}
// getNewPathRefs returns
// 1. the new RepoRef for the item if it needs to be merged
// 2. the new locationPath
// 3. any errors encountered
func getNewPathRefs(
dataFromBackup kopia.DetailsMergeInfoer,
entry *details.Entry,
repoRef path.Path,
backupVersion int,
) (path.Path, *path.Builder, error) {
// Right now we can't guarantee that we have an old location in the
// previous details entry so first try a lookup without a location to see
// if it matches so we don't need to try parsing from the old entry.
//
// TODO(ashmrtn): In the future we can remove this first check as we'll be
// able to assume we always have the location in the previous entry. We'll end
// up doing some extra parsing, but it will simplify this code.
if repoRef.Service() == path.ExchangeService {
newPath, newLoc, err := dataFromBackup.GetNewPathRefs(
repoRef.ToBuilder(),
entry.Modified(),
nil)
if err != nil {
return nil, nil, clues.Wrap(err, "getting new paths")
} else if newPath == nil {
// This entry doesn't need merging.
return nil, nil, nil
} else if newLoc == nil {
return nil, nil, clues.New("unable to find new exchange location")
}
return newPath, newLoc, nil
}
// We didn't have an exact entry, so retry with a location.
locRef, err := entry.ToLocationIDer(backupVersion)
if err != nil {
return nil, nil, clues.Wrap(err, "getting previous item location")
}
if locRef == nil {
return nil, nil, clues.New("entry with empty LocationRef")
}
newPath, newLoc, err := dataFromBackup.GetNewPathRefs(
repoRef.ToBuilder(),
entry.Modified(),
locRef)
if err != nil {
return nil, nil, clues.Wrap(err, "getting new paths with old location")
} else if newPath == nil {
return nil, nil, nil
} else if newLoc == nil {
return nil, nil, clues.New("unable to get new paths")
}
return newPath, newLoc, nil
}
func mergeItemsFromBase(
ctx context.Context,
checkReason bool,
baseBackup kopia.BackupEntry,
detailsStore streamstore.Streamer,
dataFromBackup kopia.DetailsMergeInfoer,
deets *details.Builder,
alreadySeenItems map[string]struct{},
errs *fault.Bus,
) (int, error) {
var (
manifestAddedEntries int
totalBaseItems int
)
// Can't be in the above block else it's counted as a redeclaration.
ctx = clues.Add(ctx, "base_backup_id", baseBackup.ID)
baseDeets, err := getDetailsFromBackup(
ctx,
baseBackup.Backup,
detailsStore,
errs)
if err != nil {
return manifestAddedEntries,
clues.New("fetching base details for backup").WithClues(ctx)
}
for _, entry := range baseDeets.Items() {
// Track this here instead of calling Items() again to get the count since
// it can be a bit expensive.
totalBaseItems++
rr, err := path.FromDataLayerPath(entry.RepoRef, true)
if err != nil {
return manifestAddedEntries, clues.New("parsing base item info path").
WithClues(ctx).
With("repo_ref", path.LoggableDir(entry.RepoRef))
}
// Although this base has an entry it may not be the most recent. Check
// the reasons a snapshot was returned to ensure we only choose the recent
// entries.
//
// We only really want to do this check for merge bases though because
// kopia won't abide by reasons when determining if an item's cached. This
// leaves us in a bit of a pickle if the user has run any concurrent backups
// with overlapping reasons that then turn into assist bases, but the
// modTime check in DetailsMergeInfoer should handle that.
if checkReason && !matchesReason(baseBackup.Reasons, rr) {
continue
}
// Skip items that were already found in a previous base backup.
if _, ok := alreadySeenItems[rr.ShortRef()]; ok {
continue
}
ictx := clues.Add(ctx, "repo_ref", rr)
newPath, newLoc, err := getNewPathRefs(
dataFromBackup,
entry,
rr,
baseBackup.Version)
if err != nil {
return manifestAddedEntries,
clues.Wrap(err, "getting updated info for entry").WithClues(ictx)
}
// This entry isn't merged.
if newPath == nil {
continue
}
// Fixup paths in the item.
item := entry.ItemInfo
details.UpdateItem(&item, newLoc)
err = deets.Add(
newPath,
newLoc,
item)
if err != nil {
return manifestAddedEntries,
clues.Wrap(err, "adding item to details").WithClues(ictx)
}
// Make sure we won't add this again in another base.
alreadySeenItems[rr.ShortRef()] = struct{}{}
// Track how many entries we added so that we know if we got them all when
// we're done.
manifestAddedEntries++
}
logger.Ctx(ctx).Infow(
"merged details with base manifest",
"count_base_item_unfiltered", totalBaseItems,
"count_base_item_added", manifestAddedEntries)
return manifestAddedEntries, nil
}
func mergeDetails(
ctx context.Context,
detailsStore streamstore.Streamer,
bases kopia.BackupBases,
dataFromBackup kopia.DetailsMergeInfoer,
deets *details.Builder,
writeStats *kopia.BackupStats,
serviceType path.ServiceType,
errs *fault.Bus,
) error {
detailsModel := deets.Details().DetailsModel
// Populate writeStats before anything else so that an early return, e.g. when
// there's no backup data to merge, doesn't skip setting these values.
writeStats.TotalNonMetaFileCount = len(detailsModel.FilterMetaFiles().Items())
writeStats.TotalNonMetaUploadedBytes = detailsModel.SumNonMetaFileSizes()
// Don't bother loading any of the base details if there's nothing we need to merge.
if bases == nil || dataFromBackup == nil || dataFromBackup.ItemsToMerge() == 0 {
return nil
}
var (
addedEntries int
// alreadySeenEntries tracks items that we've already merged so we don't
// accidentally merge them again. This could happen if, for example, there's
// an assist backup and a merge backup that both have the same version of an
// item at the same path.
alreadySeenEntries = map[string]struct{}{}
)
// Merge details from assist bases first. It shouldn't technically matter
// since the DetailsMergeInfoer should take into account the modTime of items,
// but just to be on the safe side.
//
// We don't want to match entries based on Reason for assist bases because
// kopia won't abide by Reasons when determining if an item's cached. This
// leaves us in a bit of a pickle if the user has run any concurrent backups
// with overlapping Reasons that turn into assist bases, but the modTime check
// in DetailsMergeInfoer should handle that.
for _, base := range bases.AssistBackups() {
added, err := mergeItemsFromBase(
ctx,
false,
base,
detailsStore,
dataFromBackup,
deets,
alreadySeenEntries,
errs)
if err != nil {
return clues.Wrap(err, "merging assist backup base details")
}
addedEntries = addedEntries + added
}
// Now add entries from the merge base backups. These will be things that
// weren't changed in the new backup. Items that were already added because
// they were counted as cached in an assist base backup will be skipped due to
// alreadySeenEntries.
//
// We do want to enable matching entries based on Reasons because we
// explicitly control which subtrees from the merge base backup are grafted
// onto the hierarchy for the currently running backup.
for _, base := range bases.Backups() {
added, err := mergeItemsFromBase(
ctx,
true,
base,
detailsStore,
dataFromBackup,
deets,
alreadySeenEntries,
errs)
if err != nil {
return clues.Wrap(err, "merging merge backup base details")
}
addedEntries = addedEntries + added
}
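// Sanity check: every entry the merge info expected should have been sourced
// from one of the bases above.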
checkCount := dataFromBackup.ItemsToMerge()
if addedEntries != checkCount {
return clues.New("incomplete migration of backup details").
WithClues(ctx).
With(
"item_count", addedEntries,
"expected_item_count", checkCount)
}
return nil
}
// persistResults writes the result metrics to the operation results, which are
// later stored in the manifest by createBackupModels.
func (op *BackupOperation) persistResults(
started time.Time,
opStats *backupStats,
) error {
op.Results.StartedAt = started
op.Results.CompletedAt = time.Now()
op.Status = Completed
// Non-recoverable errors always result in a failed backup,
// regardless of the FailurePolicy in use.
if op.Errors.Failure() != nil {
op.Status = Failed
}
if opStats.k == nil {
op.Status = Failed
return clues.New("backup persistence never completed")
}
op.Results.BytesRead = opStats.k.TotalHashedBytes
op.Results.BytesUploaded = opStats.k.TotalUploadedBytes
op.Results.ItemsWritten = opStats.k.TotalFileCount
op.Results.NonMetaBytesUploaded = opStats.k.TotalNonMetaUploadedBytes
op.Results.NonMetaItemsWritten = opStats.k.TotalNonMetaFileCount
op.Results.ResourceOwners = opStats.resourceCount
if opStats.ctrl == nil {
op.Status = Failed
return clues.New("backup population never completed")
}
if op.Status != Failed && opStats.ctrl.IsZero() {
op.Status = NoData
}
op.Results.ItemsRead = opStats.ctrl.Successes
// Only return non-recoverable errors at this point.
return op.Errors.Failure()
}
// stores the operation details, results, and selectors in the backup manifest.
func (op *BackupOperation) createBackupModels(
ctx context.Context,
sscw streamstore.CollectorWriter,
opStats backupStats,
backupID model.StableID,
backupVersion int,
deets *details.Details,
) error {
snapID := opStats.k.SnapshotID
ctx = clues.Add(ctx,
"snapshot_id", snapID,
"backup_id", backupID)
// generate a new fault bus so that we can maintain clean
// separation between the errors we serialize and those that
// are generated during the serialization process.
errs := fault.New(true)
if deets == nil {
return clues.New("no backup details to record").WithClues(ctx)
}
ctx = clues.Add(ctx, "details_entry_count", len(deets.Entries))
err := sscw.Collect(ctx, streamstore.DetailsCollector(deets))
if err != nil {
return clues.Wrap(err, "collecting details for persistence").WithClues(ctx)
}
err = sscw.Collect(ctx, streamstore.FaultErrorsCollector(op.Errors.Errors()))
if err != nil {
return clues.Wrap(err, "collecting errors for persistence").WithClues(ctx)
}
ssid, err := sscw.Write(ctx, errs)
if err != nil {
return clues.Wrap(err, "persisting details and errors").WithClues(ctx)
}
ctx = clues.Add(ctx, "streamstore_snapshot_id", ssid)
tags := map[string]string{
model.ServiceTag: op.Selectors.PathService().String(),
}
// Add tags to mark this backup as either assist or merge. This is used to:
// 1. Filter assist backups by tag during the base selection process
// 2. Differentiate assist backups from merge backups
if isMergeBackup(
snapID,
ssid,
op.Options.FailureHandling,
op.Errors) {
tags[model.BackupTypeTag] = model.MergeBackup
} else if isAssistBackup(
opStats.hasNewDetailEntries,
snapID,
ssid,
op.Options.FailureHandling,
op.Errors) {
tags[model.BackupTypeTag] = model.AssistBackup
} else {
return clues.New("backup is neither assist nor merge").WithClues(ctx)
}
ctx = clues.Add(ctx, model.BackupTypeTag, tags[model.BackupTypeTag])
b := backup.New(
snapID, ssid,
op.Status.String(),
backupVersion,
backupID,
op.Selectors,
op.ResourceOwner.ID(),
op.ResourceOwner.Name(),
op.Results.ReadWrites,
op.Results.StartAndEndTime,
op.Errors.Errors(),
tags)
logger.Ctx(ctx).Info("creating new backup")
if err = op.store.Put(ctx, model.BackupSchema, b); err != nil {
return clues.Wrap(err, "creating backup model").WithClues(ctx)
}
return nil
}