From 8dfb00f30812728797a51aac24d761028df7e8b8 Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 16 Mar 2023 19:07:07 -0600 Subject: [PATCH] add fault errors streamstore (#2731) Adds a new streamstore controller for fault.Errors. This provides large scale, extensible file storage for fault errors to be persisted, much like we do for backup details. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature #### Issue(s) * #2708 #### Test Plan - [x] :green_heart: E2E --- CHANGELOG.md | 1 + src/cli/backup/backup.go | 89 ++++-- src/cli/backup/exchange.go | 11 +- src/cli/backup/exchange_e2e_test.go | 4 +- src/cli/backup/onedrive.go | 13 +- src/cli/backup/sharepoint.go | 13 +- src/cli/backup/sharepoint_e2e_test.go | 4 +- src/cli/restore/exchange_e2e_test.go | 2 +- src/cli/utils/testdata/opts.go | 21 +- src/cmd/purge/purge.go | 3 +- src/go.mod | 2 +- src/go.sum | 4 +- .../connector/discovery/discovery_test.go | 2 +- src/internal/kopia/wrapper.go | 3 +- src/internal/operations/backup.go | 40 ++- .../operations/backup_integration_test.go | 49 +-- src/internal/operations/backup_test.go | 4 +- src/internal/operations/common.go | 18 +- src/internal/operations/manifests.go | 35 ++- src/internal/operations/manifests_test.go | 46 +-- src/internal/operations/restore.go | 6 +- src/internal/operations/restore_test.go | 2 +- src/internal/streamstore/collectables.go | 63 ++++ src/internal/streamstore/collectables_test.go | 218 +++++++++++++ src/internal/streamstore/details.go | 159 ---------- src/internal/streamstore/details_test.go | 82 ----- src/internal/streamstore/mock/details.go | 48 --- src/internal/streamstore/mock/mock.go | 62 ++++ src/internal/streamstore/streamstore.go | 234 +++++++++++++- src/pkg/backup/backup.go | 23 +- .../details/testdata/testdata.go} | 0 src/pkg/fault/fault.go | 121 +++++-- src/pkg/fault/fault_test.go | 83 +++-- src/pkg/fault/item.go | 96 +++++- src/pkg/fault/item_test.go | 109 ++++++- 
src/pkg/fault/testdata/testdata.go | 26 ++ src/pkg/path/path.go | 3 +- .../loadtest/repository_load_test.go | 2 +- src/pkg/repository/repository.go | 127 +++++++- src/pkg/repository/repository_test.go | 10 +- .../repository/repository_unexported_test.go | 295 +++++++++++++++++- src/pkg/selectors/selectors_reduce_test.go | 2 +- src/pkg/store/backup.go | 13 - src/pkg/store/backup_test.go | 38 --- 44 files changed, 1613 insertions(+), 573 deletions(-) create mode 100644 src/internal/streamstore/collectables.go create mode 100644 src/internal/streamstore/collectables_test.go delete mode 100644 src/internal/streamstore/details.go delete mode 100644 src/internal/streamstore/details_test.go delete mode 100644 src/internal/streamstore/mock/details.go create mode 100644 src/internal/streamstore/mock/mock.go rename src/pkg/{selectors/testdata/details.go => backup/details/testdata/testdata.go} (100%) create mode 100644 src/pkg/fault/testdata/testdata.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 57c1be35a..b92307c01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Sharepoint library (document files) support: backup, list, details, and restore. - OneDrive item downloads that return 404 during backup (normally due to external deletion while Corso processes) are now skipped instead of quietly dropped. These items will appear in the skipped list alongside other skipped cases such as malware detection. +- Listing a single backup by id will also list the skipped and failed items that occurred during the backup. These can be filtered out with the flags `--failed-items hide`, `--skipped-items hide`, and `--recovered-errors hide`. 
### Fixed - Fix repo connect not working without a config file diff --git a/src/cli/backup/backup.go b/src/cli/backup/backup.go index 83d0870a2..801d0b97d 100644 --- a/src/cli/backup/backup.go +++ b/src/cli/backup/backup.go @@ -2,6 +2,7 @@ package backup import ( "context" + "strings" "github.com/hashicorp/go-multierror" @@ -21,19 +22,9 @@ import ( "github.com/spf13/cobra" ) -// ============================================== -// Folder Object flags -// These options are flags for indicating -// that a time-based filter should be used for -// within returning objects for details. -// Used by: OneDrive, SharePoint -// ================================================ -var ( - fileCreatedAfter string - fileCreatedBefore string - fileModifiedAfter string - fileModifiedBefore string -) +// --------------------------------------------------------------------------- +// adding commands to cobra +// --------------------------------------------------------------------------- var subCommandFuncs = []func() *cobra.Command{ createCmd, @@ -63,6 +54,49 @@ func AddCommands(cmd *cobra.Command) { } } +// --------------------------------------------------------------------------- +// common flags and flag attachers for commands +// --------------------------------------------------------------------------- + +var ( + fileCreatedAfter string + fileCreatedBefore string + fileModifiedAfter string + fileModifiedBefore string +) + +// list output filter flags +var ( + failedItemsFN = "failed-items" + listFailedItems string + skippedItemsFN = "skipped-items" + listSkippedItems string + recoveredErrorsFN = "recovered-errors" + listRecoveredErrors string +) + +func addFailedItemsFN(cmd *cobra.Command) { + cmd.Flags().StringVar( + &listFailedItems, failedItemsFN, "show", + "Toggles showing or hiding the list of items that failed.") +} + +func addSkippedItemsFN(cmd *cobra.Command) { + cmd.Flags().StringVar( + &listSkippedItems, skippedItemsFN, "show", + "Toggles showing or hiding the list of 
items that were skipped.") +} + +func addRecoveredErrorsFN(cmd *cobra.Command) { + cmd.Flags().StringVar( + &listRecoveredErrors, recoveredErrorsFN, "show", + "Toggles showing or hiding the list of errors which corso recovered from.") +} + +// --------------------------------------------------------------------------- +// commands +// --------------------------------------------------------------------------- + // The backup category of commands. // `corso backup [] [...]` func backupCmd() *cobra.Command { @@ -107,7 +141,7 @@ var listCommand = "list" func listCmd() *cobra.Command { return &cobra.Command{ Use: listCommand, - Short: "List the history of backups for a service", + Short: "List the history of backups", RunE: handleListCmd, Args: cobra.NoArgs, } @@ -126,7 +160,7 @@ var detailsCommand = "details" func detailsCmd() *cobra.Command { return &cobra.Command{ Use: detailsCommand, - Short: "Shows the details of a backup for a service", + Short: "Shows the details of a backup", RunE: handleDetailsCmd, Args: cobra.NoArgs, } @@ -145,7 +179,7 @@ var deleteCommand = "delete" func deleteCmd() *cobra.Command { return &cobra.Command{ Use: deleteCommand, - Short: "Deletes a backup for a service", + Short: "Deletes a backup", RunE: handleDeleteCmd, Args: cobra.NoArgs, } @@ -157,6 +191,10 @@ func handleDeleteCmd(cmd *cobra.Command, args []string) error { return cmd.Help() } +// --------------------------------------------------------------------------- +// common handlers +// --------------------------------------------------------------------------- + func runBackups( ctx context.Context, r repository.Repository, @@ -176,8 +214,7 @@ func runBackups( "Failed to initialize %s backup for %s %s", serviceName, resourceOwnerType, - discSel.DiscreteOwner, - )) + discSel.DiscreteOwner)) continue } @@ -189,8 +226,7 @@ func runBackups( "Failed to run %s backup for %s %s", serviceName, resourceOwnerType, - discSel.DiscreteOwner, - )) + discSel.DiscreteOwner)) continue } @@ -250,10 
+286,10 @@ func genericListCommand(cmd *cobra.Command, bID string, service path.ServiceType defer utils.CloseRepo(ctx, r) - if len(backupID) > 0 { - b, err := r.Backup(ctx, model.StableID(bID)) - if err != nil { - if errors.Is(err, data.ErrNotFound) { + if len(bID) > 0 { + fe, b, errs := r.GetBackupErrors(ctx, bID) + if errs.Failure() != nil { + if errors.Is(errs.Failure(), data.ErrNotFound) { return Only(ctx, errors.Errorf("No backup exists with the id %s", bID)) } @@ -261,6 +297,7 @@ func genericListCommand(cmd *cobra.Command, bID string, service path.ServiceType } b.Print(ctx) + fe.PrintItems(ctx, !ifShow(listFailedItems), !ifShow(listSkippedItems), !ifShow(listRecoveredErrors)) return nil } @@ -288,3 +325,7 @@ func getAccountAndConnect(ctx context.Context) (repository.Repository, *account. return r, &cfg.Account, nil } + +func ifShow(flag string) bool { + return strings.ToLower(strings.TrimSpace(flag)) == "show" +} diff --git a/src/cli/backup/exchange.go b/src/cli/backup/exchange.go index ec6a543d8..b1da988ba 100644 --- a/src/cli/backup/exchange.go +++ b/src/cli/backup/exchange.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/alcionai/clues" "github.com/alcionai/corso/src/cli/options" . 
"github.com/alcionai/corso/src/cli/print" "github.com/alcionai/corso/src/cli/utils" @@ -123,7 +124,11 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command { fs.StringVar(&backupID, "backup", "", - "ID of the backup to retrieve.") + "Display a specific backup, including the items that failed or were skipped during processing.") + + addFailedItemsFN(c) + addSkippedItemsFN(c) + addRecoveredErrorsFN(c) case detailsCommand: c, fs = utils.AddCommand(cmd, exchangeDetailsCmd()) @@ -425,7 +430,9 @@ func runDetailsExchangeCmd( return nil, err } - d, _, errs := r.BackupDetails(ctx, backupID) + ctx = clues.Add(ctx, "backup_id", backupID) + + d, _, errs := r.GetBackupDetails(ctx, backupID) // TODO: log/track recoverable errors if errs.Failure() != nil { if errors.Is(errs.Failure(), data.ErrNotFound) { diff --git a/src/cli/backup/exchange_e2e_test.go b/src/cli/backup/exchange_e2e_test.go index 89360bfc3..d8bcb7fdf 100644 --- a/src/cli/backup/exchange_e2e_test.go +++ b/src/cli/backup/exchange_e2e_test.go @@ -333,7 +333,7 @@ func (suite *PreparedBackupExchangeE2ESuite) SetupSuite() { b, err := suite.repo.Backup(ctx, bop.Results.BackupID) require.NoError(t, err, "retrieving recent backup by ID") require.Equal(t, bIDs, string(b.ID), "repo backup matches results id") - _, b, errs := suite.repo.BackupDetails(ctx, bIDs) + _, b, errs := suite.repo.GetBackupDetails(ctx, bIDs) require.NoError(t, errs.Failure(), "retrieving recent backup details by ID") require.Empty(t, errs.Recovered(), "retrieving recent backup details by ID") require.Equal(t, bIDs, string(b.ID), "repo details matches results id") @@ -445,7 +445,7 @@ func (suite *PreparedBackupExchangeE2ESuite) TestExchangeDetailsCmd() { bID := suite.backupOps[set] // fetch the details from the repo first - deets, _, errs := suite.repo.BackupDetails(ctx, string(bID)) + deets, _, errs := suite.repo.GetBackupDetails(ctx, string(bID)) require.NoError(t, errs.Failure(), clues.ToCore(errs.Failure())) require.Empty(t, 
errs.Recovered()) diff --git a/src/cli/backup/onedrive.go b/src/cli/backup/onedrive.go index f5662f6ab..4a623e22a 100644 --- a/src/cli/backup/onedrive.go +++ b/src/cli/backup/onedrive.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/alcionai/clues" "github.com/alcionai/corso/src/cli/options" . "github.com/alcionai/corso/src/cli/print" "github.com/alcionai/corso/src/cli/utils" @@ -79,8 +80,12 @@ func addOneDriveCommands(cmd *cobra.Command) *cobra.Command { c, fs = utils.AddCommand(cmd, oneDriveListCmd()) fs.StringVar(&backupID, - utils.BackupFN, "", - "ID of the backup to retrieve.") + "backup", "", + "Display a specific backup, including the items that failed or were skipped during processing.") + + addFailedItemsFN(c) + addSkippedItemsFN(c) + addRecoveredErrorsFN(c) case detailsCommand: c, fs = utils.AddCommand(cmd, oneDriveDetailsCmd()) @@ -306,7 +311,9 @@ func runDetailsOneDriveCmd( return nil, err } - d, _, errs := r.BackupDetails(ctx, backupID) + ctx = clues.Add(ctx, "backup_id", backupID) + + d, _, errs := r.GetBackupDetails(ctx, backupID) // TODO: log/track recoverable errors if errs.Failure() != nil { if errors.Is(errs.Failure(), data.ErrNotFound) { diff --git a/src/cli/backup/sharepoint.go b/src/cli/backup/sharepoint.go index b6bfc6780..44b69c389 100644 --- a/src/cli/backup/sharepoint.go +++ b/src/cli/backup/sharepoint.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/alcionai/clues" "github.com/alcionai/corso/src/cli/options" . 
"github.com/alcionai/corso/src/cli/print" "github.com/alcionai/corso/src/cli/utils" @@ -104,8 +105,12 @@ func addSharePointCommands(cmd *cobra.Command) *cobra.Command { c, fs = utils.AddCommand(cmd, sharePointListCmd()) fs.StringVar(&backupID, - utils.BackupFN, "", - "ID of the backup to retrieve.") + "backup", "", + "Display a specific backup, including the items that failed or were skipped during processing.") + + addFailedItemsFN(c) + addSkippedItemsFN(c) + addRecoveredErrorsFN(c) case detailsCommand: c, fs = utils.AddCommand(cmd, sharePointDetailsCmd()) @@ -451,7 +456,9 @@ func runDetailsSharePointCmd( return nil, err } - d, _, errs := r.BackupDetails(ctx, backupID) + ctx = clues.Add(ctx, "backup_id", backupID) + + d, _, errs := r.GetBackupDetails(ctx, backupID) // TODO: log/track recoverable errors if errs.Failure() != nil { if errors.Is(errs.Failure(), data.ErrNotFound) { diff --git a/src/cli/backup/sharepoint_e2e_test.go b/src/cli/backup/sharepoint_e2e_test.go index 518ce9a2d..bb6f951dd 100644 --- a/src/cli/backup/sharepoint_e2e_test.go +++ b/src/cli/backup/sharepoint_e2e_test.go @@ -193,8 +193,8 @@ func (suite *BackupDeleteSharePointE2ESuite) TestSharePointBackupDeleteCmd() { require.NoError(t, err, clues.ToCore(err)) result := suite.recorder.String() - - assert.Equal(t, fmt.Sprintf("Deleted SharePoint backup %s\n", string(suite.backupOp.Results.BackupID)), result) + expect := fmt.Sprintf("Deleted SharePoint backup %s\n", string(suite.backupOp.Results.BackupID)) + assert.Equal(t, expect, result) } // moved out of the func above to make the linter happy diff --git a/src/cli/restore/exchange_e2e_test.go b/src/cli/restore/exchange_e2e_test.go index 45c567ebe..ccd973fe9 100644 --- a/src/cli/restore/exchange_e2e_test.go +++ b/src/cli/restore/exchange_e2e_test.go @@ -112,7 +112,7 @@ func (suite *RestoreExchangeE2ESuite) SetupSuite() { _, err = suite.repo.Backup(ctx, bop.Results.BackupID) require.NoError(t, err, "retrieving recent backup by ID", clues.ToCore(err)) 
- _, _, errs := suite.repo.BackupDetails(ctx, string(bop.Results.BackupID)) + _, _, errs := suite.repo.GetBackupDetails(ctx, string(bop.Results.BackupID)) require.NoError(t, errs.Failure(), "retrieving recent backup details by ID", clues.ToCore(err)) require.Empty(t, errs.Recovered(), "retrieving recent backup details by ID") } diff --git a/src/cli/utils/testdata/opts.go b/src/cli/utils/testdata/opts.go index 2f0891432..d01016402 100644 --- a/src/cli/utils/testdata/opts.go +++ b/src/cli/utils/testdata/opts.go @@ -10,9 +10,10 @@ import ( "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/details/testdata" "github.com/alcionai/corso/src/pkg/fault" + ftd "github.com/alcionai/corso/src/pkg/fault/testdata" "github.com/alcionai/corso/src/pkg/selectors" - "github.com/alcionai/corso/src/pkg/selectors/testdata" "github.com/alcionai/corso/src/pkg/store" ) @@ -551,7 +552,9 @@ var ( // (selectors/testdata.GetDetailsSet(), nil, nil) when BackupDetails is called // on the nil instance. If an instance is given or Backups is called returns an // error. 
-type MockBackupGetter struct{} +type MockBackupGetter struct { + failure, recovered, skipped bool +} func (MockBackupGetter) Backup( context.Context, @@ -574,7 +577,7 @@ func (MockBackupGetter) BackupsByTag( return nil, errors.New("unexpected call to mock") } -func (bg *MockBackupGetter) BackupDetails( +func (bg *MockBackupGetter) GetBackupDetails( ctx context.Context, backupID string, ) (*details.Details, *backup.Backup, *fault.Bus) { @@ -584,3 +587,15 @@ func (bg *MockBackupGetter) BackupDetails( return nil, nil, fault.New(false).Fail(errors.New("unexpected call to mock")) } + +func (bg *MockBackupGetter) GetBackupErrors( + ctx context.Context, + backupID string, +) (*fault.Errors, *backup.Backup, *fault.Bus) { + if bg == nil { + fe := ftd.MakeErrors(bg.failure, bg.recovered, bg.skipped) + return &fe, nil, fault.New(true) + } + + return nil, nil, fault.New(false).Fail(errors.New("unexpected call to mock")) +} diff --git a/src/cmd/purge/purge.go b/src/cmd/purge/purge.go index 3de643aa5..74be0e805 100644 --- a/src/cmd/purge/purge.go +++ b/src/cmd/purge/purge.go @@ -145,7 +145,8 @@ func runPurgeForEachUser( } if len(ferrs.Errors().Recovered) > 0 { - errs = multierror.Append(errs, ferrs.Errors().Recovered...) + // TODO(keepers): remove multierr + errs = multierror.Append(errs, ferrs.Recovered()...) 
} for _, u := range userOrUsers(user, users) { diff --git a/src/go.mod b/src/go.mod index ee2766765..de9ef11d9 100644 --- a/src/go.mod +++ b/src/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 github.com/alcionai/clues v0.0.0-20230314154528-c469e1adafb6 - github.com/aws/aws-sdk-go v1.44.218 + github.com/aws/aws-sdk-go v1.44.220 github.com/aws/aws-xray-sdk-go v1.8.1 github.com/cenkalti/backoff/v4 v4.2.0 github.com/google/uuid v1.3.0 diff --git a/src/go.sum b/src/go.sum index d10d974e4..276367cd7 100644 --- a/src/go.sum +++ b/src/go.sum @@ -62,8 +62,8 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVKJUX0= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -github.com/aws/aws-sdk-go v1.44.218 h1:p707+xOCazWhkSpZOeyhtTcg7Z+asxxvueGgYPSitn4= -github.com/aws/aws-sdk-go v1.44.218/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/aws/aws-sdk-go v1.44.220 h1:yAj99qAt0Htjle9Up3DglgHfOP77lmFPrElA4jKnrBo= +github.com/aws/aws-sdk-go v1.44.220/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-xray-sdk-go v1.8.1 h1:O4pXV+hnCskaamGsZnFpzHyAmgPGusBMN6i7nnsy0Fo= github.com/aws/aws-xray-sdk-go v1.8.1/go.mod h1:wMmVYzej3sykAttNBkXQHK/+clAPWTOrPiajEk7Cp3A= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= diff --git a/src/internal/connector/discovery/discovery_test.go b/src/internal/connector/discovery/discovery_test.go index 43e880e4b..0c62829ea 100644 --- a/src/internal/connector/discovery/discovery_test.go +++ b/src/internal/connector/discovery/discovery_test.go @@ -40,7 +40,7 @@ func (suite *DiscoveryIntegrationSuite) TestUsers() { assert.NoError(t, err, clues.ToCore(err)) ferrs := errs.Errors() - assert.NoError(t, 
ferrs.Failure, clues.ToCore(ferrs.Failure)) + assert.Nil(t, ferrs.Failure) assert.Empty(t, ferrs.Recovered) assert.Less(t, 0, len(users)) diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 8a404f3d5..fc363d3a2 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -457,9 +457,8 @@ func (w Wrapper) DeleteSnapshot( snapshotID string, ) error { mid := manifest.ID(snapshotID) - if len(mid) == 0 { - return clues.New("attempt to delete unidentified snapshot").WithClues(ctx) + return clues.New("snapshot ID required for deletion").WithClues(ctx) } err := repo.WriteSession( diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index fc435f205..2555dbc27 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -120,9 +120,9 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { // ----- var ( - opStats backupStats - startTime = time.Now() - detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService()) + opStats backupStats + startTime = time.Now() + sstore = streamstore.NewStreamer(op.kopia, op.account.ID(), op.Selectors.PathService()) ) op.Results.BackupID = model.StableID(uuid.NewString()) @@ -151,7 +151,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { deets, err := op.do( ctx, &opStats, - detailsStore, + sstore, op.Results.BackupID) if err != nil { // No return here! We continue down to persistResults, even in case of failure. @@ -201,7 +201,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { err = op.createBackupModels( ctx, - detailsStore, + sstore, opStats.k.SnapshotID, op.Results.BackupID, deets.Details()) @@ -505,8 +505,7 @@ func mergeDetails( deets *details.Builder, errs *fault.Bus, ) error { - // Don't bother loading any of the base details if there's nothing we need to - // merge. 
+ // Don't bother loading any of the base details if there's nothing we need to merge. if len(shortRefsFromPrevBackup) == 0 { return nil } @@ -658,30 +657,45 @@ func (op *BackupOperation) persistResults( // stores the operation details, results, and selectors in the backup manifest. func (op *BackupOperation) createBackupModels( ctx context.Context, - detailsStore streamstore.Writer, + sscw streamstore.CollectorWriter, snapID string, backupID model.StableID, backupDetails *details.Details, ) error { ctx = clues.Add(ctx, "snapshot_id", snapID) + // generate a new fault bus so that we can maintain clean + // separation between the errors we serialize and those that + // are generated during the serialization process. + errs := fault.New(true) if backupDetails == nil { return clues.New("no backup details to record").WithClues(ctx) } - detailsID, err := detailsStore.Write(ctx, backupDetails, op.Errors) + err := sscw.Collect(ctx, streamstore.DetailsCollector(backupDetails)) if err != nil { - return clues.Wrap(err, "creating backupDetails model").WithClues(ctx) + return clues.Wrap(err, "creating backupDetails persistence").WithClues(ctx) } - ctx = clues.Add(ctx, "details_id", detailsID) + err = sscw.Collect(ctx, streamstore.FaultErrorsCollector(op.Errors.Errors())) + if err != nil { + return clues.Wrap(err, "creating errors persistence").WithClues(ctx) + } + + ssid, err := sscw.Write(ctx, errs) + if err != nil { + return clues.Wrap(err, "persisting details and errors").WithClues(ctx) + } + + ctx = clues.Add(ctx, "streamstore_snapshot_id", ssid) b := backup.New( - snapID, detailsID, op.Status.String(), + snapID, ssid, + op.Status.String(), backupID, op.Selectors, op.Results.ReadWrites, op.Results.StartAndEndTime, - op.Errors) + errs) if err = op.store.Put(ctx, model.BackupSchema, b); err != nil { return clues.Wrap(err, "creating backup model").WithClues(ctx) diff --git a/src/internal/operations/backup_integration_test.go 
b/src/internal/operations/backup_integration_test.go index 0d22e3e77..2bc4678ba 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -1094,30 +1094,6 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDrive() { runAndCheckBackup(t, ctx, &bo, mb) } -// nolint: unused -func mustGetDefaultDriveID( - t *testing.T, - ctx context.Context, //revive:disable-line:context-as-argument - service graph.Servicer, - userID string, -) string { - d, err := service.Client().UsersById(userID).Drive().Get(ctx, nil) - if err != nil { - err = graph.Wrap( - ctx, - err, - "retrieving default user drive"). - With("user", userID) - } - - require.NoError(t, err, clues.ToCore(err)) - - id := ptr.Val(d.GetId()) - require.NotEmpty(t, id, "drive ID not set") - - return id -} - // TestBackup_Run ensures that Integration Testing works for OneDrive func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveIncrementals() { // TODO: Enable once we have https://github.com/alcionai/corso/pull/2642 @@ -1159,6 +1135,31 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveIncrementals() { fault.New(true)) require.NoError(t, err, clues.ToCore(err)) + // TODO: whomever can figure out a way to declare this outside of this func + // and not have the linter complain about unused is welcome to do so. + mustGetDefaultDriveID := func( + t *testing.T, + ctx context.Context, //revive:disable-line:context-as-argument + service graph.Servicer, + userID string, + ) string { + d, err := service.Client().UsersById(userID).Drive().Get(ctx, nil) + if err != nil { + err = graph.Wrap( + ctx, + err, + "retrieving default user drive"). 
+ With("user", userID) + } + + require.NoError(t, err) + + id := ptr.Val(d.GetId()) + require.NotEmpty(t, id, "drive ID not set") + + return id + } + driveID := mustGetDefaultDriveID(t, ctx, gc.Service, suite.user) fileDBF := func(id, timeStamp, subject, body string) []byte { diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index f61b1291a..f74b6f8b8 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -1202,7 +1202,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems ctx, flush := tester.NewContext() defer flush() - mds := ssmock.DetailsStreamer{Entries: test.populatedDetails} + mds := ssmock.Streamer{Deets: test.populatedDetails} w := &store.Wrapper{Storer: mockBackupStorer{entries: test.populatedModels}} deets := details.Builder{} @@ -1317,7 +1317,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsFolde defer flush() var ( - mds = ssmock.DetailsStreamer{Entries: populatedDetails} + mds = ssmock.Streamer{Deets: populatedDetails} w = &store.Wrapper{Storer: mockBackupStorer{entries: populatedModels}} deets = details.Builder{} ) diff --git a/src/internal/operations/common.go b/src/internal/operations/common.go index c590c9cc3..98bbe2b90 100644 --- a/src/internal/operations/common.go +++ b/src/internal/operations/common.go @@ -3,6 +3,7 @@ package operations import ( "context" + "github.com/alcionai/clues" "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/model" @@ -20,18 +21,27 @@ func getBackupAndDetailsFromID( detailsStore streamstore.Reader, errs *fault.Bus, ) (*backup.Backup, *details.Details, error) { - dID, bup, err := ms.GetDetailsIDFromBackupID(ctx, backupID) + bup, err := ms.GetBackup(ctx, backupID) if err != nil { return nil, nil, errors.Wrap(err, "getting backup details ID") } var ( deets details.Details - umt = details.UnmarshalTo(&deets) + umt = 
streamstore.DetailsReader(details.UnmarshalTo(&deets)) + ssid = bup.StreamStoreID ) - if err := detailsStore.Read(ctx, dID, umt, errs); err != nil { - return nil, nil, errors.Wrap(err, "getting backup details data") + if len(ssid) == 0 { + ssid = bup.DetailsID + } + + if len(ssid) == 0 { + return bup, nil, clues.New("no details or errors in backup").WithClues(ctx) + } + + if err := detailsStore.Read(ctx, ssid, umt, errs); err != nil { + return nil, nil, errors.Wrap(err, "reading backup data from streamstore") } return bup, &deets, nil diff --git a/src/internal/operations/manifests.go b/src/internal/operations/manifests.go index e625cd09d..36824840f 100644 --- a/src/internal/operations/manifests.go +++ b/src/internal/operations/manifests.go @@ -30,18 +30,18 @@ type manifestRestorer interface { restorer } -type getDetailsIDer interface { - GetDetailsIDFromBackupID( +type getBackuper interface { + GetBackup( ctx context.Context, backupID model.StableID, - ) (string, *backup.Backup, error) + ) (*backup.Backup, error) } // calls kopia to retrieve prior backup manifests, metadata collections to supply backup heuristics. func produceManifestsAndMetadata( ctx context.Context, mr manifestRestorer, - gdi getDetailsIDer, + gb getBackuper, reasons []kopia.Reason, tenantID string, getMetadata bool, @@ -93,27 +93,32 @@ func produceManifestsAndMetadata( mctx = clues.Add(mctx, "manifest_backup_id", man.ID) - dID, _, err := gdi.GetDetailsIDFromBackupID(mctx, model.StableID(bID)) - if err != nil { - // if no backup exists for any of the complete manifests, we want - // to fall back to a complete backup. - if errors.Is(err, data.ErrNotFound) { - logger.Ctx(ctx).Infow("backup missing, falling back to full backup", clues.In(mctx).Slice()...) - return ms, nil, false, nil - } + bup, err := gb.GetBackup(mctx, model.StableID(bID)) + // if no backup exists for any of the complete manifests, we want + // to fall back to a complete backup. 
+ if errors.Is(err, data.ErrNotFound) { + logger.Ctx(ctx).Infow("backup missing, falling back to full backup", clues.In(mctx).Slice()...) + return ms, nil, false, nil + } + if err != nil { return nil, nil, false, errors.Wrap(err, "retrieving prior backup data") } - mctx = clues.Add(mctx, "manifest_details_id", dID) + ssid := bup.StreamStoreID + if len(ssid) == 0 { + ssid = bup.DetailsID + } + + mctx = clues.Add(mctx, "manifest_streamstore_id", ssid) // if no detailsID exists for any of the complete manifests, we want // to fall back to a complete backup. This is a temporary prevention // mechanism to keep backups from falling into a perpetually bad state. // This makes an assumption that the ID points to a populated set of // details; we aren't doing the work to look them up. - if len(dID) == 0 { - logger.Ctx(ctx).Infow("backup missing details ID, falling back to full backup", clues.In(mctx).Slice()...) + if len(ssid) == 0 { + logger.Ctx(ctx).Infow("backup missing streamstore ID, falling back to full backup", clues.In(mctx).Slice()...) 
return ms, nil, false, nil } diff --git a/src/internal/operations/manifests_test.go b/src/internal/operations/manifests_test.go index c341ca12c..0611db4ec 100644 --- a/src/internal/operations/manifests_test.go +++ b/src/internal/operations/manifests_test.go @@ -37,16 +37,20 @@ func (mmr mockManifestRestorer) FetchPrevSnapshotManifests( return mmr.mans, mmr.mrErr } -type mockGetDetailsIDer struct { - detailsID string - err error +type mockGetBackuper struct { + detailsID string + streamstoreID string + err error } -func (mg mockGetDetailsIDer) GetDetailsIDFromBackupID( +func (mg mockGetBackuper) GetBackup( ctx context.Context, backupID model.StableID, -) (string, *backup.Backup, error) { - return mg.detailsID, nil, mg.err +) (*backup.Backup, error) { + return &backup.Backup{ + DetailsID: mg.detailsID, + StreamStoreID: mg.streamstoreID, + }, mg.err } type mockColl struct { @@ -432,7 +436,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { table := []struct { name string mr mockManifestRestorer - gdi mockGetDetailsIDer + gb mockGetBackuper reasons []kopia.Reason getMeta bool assertErr assert.ErrorAssertionFunc @@ -446,7 +450,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{}, mans: []*kopia.ManifestEntry{}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: false, assertErr: assert.NoError, @@ -459,7 +463,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "")}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: false, assertErr: assert.NoError, @@ -472,7 +476,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{}, mans: 
[]*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "ir", "")}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: false, assertErr: assert.NoError, @@ -485,7 +489,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{}, mrErr: assert.AnError, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.Error, @@ -501,7 +505,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { makeMan(path.EmailCategory, "", "", ""), }, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, // No error, even though verify failed. @@ -514,7 +518,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{}, mans: []*kopia.ManifestEntry{}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, @@ -530,7 +534,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { makeMan(path.ContactsCategory, "", "ir", ""), }, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, @@ -545,7 +549,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { }}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "")}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.Error, @@ -558,7 +562,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", 
"", "bid")}, }, - gdi: mockGetDetailsIDer{}, + gb: mockGetBackuper{}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, @@ -576,7 +580,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { makeMan(path.EmailCategory, "incmpl_id", "ir", ""), }, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, @@ -591,7 +595,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { }}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "bid")}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, @@ -610,7 +614,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { makeMan(path.ContactsCategory, "contact", "", "bid"), }, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.NoError, @@ -626,7 +630,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mockRestorer: mockRestorer{err: assert.AnError}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "bid")}, }, - gdi: mockGetDetailsIDer{detailsID: did}, + gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, getMeta: true, assertErr: assert.Error, @@ -645,7 +649,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { mans, dcs, b, err := produceManifestsAndMetadata( ctx, &test.mr, - &test.gdi, + &test.gb, test.reasons, tid, test.getMeta, diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 36046f531..36986c8fb 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -118,8 +118,8 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De bytesRead: 
&stats.ByteCounter{}, restoreID: uuid.NewString(), } - start = time.Now() - detailsStore = streamstore.NewDetails(op.kopia, op.account.ID(), op.Selectors.PathService()) + start = time.Now() + sstore = streamstore.NewStreamer(op.kopia, op.account.ID(), op.Selectors.PathService()) ) // ----- @@ -144,7 +144,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De // Execution // ----- - deets, err := op.do(ctx, &opStats, detailsStore, start) + deets, err := op.do(ctx, &opStats, sstore, start) if err != nil { // No return here! We continue down to persistResults, even in case of failure. logger.Ctx(ctx). diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 223d4a7a6..97572436e 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -375,7 +375,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { ds, err := ro.Run(ctx) - require.NoError(t, err, "restoreOp.Run()", clues.ToCore(err)) + require.NoError(t, err, "restoreOp.Run() %+v", clues.ToCore(err)) require.NotEmpty(t, ro.Results, "restoreOp results") require.NotNil(t, ds, "restored details") assert.Equal(t, ro.Status, Completed, "restoreOp status") diff --git a/src/internal/streamstore/collectables.go b/src/internal/streamstore/collectables.go new file mode 100644 index 000000000..945ad8e5d --- /dev/null +++ b/src/internal/streamstore/collectables.go @@ -0,0 +1,63 @@ +package streamstore + +type Collectable struct { + mr Marshaller + Unmr Unmarshaller + itemName string + purpose string + Type string +} + +const ( + FaultErrorsType = "fault_error" + faultErrorsItemName = "fault_error" + faultErrorsPurpose = "fault_error" + + DetailsType = "details" + detailsItemName = "details" + detailsPurpose = "details" +) + +// FaultErrorsCollector generates a collection of fault.Errors +// containing the marshalled bytes from the provided marshaller. 
+func FaultErrorsCollector(mr Marshaller) Collectable { + return Collectable{ + mr: mr, + itemName: faultErrorsItemName, + purpose: faultErrorsPurpose, + Type: FaultErrorsType, + } +} + +// DetailsCollector generates a collection of details.DetailsModel +// entries containing the marshalled bytes from the provided marshaller. +func DetailsCollector(mr Marshaller) Collectable { + return Collectable{ + mr: mr, + itemName: detailsItemName, + purpose: detailsPurpose, + Type: DetailsType, + } +} + +// FaultErrorsReader reads a collection of fault.Errors +// entries using the provided unmarshaller. +func FaultErrorsReader(unmr Unmarshaller) Collectable { + return Collectable{ + Unmr: unmr, + itemName: faultErrorsItemName, + purpose: faultErrorsPurpose, + Type: FaultErrorsType, + } +} + +// DetailsReader reads a collection of details.DetailsModel +// entries using the provided unmarshaller. +func DetailsReader(unmr Unmarshaller) Collectable { + return Collectable{ + Unmr: unmr, + itemName: detailsItemName, + purpose: detailsPurpose, + Type: DetailsType, + } +} diff --git a/src/internal/streamstore/collectables_test.go b/src/internal/streamstore/collectables_test.go new file mode 100644 index 000000000..030293738 --- /dev/null +++ b/src/internal/streamstore/collectables_test.go @@ -0,0 +1,218 @@ +package streamstore + +import ( + "testing" + + "github.com/alcionai/clues" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" +) + +type StreamStoreIntgSuite struct { + tester.Suite + kcloser func() + kwcloser func() + ss Streamer +} + +func TestStreamStoreIntgSuite(t *testing.T) { + suite.Run(t, &StreamStoreIntgSuite{ + Suite: 
tester.NewIntegrationSuite( + t, + [][]string{tester.AWSStorageCredEnvs}), + }) +} + +func (suite *StreamStoreIntgSuite) SetupSubTest() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + + // need to initialize the repository before we can test connecting to it. + st := tester.NewPrefixedS3Storage(t) + + k := kopia.NewConn(st) + require.NoError(t, k.Initialize(ctx)) + + suite.kcloser = func() { k.Close(ctx) } + + kw, err := kopia.NewWrapper(k) + require.NoError(t, err) + + suite.kwcloser = func() { kw.Close(ctx) } + + suite.ss = NewStreamer(kw, "tenant", path.ExchangeService) +} + +func (suite *StreamStoreIntgSuite) TearDownSubTest() { + if suite.kcloser != nil { + defer suite.kcloser() + } + + if suite.kwcloser != nil { + defer suite.kwcloser() + } +} + +func (suite *StreamStoreIntgSuite) TestStreamer() { + table := []struct { + name string + deets func(*testing.T) *details.Details + errs func() *fault.Errors + hasSnapID assert.ValueAssertionFunc + }{ + { + name: "none", + deets: func(*testing.T) *details.Details { return nil }, + errs: func() *fault.Errors { return nil }, + hasSnapID: assert.Empty, + }, + { + name: "details", + deets: func(t *testing.T) *details.Details { + deetsBuilder := &details.Builder{} + require.NoError(t, deetsBuilder.Add( + "rr", "sr", "pr", "lr", + true, + details.ItemInfo{ + Exchange: &details.ExchangeInfo{Subject: "hello world"}, + })) + + return deetsBuilder.Details() + }, + errs: func() *fault.Errors { return nil }, + hasSnapID: assert.NotEmpty, + }, + { + name: "errors", + deets: func(*testing.T) *details.Details { return nil }, + errs: func() *fault.Errors { + bus := fault.New(false) + bus.Fail(clues.New("foo")) + bus.AddRecoverable(clues.New("bar")) + bus.AddRecoverable(fault.FileErr(clues.New("file"), "file-id", "file-name", map[string]any{"foo": "bar"})) + bus.AddSkip(fault.FileSkip(fault.SkipMalware, "file-id", "file-name", map[string]any{"foo": "bar"})) + + fe := bus.Errors() + return fe + }, + 
hasSnapID: assert.NotEmpty, + }, + { + name: "details and errors", + deets: func(t *testing.T) *details.Details { + deetsBuilder := &details.Builder{} + require.NoError(t, deetsBuilder.Add( + "rr", "sr", "pr", "lr", + true, + details.ItemInfo{ + Exchange: &details.ExchangeInfo{Subject: "hello world"}, + })) + + return deetsBuilder.Details() + }, + errs: func() *fault.Errors { + bus := fault.New(false) + bus.Fail(clues.New("foo")) + bus.AddRecoverable(clues.New("bar")) + bus.AddRecoverable(fault.FileErr(clues.New("file"), "file-id", "file-name", map[string]any{"foo": "bar"})) + bus.AddSkip(fault.FileSkip(fault.SkipMalware, "file-id", "file-name", map[string]any{"foo": "bar"})) + + fe := bus.Errors() + return fe + }, + hasSnapID: assert.NotEmpty, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + ss = suite.ss + err error + ) + + deets := test.deets(t) + if deets != nil { + err = ss.Collect(ctx, DetailsCollector(deets)) + require.NoError(t, err) + } + + errs := test.errs() + if errs != nil { + err = ss.Collect(ctx, FaultErrorsCollector(errs)) + require.NoError(t, err) + } + + snapid, err := ss.Write(ctx, fault.New(true)) + require.NoError(t, err) + test.hasSnapID(t, snapid) + + if len(snapid) == 0 { + return + } + + var readDeets details.Details + if deets != nil { + err = ss.Read( + ctx, + snapid, + DetailsReader(details.UnmarshalTo(&readDeets)), + fault.New(true)) + require.NoError(t, err) + require.NotEmpty(t, readDeets) + + assert.Equal(t, len(deets.Entries), len(readDeets.Entries)) + assert.Equal(t, deets.Entries[0].ParentRef, readDeets.Entries[0].ParentRef) + assert.Equal(t, deets.Entries[0].ShortRef, readDeets.Entries[0].ShortRef) + assert.Equal(t, deets.Entries[0].RepoRef, readDeets.Entries[0].RepoRef) + assert.Equal(t, deets.Entries[0].LocationRef, readDeets.Entries[0].LocationRef) + assert.Equal(t, deets.Entries[0].Updated, readDeets.Entries[0].Updated) + 
assert.NotNil(t, readDeets.Entries[0].Exchange) + assert.Equal(t, *deets.Entries[0].Exchange, *readDeets.Entries[0].Exchange) + } else { + err := ss.Read( + ctx, + snapid, + DetailsReader(details.UnmarshalTo(&readDeets)), + fault.New(true)) + assert.ErrorIs(t, err, data.ErrNotFound) + assert.Empty(t, readDeets) + } + + var readErrs fault.Errors + if errs != nil { + err = ss.Read( + ctx, + snapid, + FaultErrorsReader(fault.UnmarshalErrorsTo(&readErrs)), + fault.New(true)) + require.NoError(t, err) + require.NotEmpty(t, readErrs) + + assert.ElementsMatch(t, errs.Skipped, readErrs.Skipped) + assert.ElementsMatch(t, errs.Recovered, readErrs.Recovered) + } else { + err := ss.Read( + ctx, + snapid, + FaultErrorsReader(fault.UnmarshalErrorsTo(&readErrs)), + fault.New(true)) + assert.ErrorIs(t, err, data.ErrNotFound) + assert.Empty(t, readErrs) + } + }) + } +} diff --git a/src/internal/streamstore/details.go b/src/internal/streamstore/details.go deleted file mode 100644 index a9f919f68..000000000 --- a/src/internal/streamstore/details.go +++ /dev/null @@ -1,159 +0,0 @@ -package streamstore - -import ( - "context" - - "github.com/alcionai/clues" - "github.com/alcionai/corso/src/internal/data" - "github.com/alcionai/corso/src/internal/kopia" - "github.com/alcionai/corso/src/internal/stats" - "github.com/alcionai/corso/src/pkg/fault" - "github.com/alcionai/corso/src/pkg/path" - "github.com/pkg/errors" -) - -var _ Streamer = &streamDetails{} - -type streamDetails struct { - kw *kopia.Wrapper - tenant string - service path.ServiceType -} - -// NewDetails creates a new storeStreamer for streaming -// details.Details structs. 
-func NewDetails( - kw *kopia.Wrapper, - tenant string, - service path.ServiceType, -) *streamDetails { - return &streamDetails{kw: kw, tenant: tenant, service: service} -} - -const ( - // detailsItemName is the name of the stream used to store - // backup details - detailsItemName = "details" - // collectionPurposeDetails is used to indicate - // what the collection is being used for - collectionPurposeDetails = "details" -) - -// Write persists a `details.Details` object in the stream store -func (ss *streamDetails) Write(ctx context.Context, deets Marshaller, errs *fault.Bus) (string, error) { - // construct the path of the container for the `details` item - p, err := path.Builder{}. - ToStreamStorePath( - ss.tenant, - collectionPurposeDetails, - ss.service, - false) - if err != nil { - return "", clues.Stack(err).WithClues(ctx) - } - - // TODO: We could use an io.Pipe here to avoid a double copy but that - // makes error handling a bit complicated - dbytes, err := deets.Marshal() - if err != nil { - return "", clues.Wrap(err, "marshalling backup details").WithClues(ctx) - } - - dc := &streamCollection{ - folderPath: p, - item: &streamItem{ - name: detailsItemName, - data: dbytes, - }, - } - - backupStats, _, _, err := ss.kw.BackupCollections( - ctx, - nil, - []data.BackupCollection{dc}, - nil, - nil, - false, - errs) - if err != nil { - return "", errors.Wrap(err, "storing details in repository") - } - - return backupStats.SnapshotID, nil -} - -// Read reads a `details.Details` object from the kopia repository -func (ss *streamDetails) Read( - ctx context.Context, - detailsID string, - umr Unmarshaller, - errs *fault.Bus, -) error { - // construct the path for the `details` item - detailsPath, err := path.Builder{}. - Append(detailsItemName). 
- ToStreamStorePath( - ss.tenant, - collectionPurposeDetails, - ss.service, - true, - ) - if err != nil { - return clues.Stack(err).WithClues(ctx) - } - - dcs, err := ss.kw.RestoreMultipleItems( - ctx, - detailsID, - []path.Path{detailsPath}, - &stats.ByteCounter{}, - errs) - if err != nil { - return errors.Wrap(err, "retrieving backup details data") - } - - // Expect only 1 data collection - if len(dcs) != 1 { - return clues.New("greater than 1 details collection found"). - WithClues(ctx). - With("collection_count", len(dcs)) - } - - var ( - dc = dcs[0] - found = false - items = dc.Items(ctx, errs) - ) - - for { - select { - case <-ctx.Done(): - return clues.New("context cancelled waiting for backup details data").WithClues(ctx) - - case itemData, ok := <-items: - if !ok { - if !found { - return clues.New("no backup details found").WithClues(ctx) - } - - return nil - } - - if err := umr(itemData.ToReader()); err != nil { - return clues.Wrap(err, "unmarshalling details data").WithClues(ctx) - } - - found = true - } - } -} - -// Delete deletes a `details.Details` object from the kopia repository -func (ss *streamDetails) Delete(ctx context.Context, detailsID string) error { - err := ss.kw.DeleteSnapshot(ctx, detailsID) - if err != nil { - return errors.Wrap(err, "deleting backup details") - } - - return nil -} diff --git a/src/internal/streamstore/details_test.go b/src/internal/streamstore/details_test.go deleted file mode 100644 index c0eee2e41..000000000 --- a/src/internal/streamstore/details_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package streamstore - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - - "github.com/alcionai/clues" - "github.com/alcionai/corso/src/internal/kopia" - "github.com/alcionai/corso/src/internal/tester" - "github.com/alcionai/corso/src/pkg/backup/details" - "github.com/alcionai/corso/src/pkg/fault" - "github.com/alcionai/corso/src/pkg/path" -) - 
-type StreamDetailsIntegrationSuite struct { - tester.Suite -} - -func TestStreamDetailsIntegrationSuite(t *testing.T) { - suite.Run(t, &StreamDetailsIntegrationSuite{ - Suite: tester.NewIntegrationSuite( - t, - [][]string{tester.AWSStorageCredEnvs}), - }) -} - -func (suite *StreamDetailsIntegrationSuite) TestDetails() { - ctx, flush := tester.NewContext() - defer flush() - - t := suite.T() - // need to initialize the repository before we can test connecting to it. - st := tester.NewPrefixedS3Storage(t) - k := kopia.NewConn(st) - - err := k.Initialize(ctx) - require.NoError(t, err, clues.ToCore(err)) - - defer k.Close(ctx) - - kw, err := kopia.NewWrapper(k) - require.NoError(t, err, clues.ToCore(err)) - - defer kw.Close(ctx) - - deetsBuilder := &details.Builder{} - - require.NoError( - t, - deetsBuilder.Add("ref", "shortref", "parentref", "locationRef", true, - details.ItemInfo{ - Exchange: &details.ExchangeInfo{ - Subject: "hello world", - }, - })) - - var ( - deets = deetsBuilder.Details() - sd = NewDetails(kw, "tenant", path.ExchangeService) - ) - - id, err := sd.Write(ctx, deets, fault.New(true)) - require.NoError(t, err, clues.ToCore(err)) - require.NotNil(t, id) - - var readDeets details.Details - err = sd.Read(ctx, id, details.UnmarshalTo(&readDeets), fault.New(true)) - require.NoError(t, err, clues.ToCore(err)) - require.NotEmpty(t, readDeets) - - assert.Equal(t, len(deets.Entries), len(readDeets.Entries)) - assert.Equal(t, deets.Entries[0].ParentRef, readDeets.Entries[0].ParentRef) - assert.Equal(t, deets.Entries[0].ShortRef, readDeets.Entries[0].ShortRef) - assert.Equal(t, deets.Entries[0].RepoRef, readDeets.Entries[0].RepoRef) - assert.Equal(t, deets.Entries[0].LocationRef, readDeets.Entries[0].LocationRef) - assert.Equal(t, deets.Entries[0].Updated, readDeets.Entries[0].Updated) - assert.NotNil(t, readDeets.Entries[0].Exchange) - assert.Equal(t, *deets.Entries[0].Exchange, *readDeets.Entries[0].Exchange) -} diff --git 
a/src/internal/streamstore/mock/details.go b/src/internal/streamstore/mock/details.go deleted file mode 100644 index 0a87d37df..000000000 --- a/src/internal/streamstore/mock/details.go +++ /dev/null @@ -1,48 +0,0 @@ -package mock - -import ( - "bytes" - "context" - "io" - - "github.com/pkg/errors" - - "github.com/alcionai/clues" - "github.com/alcionai/corso/src/internal/streamstore" - "github.com/alcionai/corso/src/pkg/backup/details" - "github.com/alcionai/corso/src/pkg/fault" -) - -var _ streamstore.Streamer = &DetailsStreamer{} - -type DetailsStreamer struct { - Entries map[string]*details.Details -} - -func (ds DetailsStreamer) Read( - ctx context.Context, - detailsID string, - umr streamstore.Unmarshaller, - errs *fault.Bus, -) error { - r := ds.Entries[detailsID] - - if r == nil { - return errors.Errorf("no details for ID %s", detailsID) - } - - bs, err := r.Marshal() - if err != nil { - return err - } - - return umr(io.NopCloser(bytes.NewReader(bs))) -} - -func (ds DetailsStreamer) Write(context.Context, streamstore.Marshaller, *fault.Bus) (string, error) { - return "", clues.New("not implmented") -} - -func (ds DetailsStreamer) Delete(context.Context, string) error { - return clues.New("not implmented") -} diff --git a/src/internal/streamstore/mock/mock.go b/src/internal/streamstore/mock/mock.go new file mode 100644 index 000000000..f7ce2cd6a --- /dev/null +++ b/src/internal/streamstore/mock/mock.go @@ -0,0 +1,62 @@ +package mock + +import ( + "bytes" + "context" + "io" + + "github.com/pkg/errors" + + "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/streamstore" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" +) + +var _ streamstore.Streamer = &Streamer{} + +type Streamer struct { + Deets map[string]*details.Details + Errors map[string]*fault.Errors +} + +func (ms Streamer) Collect(context.Context, streamstore.Collectable) error { + return clues.New("not implented") +} + +func (ms 
Streamer) Read( + ctx context.Context, + snapshotID string, + col streamstore.Collectable, + errs *fault.Bus, +) error { + var mr streamstore.Marshaller + + switch col.Type { + case streamstore.DetailsType: + mr = ms.Deets[snapshotID] + case streamstore.FaultErrorsType: + mr = ms.Errors[snapshotID] + default: + return clues.New("unknown type: " + col.Type) + } + + if mr == nil { + return errors.Errorf("no marshaller for %s ID %s", col.Type, snapshotID) + } + + bs, err := mr.Marshal() + if err != nil { + return err + } + + return col.Unmr(io.NopCloser(bytes.NewReader(bs))) +} + +func (ms Streamer) Write(context.Context, *fault.Bus) (string, error) { + return "", clues.New("not implented") +} + +func (ms Streamer) Delete(context.Context, string) error { + return clues.New("not implented") +} diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 21e0baf05..5de33998c 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -7,11 +7,88 @@ import ( "context" "io" + "github.com/alcionai/clues" + "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) +// --------------------------------------------------------------------------- +// controller +// --------------------------------------------------------------------------- + +var _ Streamer = &storeStreamer{} + +type storeStreamer struct { + kw *kopia.Wrapper + tenant string + service path.ServiceType + dbcs []data.BackupCollection +} + +// NewStreamer creates a new streamstore Streamer for stream writing metadata files +// to the store. 
+func NewStreamer( + kw *kopia.Wrapper, + tenant string, + service path.ServiceType, +) *storeStreamer { + return &storeStreamer{ + kw: kw, + tenant: tenant, + service: service, + } +} + +// Collect eagerly serializes the marshalable bytes in the collectable into a +// data.BackupCollection. The collection is stored within the storeStreamer +// for persistence when Write is called. +func (ss *storeStreamer) Collect(ctx context.Context, col Collectable) error { + cs, err := collect(ctx, ss.tenant, ss.service, col) + if err != nil { + return clues.Wrap(err, "collecting data for stream store") + } + + ss.dbcs = append(ss.dbcs, cs) + + return nil +} + +// Write persists the collected objects in the stream store +func (ss *storeStreamer) Write(ctx context.Context, errs *fault.Bus) (string, error) { + id, err := write(ctx, ss.kw, ss.dbcs, errs) + if err != nil { + return "", clues.Wrap(err, "writing to stream store") + } + + return id, nil +} + +// Read reads a collector object from the kopia repository +func (ss *storeStreamer) Read(ctx context.Context, snapshotID string, col Collectable, errs *fault.Bus) error { + err := read(ctx, snapshotID, ss.tenant, ss.service, col, ss.kw, errs) + if err != nil { + return clues.Wrap(err, "reading from stream store") + } + + return nil +} + +// Delete deletes the snapshot with the given ID from the kopia repository +func (ss *storeStreamer) Delete(ctx context.Context, detailsID string) error { + err := ss.kw.DeleteSnapshot(ctx, detailsID) + if err != nil { + return errors.Wrap(err, "deleting snapshot in stream store") + } + + return nil +} + // --------------------------------------------------------------------------- // interfaces // --------------------------------------------------------------------------- @@ -19,17 +96,27 @@ import ( // Streamer is the core interface for all types of data streamed to and // from the store.
type Streamer interface { + Collector Writer Reader Delete(context.Context, string) error } +type CollectorWriter interface { + Collector + Writer +} + +type Collector interface { + Collect(context.Context, Collectable) error +} + type Reader interface { - Read(context.Context, string, Unmarshaller, *fault.Bus) error + Read(context.Context, string, Collectable, *fault.Bus) error } type Writer interface { - Write(context.Context, Marshaller, *fault.Bus) (string, error) + Write(context.Context, *fault.Bus) (string, error) } // Marshallers are used to convert structs into bytes to be persisted in the store. @@ -99,3 +186,146 @@ func (di *streamItem) ToReader() io.ReadCloser { func (di *streamItem) Deleted() bool { return false } + +// --------------------------------------------------------------------------- +// common reader/writer/deleter +// --------------------------------------------------------------------------- + +// collect aggregates a collection of bytes +func collect( + ctx context.Context, + tenantID string, + service path.ServiceType, + col Collectable, +) (data.BackupCollection, error) { + // construct the path of the container + p, err := path.Builder{}.ToStreamStorePath(tenantID, col.purpose, service, false) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + + // TODO: We could use an io.Pipe here to avoid a double copy but that + // makes error handling a bit complicated + bs, err := col.mr.Marshal() + if err != nil { + return nil, clues.Wrap(err, "marshalling body").WithClues(ctx) + } + + dc := streamCollection{ + folderPath: p, + item: &streamItem{ + name: col.itemName, + data: bs, + }, + } + + return &dc, nil +} + +type backuper interface { + BackupCollections( + ctx context.Context, + bases []kopia.IncrementalBase, + cs []data.BackupCollection, + globalExcludeSet map[string]map[string]struct{}, + tags map[string]string, + buildTreeWithBase bool, + errs *fault.Bus, + ) (*kopia.BackupStats, *details.Builder, 
map[string]kopia.PrevRefs, error) +} + +// write persists bytes to the store +func write( + ctx context.Context, + bup backuper, + dbcs []data.BackupCollection, + errs *fault.Bus, +) (string, error) { + backupStats, _, _, err := bup.BackupCollections( + ctx, + nil, + dbcs, + nil, + nil, + false, + errs) + if err != nil { + return "", errors.Wrap(err, "storing marshalled bytes in repository") + } + + return backupStats.SnapshotID, nil +} + +type restorer interface { + RestoreMultipleItems( + ctx context.Context, + snapshotID string, + paths []path.Path, + bc kopia.ByteCounter, + errs *fault.Bus, + ) ([]data.RestoreCollection, error) +} + +// read retrieves an object from the store +func read( + ctx context.Context, + snapshotID string, + tenantID string, + service path.ServiceType, + col Collectable, + rer restorer, + errs *fault.Bus, +) error { + // construct the path of the container + p, err := path.Builder{}. + Append(col.itemName). + ToStreamStorePath(tenantID, col.purpose, service, true) + if err != nil { + return clues.Stack(err).WithClues(ctx) + } + + cs, err := rer.RestoreMultipleItems( + ctx, + snapshotID, + []path.Path{p}, + &stats.ByteCounter{}, + errs) + if err != nil { + return errors.Wrap(err, "retrieving data") + } + + // Expect only 1 data collection + if len(cs) != 1 { + return clues.New("unexpected collection count"). + WithClues(ctx). 
+ With("collection_count", len(cs)) + } + + var ( + c = cs[0] + found = false + items = c.Items(ctx, errs) + ) + + for { + select { + case <-ctx.Done(): + return clues.New("context cancelled waiting for data").WithClues(ctx) + + case itemData, ok := <-items: + if !ok { + if !found { + return clues.New("no data found").WithClues(ctx) + } + + return nil + } + + if err := col.Unmr(itemData.ToReader()); err != nil { + return clues.Wrap(err, "unmarshalling data").WithClues(ctx) + } + + found = true + } + } +} diff --git a/src/pkg/backup/backup.go b/src/pkg/backup/backup.go index a1cfd6d5c..67e5a9955 100644 --- a/src/pkg/backup/backup.go +++ b/src/pkg/backup/backup.go @@ -22,11 +22,11 @@ type Backup struct { // SnapshotID is the kopia snapshot ID SnapshotID string `json:"snapshotID"` - // Reference to `Details` - // We store the ModelStoreID since Details is immutable - DetailsID string `json:"detailsID"` + // Reference to the details and fault errors storage location. + // Used to read backup.Details and fault.Errors from the streamstore. + StreamStoreID string `json:"streamStoreID"` - // Status of the operation + // Status of the operation, eg: completed, failed, etc Status string `json:"status"` // Selector used in this operation @@ -47,13 +47,18 @@ type Backup struct { stats.ReadWrites stats.StartAndEndTime stats.SkippedCounts + + // **Deprecated** + // Reference to the backup details storage location. + // Used to read backup.Details from the streamstore. 
+ DetailsID string `json:"detailsID"` } // interface compliance checks var _ print.Printable = &Backup{} func New( - snapshotID, detailsID, status string, + snapshotID, streamStoreID, status string, id model.StableID, selector selectors.Selector, rw stats.ReadWrites, @@ -69,7 +74,7 @@ func New( ) if ee.Failure != nil { - failMsg = ee.Failure.Error() + failMsg = ee.Failure.Msg errCount++ } @@ -92,9 +97,9 @@ func New( }, }, - Version: version.Backup, - SnapshotID: snapshotID, - DetailsID: detailsID, + Version: version.Backup, + SnapshotID: snapshotID, + StreamStoreID: streamStoreID, CreationTime: time.Now(), Status: status, diff --git a/src/pkg/selectors/testdata/details.go b/src/pkg/backup/details/testdata/testdata.go similarity index 100% rename from src/pkg/selectors/testdata/details.go rename to src/pkg/backup/details/testdata/testdata.go diff --git a/src/pkg/fault/fault.go b/src/pkg/fault/fault.go index b88227846..a9c83c321 100644 --- a/src/pkg/fault/fault.go +++ b/src/pkg/fault/fault.go @@ -1,11 +1,17 @@ package fault import ( + "context" + "encoding/json" "errors" + "io" "sync" + "github.com/alcionai/clues" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + + "github.com/alcionai/corso/src/cli/print" ) type Bus struct { @@ -163,11 +169,13 @@ func (e *Bus) addSkip(s *Skipped) *Bus { // Errors returns the plain record of errors that were aggregated // within a fult Bus. 
-func (e *Bus) Errors() Errors { - return Errors{ - Failure: e.failure, - Recovered: slices.Clone(e.recoverable), - Items: itemsIn(e.failure, e.recoverable), +func (e *Bus) Errors() *Errors { + items, nonItems := itemsIn(e.failure, e.recoverable) + + return &Errors{ + Failure: clues.ToCore(e.failure), + Recovered: nonItems, + Items: items, Skipped: slices.Clone(e.skipped), FailFast: e.failFast, } @@ -185,16 +193,14 @@ type Errors struct { // non-start cases (ex: cannot connect to client), hard- // stop issues (ex: credentials expired) or conscious exit // cases (ex: iteration error + failFast config). - Failure error `json:"failure"` + Failure *clues.ErrCore `json:"failure"` - // Recovered errors accumulate through a runtime under - // best-effort processing conditions. They imply that an - // error occurred, but the process was able to move on and - // complete afterwards. - // Eg: if a process is retrieving N items, and 1 of the - // items fails to be retrieved, but the rest of them succeed, - // we'd expect to see 1 error added to this slice. - Recovered []error `json:"-"` + // Recovered is the set of NON-Item errors that accumulated + // through a runtime under best-effort processing conditions. + // They imply that an error occurred, but the process was able + // to move on and complete afterwards. Any error that can be + // serialized to a fault.Item is found in the Items set instead. + Recovered []*clues.ErrCore `json:"recovered"` // Items are the reduction of all errors (both the failure and the // recovered values) in the Errors struct into a slice of items, @@ -212,14 +218,19 @@ type Errors struct { } // itemsIn reduces all errors (both the failure and recovered values) -// in the Errors struct into a slice of items, deduplicated by their -// ID. -func itemsIn(failure error, recovered []error) []Item { - is := map[string]Item{} +// in the Errors struct into a slice of items, deduplicated by their ID. 
+// Any non-item error is serialized to a clues.ErrCore and returned in +// the second list. +func itemsIn(failure error, recovered []error) ([]Item, []*clues.ErrCore) { + var ( + is = map[string]Item{} + non = []*clues.ErrCore{} + ) for _, err := range recovered { var ie *Item if !errors.As(err, &ie) { + non = append(non, clues.ToCore(err)) continue } @@ -231,7 +242,79 @@ func itemsIn(failure error, recovered []error) []Item { is[ie.ID] = *ie } - return maps.Values(is) + return maps.Values(is), non +} + +// Marshal runs json.Marshal on the errors. +func (e *Errors) Marshal() ([]byte, error) { + bs, err := json.Marshal(e) + return bs, err +} + +// UnmarshalErrorsTo produces a func that complies with the unmarshaller +// type in streamStore. +func UnmarshalErrorsTo(e *Errors) func(io.ReadCloser) error { + return func(rc io.ReadCloser) error { + return json.NewDecoder(rc).Decode(e) + } +} + +// PrintItems writes the Skipped, Items, and Recovered entries to StdOut in the +// format requested by the caller; each ignore flag omits its category. +func (e *Errors) PrintItems(ctx context.Context, ignoreErrors, ignoreSkips, ignoreRecovered bool) { + if len(e.Items)+len(e.Skipped)+len(e.Recovered) == 0 || + ignoreErrors && ignoreSkips && ignoreRecovered { + return + } + + sl := make([]print.Printable, 0) + + if !ignoreSkips { + for _, s := range e.Skipped { + sl = append(sl, print.Printable(s)) + } + } + + if !ignoreErrors { + for _, i := range e.Items { + sl = append(sl, print.Printable(i)) + } + } + + if !ignoreRecovered { + for _, rcv := range e.Recovered { + pec := errCoreToPrintable(rcv) + sl = append(sl, print.Printable(&pec)) + } + } + + print.All(ctx, sl...)
+} + +var _ print.Printable = &printableErrCore{} + +type printableErrCore struct { + msg string +} + +func errCoreToPrintable(ec *clues.ErrCore) printableErrCore { + if ec == nil { + return printableErrCore{""} + } + + return printableErrCore{ec.Msg} +} + +func (pec printableErrCore) MinimumPrintable() any { + return pec +} + +func (pec printableErrCore) Headers() []string { + return []string{"Error"} +} + +func (pec printableErrCore) Values() []string { + return []string{pec.msg} } // --------------------------------------------------------------------------- diff --git a/src/pkg/fault/fault_test.go b/src/pkg/fault/fault_test.go index b7b776f63..c12016e51 100644 --- a/src/pkg/fault/fault_test.go +++ b/src/pkg/fault/fault_test.go @@ -6,11 +6,11 @@ import ( "fmt" "testing" + "github.com/alcionai/clues" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/fault" ) @@ -211,8 +211,8 @@ func (suite *FaultErrorsUnitSuite) TestErrors() { n.AddRecoverable(errors.New("2")) d := n.Errors() - assert.Equal(t, n.Failure(), d.Failure) - assert.ElementsMatch(t, n.Recovered(), d.Recovered) + assert.Equal(t, clues.ToCore(n.Failure()), d.Failure) + assert.Len(t, d.Recovered, len(n.Recovered())) assert.False(t, d.FailFast) // fail-fast @@ -224,90 +224,98 @@ func (suite *FaultErrorsUnitSuite) TestErrors() { n.AddRecoverable(errors.New("2")) d = n.Errors() - assert.Equal(t, n.Failure(), d.Failure) - assert.ElementsMatch(t, n.Recovered(), d.Recovered) + assert.Equal(t, clues.ToCore(n.Failure()), d.Failure) + assert.Len(t, d.Recovered, len(n.Recovered())) assert.True(t, d.FailFast) } func (suite *FaultErrorsUnitSuite) TestErrors_Items() { - ae := assert.AnError + ae := clues.Stack(assert.AnError) + noncore := []*clues.ErrCore{ae.Core()} addtl := map[string]any{"foo": "bar", "baz": 1} table := []struct { - name string - 
errs func() fault.Errors - expect []fault.Item + name string + errs func() *fault.Errors + expectItems []fault.Item + expectRecoverable []*clues.ErrCore }{ { name: "no errors", - errs: func() fault.Errors { + errs: func() *fault.Errors { return fault.New(false).Errors() }, - expect: []fault.Item{}, + expectItems: []fault.Item{}, + expectRecoverable: []*clues.ErrCore{}, }, { name: "no items", - errs: func() fault.Errors { + errs: func() *fault.Errors { b := fault.New(false) b.Fail(ae) b.AddRecoverable(ae) return b.Errors() }, - expect: []fault.Item{}, + expectItems: []fault.Item{}, + expectRecoverable: noncore, }, { name: "failure item", - errs: func() fault.Errors { + errs: func() *fault.Errors { b := fault.New(false) b.Fail(fault.OwnerErr(ae, "id", "name", addtl)) b.AddRecoverable(ae) return b.Errors() }, - expect: []fault.Item{*fault.OwnerErr(ae, "id", "name", addtl)}, + expectItems: []fault.Item{*fault.OwnerErr(ae, "id", "name", addtl)}, + expectRecoverable: noncore, }, { name: "recoverable item", - errs: func() fault.Errors { + errs: func() *fault.Errors { b := fault.New(false) b.Fail(ae) b.AddRecoverable(fault.OwnerErr(ae, "id", "name", addtl)) return b.Errors() }, - expect: []fault.Item{*fault.OwnerErr(ae, "id", "name", addtl)}, + expectItems: []fault.Item{*fault.OwnerErr(ae, "id", "name", addtl)}, + expectRecoverable: []*clues.ErrCore{}, }, { name: "two items", - errs: func() fault.Errors { + errs: func() *fault.Errors { b := fault.New(false) b.Fail(fault.OwnerErr(ae, "oid", "name", addtl)) b.AddRecoverable(fault.FileErr(ae, "fid", "name", addtl)) return b.Errors() }, - expect: []fault.Item{ + expectItems: []fault.Item{ *fault.OwnerErr(ae, "oid", "name", addtl), *fault.FileErr(ae, "fid", "name", addtl), }, + expectRecoverable: []*clues.ErrCore{}, }, { name: "duplicate items - failure priority", - errs: func() fault.Errors { + errs: func() *fault.Errors { b := fault.New(false) b.Fail(fault.OwnerErr(ae, "id", "name", addtl)) b.AddRecoverable(fault.FileErr(ae, 
"id", "name", addtl)) return b.Errors() }, - expect: []fault.Item{ + expectItems: []fault.Item{ *fault.OwnerErr(ae, "id", "name", addtl), }, + expectRecoverable: []*clues.ErrCore{}, }, { name: "duplicate items - last recoverable priority", - errs: func() fault.Errors { + errs: func() *fault.Errors { b := fault.New(false) b.Fail(ae) b.AddRecoverable(fault.FileErr(ae, "fid", "name", addtl)) @@ -315,14 +323,41 @@ func (suite *FaultErrorsUnitSuite) TestErrors_Items() { return b.Errors() }, - expect: []fault.Item{ + expectItems: []fault.Item{ *fault.FileErr(ae, "fid", "name2", addtl), }, + expectRecoverable: []*clues.ErrCore{}, + }, + { + name: "recoverable item and non-items", + errs: func() *fault.Errors { + b := fault.New(false) + b.Fail(ae) + b.AddRecoverable(fault.FileErr(ae, "fid", "name", addtl)) + b.AddRecoverable(ae) + + return b.Errors() + }, + expectItems: []fault.Item{ + *fault.FileErr(ae, "fid", "name", addtl), + }, + expectRecoverable: noncore, }, } for _, test := range table { suite.Run(test.name, func() { - assert.ElementsMatch(suite.T(), test.expect, test.errs().Items) + t := suite.T() + fe := test.errs() + + assert.ElementsMatch(t, test.expectItems, fe.Items) + require.Equal(t, test.expectRecoverable, fe.Recovered) + + for i := range test.expectRecoverable { + expect := test.expectRecoverable[i] + got := fe.Recovered[i] + + assert.Equal(t, *expect, *got) + } }) } } diff --git a/src/pkg/fault/item.go b/src/pkg/fault/item.go index 2965a91d1..83bdf48c1 100644 --- a/src/pkg/fault/item.go +++ b/src/pkg/fault/item.go @@ -1,5 +1,7 @@ package fault +import "github.com/alcionai/corso/src/cli/print" + const ( AddtlCreatedBy = "created_by" AddtlLastModBy = "last_modified_by" @@ -16,7 +18,23 @@ const ( ResourceOwnerType itemType = "resource_owner" ) -var _ error = &Item{} +func (it itemType) Printable() string { + switch it { + case FileType: + return "File" + case ContainerType: + return "Container" + case ResourceOwnerType: + return "Resource Owner" + } + + 
return "Unknown" +} + +var ( + _ error = &Item{} + _ print.Printable = &Item{} +) // Item contains a concrete reference to a thing that failed // during processing. The categorization of the item is determined @@ -66,6 +84,31 @@ func (i *Item) Error() string { return string("processing " + i.Type) } +func (i Item) MinimumPrintable() any { + return i +} + +// Headers returns the human-readable names of properties of an Item +// for printing out to a terminal. +func (i Item) Headers() []string { + return []string{"Action", "Type", "Name", "Container", "Cause"} +} + +// Values populates the printable values matching the Headers list. +func (i Item) Values() []string { + var cn string + + acn, ok := i.Additional[AddtlContainerName] + if ok { + str, ok := acn.(string) + if ok { + cn = str + } + } + + return []string{"Error", i.Type.Printable(), i.Name, cn, i.Cause} +} + // ContainerErr produces a Container-type Item for tracking erronous items func ContainerErr(cause error, id, name string, addtl map[string]any) *Item { return itemErr(ContainerType, cause, id, name, addtl) @@ -104,14 +147,18 @@ func itemErr(t itemType, cause error, id, name string, addtl map[string]any) *It // handled as normal errors. type skipCause string -// SkipMalware identifies a malware detection case. Files that graph api -// identifies as malware cannot be downloaded or uploaded, and will permanently -// fail any attempts to backup or restore. -const SkipMalware skipCause = "malware_detected" +const ( + // SkipMalware identifies a malware detection case. Files that graph + // api identifies as malware cannot be downloaded or uploaded, and will + // permanently fail any attempts to backup or restore. 
+ SkipMalware skipCause = "malware_detected" -// SkipNotFound identifies that a file was skipped because we could -// not find it when trying to download contents -const SkipNotFound skipCause = "file_not_found" + // SkipNotFound identifies that a file was skipped because we could + // not find it when trying to download contents + SkipNotFound skipCause = "file_not_found" +) + +var _ print.Printable = &Skipped{} // Skipped items are permanently unprocessable due to well-known conditions. // In order to skip an item, the following conditions should be met: @@ -126,7 +173,7 @@ const SkipNotFound skipCause = "file_not_found" // fault interface. Skipped items are not errors, and Item{} errors are // not the basis for a Skip. type Skipped struct { - item Item + Item Item `json:"item"` } // String complies with the stringer interface. @@ -135,7 +182,7 @@ func (s *Skipped) String() string { return "" } - return "skipped " + s.item.Error() + ": " + s.item.Cause + return "skipped " + s.Item.Error() + ": " + s.Item.Cause } // HasCause compares the underlying cause against the parameter. @@ -144,7 +191,32 @@ func (s *Skipped) HasCause(c skipCause) bool { return false } - return s.item.Cause == string(c) + return s.Item.Cause == string(c) +} + +func (s Skipped) MinimumPrintable() any { + return s +} + +// Headers returns the human-readable names of properties of a skipped Item +// for printing out to a terminal. +func (s Skipped) Headers() []string { + return []string{"Action", "Type", "Name", "Container", "Cause"} +} + +// Values populates the printable values matching the Headers list. +func (s Skipped) Values() []string { + var cn string + + acn, ok := s.Item.Additional[AddtlContainerName] + if ok { + str, ok := acn.(string) + if ok { + cn = str + } + } + + return []string{"Skip", s.Item.Type.Printable(), s.Item.Name, cn, s.Item.Cause} } // ContainerSkip produces a Container-kind Item for tracking skipped items. 
@@ -165,7 +237,7 @@ func OwnerSkip(cause skipCause, id, name string, addtl map[string]any) *Skipped // itemSkip produces a Item of the provided type for tracking skipped items. func itemSkip(t itemType, cause skipCause, id, name string, addtl map[string]any) *Skipped { return &Skipped{ - item: Item{ + Item: Item{ ID: id, Name: name, Type: t, diff --git a/src/pkg/fault/item_test.go b/src/pkg/fault/item_test.go index f90136ff1..1fb9dc78a 100644 --- a/src/pkg/fault/item_test.go +++ b/src/pkg/fault/item_test.go @@ -81,6 +81,76 @@ func (suite *ItemUnitSuite) TestOwnerErr() { assert.Equal(t, expect, *i) } +func (suite *ItemUnitSuite) TestItemType_Printable() { + table := []struct { + t itemType + expect string + }{ + { + t: FileType, + expect: "File", + }, + { + t: ContainerType, + expect: "Container", + }, + { + t: ResourceOwnerType, + expect: "Resource Owner", + }, + { + t: itemType("foo"), + expect: "Unknown", + }, + } + for _, test := range table { + suite.Run(string(test.t), func() { + assert.Equal(suite.T(), test.expect, test.t.Printable()) + }) + } +} + +func (suite *ItemUnitSuite) TestItem_HeadersValues() { + var ( + err = assert.AnError + cause = err.Error() + addtl = map[string]any{ + AddtlContainerID: "cid", + AddtlContainerName: "cname", + } + ) + + table := []struct { + name string + item *Item + expect []string + }{ + { + name: "file", + item: FileErr(assert.AnError, "id", "name", addtl), + expect: []string{"Error", FileType.Printable(), "name", "cname", cause}, + }, + { + name: "container", + item: ContainerErr(assert.AnError, "id", "name", addtl), + expect: []string{"Error", ContainerType.Printable(), "name", "cname", cause}, + }, + { + name: "owner", + item: OwnerErr(assert.AnError, "id", "name", nil), + expect: []string{"Error", ResourceOwnerType.Printable(), "name", "", cause}, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + assert.Equal(t, []string{"Action", "Type", "Name", "Container", "Cause"}, 
test.item.Headers()) + assert.Equal(t, test.expect, test.item.Values()) + }) + } +} + func (suite *ItemUnitSuite) TestSkipped_String() { var ( t = suite.T() @@ -93,7 +163,7 @@ func (suite *ItemUnitSuite) TestSkipped_String() { assert.Contains(t, i.String(), "unknown type") i = &Skipped{Item{Type: FileType}} - assert.Contains(t, i.item.Error(), FileType) + assert.Contains(t, i.Item.Error(), FileType) } func (suite *ItemUnitSuite) TestContainerSkip() { @@ -143,3 +213,40 @@ func (suite *ItemUnitSuite) TestOwnerSkip() { assert.Equal(t, Skipped{expect}, *i) } + +func (suite *ItemUnitSuite) TestSkipped_HeadersValues() { + addtl := map[string]any{ + AddtlContainerID: "cid", + AddtlContainerName: "cname", + } + + table := []struct { + name string + skip *Skipped + expect []string + }{ + { + name: "file", + skip: FileSkip(SkipMalware, "id", "name", addtl), + expect: []string{"Skip", FileType.Printable(), "name", "cname", string(SkipMalware)}, + }, + { + name: "container", + skip: ContainerSkip(SkipMalware, "id", "name", addtl), + expect: []string{"Skip", ContainerType.Printable(), "name", "cname", string(SkipMalware)}, + }, + { + name: "owner", + skip: OwnerSkip(SkipMalware, "id", "name", nil), + expect: []string{"Skip", ResourceOwnerType.Printable(), "name", "", string(SkipMalware)}, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + assert.Equal(t, []string{"Action", "Type", "Name", "Container", "Cause"}, test.skip.Headers()) + assert.Equal(t, test.expect, test.skip.Values()) + }) + } +} diff --git a/src/pkg/fault/testdata/testdata.go b/src/pkg/fault/testdata/testdata.go new file mode 100644 index 000000000..0a28f6ccb --- /dev/null +++ b/src/pkg/fault/testdata/testdata.go @@ -0,0 +1,26 @@ +package testdata + +import ( + "github.com/stretchr/testify/assert" + + "github.com/alcionai/clues" + "github.com/alcionai/corso/src/pkg/fault" +) + +func MakeErrors(failure, recovered, skipped bool) fault.Errors { + fe := fault.Errors{} + + if 
failure { + fe.Failure = clues.Wrap(assert.AnError, "wrapped").Core() + } + + if recovered { + fe.Recovered = []*clues.ErrCore{clues.New("recoverable").Core()} + } + + if skipped { + fe.Skipped = []fault.Skipped{*fault.FileSkip(fault.SkipMalware, "id", "name", nil)} + } + + return fe +} diff --git a/src/pkg/path/path.go b/src/pkg/path/path.go index 9a8ee7d89..0d7bc95ca 100644 --- a/src/pkg/path/path.go +++ b/src/pkg/path/path.go @@ -326,8 +326,7 @@ func (pb Builder) ToStreamStorePath( tenant, metadataService.String(), purpose, - DetailsCategory.String(), - ), + DetailsCategory.String()), service: metadataService, category: DetailsCategory, hasItem: isItem, diff --git a/src/pkg/repository/loadtest/repository_load_test.go b/src/pkg/repository/loadtest/repository_load_test.go index d57667967..07f8bc438 100644 --- a/src/pkg/repository/loadtest/repository_load_test.go +++ b/src/pkg/repository/loadtest/repository_load_test.go @@ -248,7 +248,7 @@ func runBackupDetailsLoadTest( ) pprof.Do(ctx, labels, func(ctx context.Context) { - ds, b, errs = r.BackupDetails(ctx, backupID) + ds, b, errs = r.GetBackupDetails(ctx, backupID) }) require.NoError(t, errs.Failure(), "retrieving details in backup", backupID, clues.ToCore(errs.Failure())) diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index fd94bdb32..21ba4aef6 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -36,10 +36,14 @@ type BackupGetter interface { Backup(ctx context.Context, id model.StableID) (*backup.Backup, error) Backups(ctx context.Context, ids []model.StableID) ([]*backup.Backup, *fault.Bus) BackupsByTag(ctx context.Context, fs ...store.FilterOption) ([]*backup.Backup, error) - BackupDetails( + GetBackupDetails( ctx context.Context, backupID string, ) (*details.Details, *backup.Backup, *fault.Bus) + GetBackupErrors( + ctx context.Context, + backupID string, + ) (*fault.Errors, *backup.Backup, *fault.Bus) } type Repository interface { @@ -343,28 
+347,58 @@ func (r repository) BackupsByTag(ctx context.Context, fs ...store.FilterOption) return sw.GetBackups(ctx, fs...) } -// BackupDetails returns the specified backup details object -func (r repository) BackupDetails( +// BackupDetails returns the specified backup.Details +func (r repository) GetBackupDetails( ctx context.Context, backupID string, ) (*details.Details, *backup.Backup, *fault.Bus) { - sw := store.NewKopiaStore(r.modelStore) errs := fault.New(false) - dID, b, err := sw.GetDetailsIDFromBackupID(ctx, model.StableID(backupID)) + deets, bup, err := getBackupDetails( + ctx, + backupID, + r.Account.ID(), + r.dataLayer, + store.NewKopiaStore(r.modelStore), + errs) + + return deets, bup, errs.Fail(err) +} + +// getBackupDetails handles the processing for GetBackupDetails. +func getBackupDetails( + ctx context.Context, + backupID, tenantID string, + kw *kopia.Wrapper, + sw *store.Wrapper, + errs *fault.Bus, +) (*details.Details, *backup.Backup, error) { + b, err := sw.GetBackup(ctx, model.StableID(backupID)) if err != nil { - return nil, nil, errs.Fail(err) + return nil, nil, err } - nd := streamstore.NewDetails( - r.dataLayer, - r.Account.ID(), - b.Selector.PathService(), + ssid := b.StreamStoreID + if len(ssid) == 0 { + ssid = b.DetailsID + } + + if len(ssid) == 0 { + return nil, b, clues.New("no streamstore id in backup").WithClues(ctx) + } + + var ( + sstore = streamstore.NewStreamer(kw, tenantID, b.Selector.PathService()) + deets details.Details ) - var deets details.Details - if err := nd.Read(ctx, dID, details.UnmarshalTo(&deets), errs); err != nil { - return nil, nil, errs.Fail(err) + err = sstore.Read( + ctx, + ssid, + streamstore.DetailsReader(details.UnmarshalTo(&deets)), + errs) + if err != nil { + return nil, nil, err } // Retroactively fill in isMeta information for items in older @@ -380,7 +414,60 @@ func (r repository) BackupDetails( deets.DetailsModel = deets.FilterMetaFiles() - return &deets, b, errs + return &deets, b, nil +} + +// 
BackupErrors returns the specified backup's fault.Errors +func (r repository) GetBackupErrors( + ctx context.Context, + backupID string, +) (*fault.Errors, *backup.Backup, *fault.Bus) { + errs := fault.New(false) + + fe, bup, err := getBackupErrors( + ctx, + backupID, + r.Account.ID(), + r.dataLayer, + store.NewKopiaStore(r.modelStore), + errs) + + return fe, bup, errs.Fail(err) +} + +// getBackupErrors handles the processing for GetBackupErrors. +func getBackupErrors( + ctx context.Context, + backupID, tenantID string, + kw *kopia.Wrapper, + sw *store.Wrapper, + errs *fault.Bus, +) (*fault.Errors, *backup.Backup, error) { + b, err := sw.GetBackup(ctx, model.StableID(backupID)) + if err != nil { + return nil, nil, err + } + + ssid := b.StreamStoreID + if len(ssid) == 0 { + return nil, b, clues.New("no errors in backup").WithClues(ctx) + } + + var ( + sstore = streamstore.NewStreamer(kw, tenantID, b.Selector.PathService()) + fe fault.Errors + ) + + err = sstore.Read( + ctx, + ssid, + streamstore.FaultErrorsReader(fault.UnmarshalErrorsTo(&fe)), + errs) + if err != nil { + return nil, nil, err + } + + return &fe, b, nil } // DeleteBackup removes the backup from both the model store and the backup storage. 
@@ -394,8 +481,16 @@ func (r repository) DeleteBackup(ctx context.Context, id model.StableID) error { return err } - if err := r.dataLayer.DeleteSnapshot(ctx, bu.DetailsID); err != nil { - return err + if len(bu.SnapshotID) > 0 { + if err := r.dataLayer.DeleteSnapshot(ctx, bu.SnapshotID); err != nil { + return err + } + } + + if len(bu.DetailsID) > 0 { + if err := r.dataLayer.DeleteSnapshot(ctx, bu.DetailsID); err != nil { + return err + } } sw := store.NewKopiaStore(r.modelStore) diff --git a/src/pkg/repository/repository_test.go b/src/pkg/repository/repository_test.go index 0813a932f..020dccaf8 100644 --- a/src/pkg/repository/repository_test.go +++ b/src/pkg/repository/repository_test.go @@ -20,15 +20,15 @@ import ( // unit tests // --------------- -type RepositorySuite struct { +type RepositoryUnitSuite struct { tester.Suite } -func TestRepositorySuite(t *testing.T) { - suite.Run(t, &RepositorySuite{Suite: tester.NewUnitSuite(t)}) +func TestRepositoryUnitSuite(t *testing.T) { + suite.Run(t, &RepositoryUnitSuite{Suite: tester.NewUnitSuite(t)}) } -func (suite *RepositorySuite) TestInitialize() { +func (suite *RepositoryUnitSuite) TestInitialize() { table := []struct { name string storage func() (storage.Storage, error) @@ -62,7 +62,7 @@ func (suite *RepositorySuite) TestInitialize() { // repository.Connect involves end-to-end communication with kopia, therefore this only // tests expected error cases -func (suite *RepositorySuite) TestConnect() { +func (suite *RepositoryUnitSuite) TestConnect() { table := []struct { name string storage func() (storage.Storage, error) diff --git a/src/pkg/repository/repository_unexported_test.go b/src/pkg/repository/repository_unexported_test.go index 34f7a6de6..6ad5672c9 100644 --- a/src/pkg/repository/repository_unexported_test.go +++ b/src/pkg/repository/repository_unexported_test.go @@ -1,6 +1,7 @@ package repository import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -9,42 +10,105 @@ import ( 
"github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/model" + "github.com/alcionai/corso/src/internal/operations" + "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/internal/streamstore" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/backup" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/selectors" + "github.com/alcionai/corso/src/pkg/store" ) -type RepositoryModelSuite struct { +type RepositoryModelIntgSuite struct { tester.Suite + kw *kopia.Wrapper + ms *kopia.ModelStore + sw *store.Wrapper + kopiaCloser func(ctx context.Context) } -func TestRepositoryModelSuite(t *testing.T) { - suite.Run(t, &RepositoryModelSuite{ +func TestRepositoryModelIntgSuite(t *testing.T) { + suite.Run(t, &RepositoryModelIntgSuite{ Suite: tester.NewIntegrationSuite( t, [][]string{tester.AWSStorageCredEnvs, tester.M365AcctCredEnvs}), }) } -func (suite *RepositoryModelSuite) TestWriteGetModel() { +func (suite *RepositoryModelIntgSuite) SetupSuite() { ctx, flush := tester.NewContext() defer flush() var ( - t = suite.T() - s = tester.NewPrefixedS3Storage(t) - kopiaRef = kopia.NewConn(s) + t = suite.T() + s = tester.NewPrefixedS3Storage(t) + k = kopia.NewConn(s) + err error ) - err := kopiaRef.Initialize(ctx) + require.NotNil(t, k) + + err = k.Initialize(ctx) require.NoError(t, err, clues.ToCore(err)) - err = kopiaRef.Connect(ctx) + err = k.Connect(ctx) require.NoError(t, err, clues.ToCore(err)) - defer kopiaRef.Close(ctx) + suite.kopiaCloser = func(ctx context.Context) { + k.Close(ctx) + } - ms, err := kopia.NewModelStore(kopiaRef) + suite.kw, err = kopia.NewWrapper(k) require.NoError(t, err, clues.ToCore(err)) + suite.ms, err = kopia.NewModelStore(k) + require.NoError(t, err, clues.ToCore(err)) + + suite.sw = store.NewKopiaStore(suite.ms) +} + +func (suite 
*RepositoryModelIntgSuite) TearDownSuite() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + + if suite.ms != nil { + err := suite.ms.Close(ctx) + assert.NoError(t, err, clues.ToCore(err)) + } + + if suite.kw != nil { + err := suite.kw.Close(ctx) + assert.NoError(t, err, clues.ToCore(err)) + } + + if suite.kopiaCloser != nil { + suite.kopiaCloser(ctx) + } +} + +func (suite *RepositoryModelIntgSuite) TestGetRepositoryModel() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + s = tester.NewPrefixedS3Storage(t) + k = kopia.NewConn(s) + ) + + require.NoError(t, k.Initialize(ctx)) + require.NoError(t, k.Connect(ctx)) + + defer k.Close(ctx) + + ms, err := kopia.NewModelStore(k) + require.NoError(t, err) + defer ms.Close(ctx) err = newRepoModel(ctx, ms, "fnords") @@ -54,3 +118,212 @@ func (suite *RepositoryModelSuite) TestWriteGetModel() { require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, "fnords", string(got.ID)) } + +// helper func for writing backups +func writeBackup( + t *testing.T, + ctx context.Context, //revive:disable-line:context-as-argument + kw *kopia.Wrapper, + sw *store.Wrapper, + tID, snapID, backupID string, + sel selectors.Selector, + deets *details.Details, + errors *fault.Errors, + errs *fault.Bus, +) *backup.Backup { + var ( + serv = sel.PathService() + sstore = streamstore.NewStreamer(kw, tID, serv) + ) + + err := sstore.Collect(ctx, streamstore.DetailsCollector(deets)) + require.NoError(t, err, "collecting details in streamstore") + + err = sstore.Collect(ctx, streamstore.FaultErrorsCollector(errors)) + require.NoError(t, err, "collecting errors in streamstore") + + ssid, err := sstore.Write(ctx, errs) + require.NoError(t, err, "writing to streamstore") + + b := backup.New( + snapID, ssid, + operations.Completed.String(), + model.StableID(backupID), + sel, + stats.ReadWrites{}, + stats.StartAndEndTime{}, + errs) + + err = sw.Put(ctx, model.BackupSchema, b) + require.NoError(t, err) + + 
return b +} + +func (suite *RepositoryModelIntgSuite) TestGetBackupDetails() { + const tenantID = "tenant" + + info := details.ItemInfo{ + Folder: &details.FolderInfo{ + DisplayName: "test", + }, + } + + builder := &details.Builder{} + require.NoError(suite.T(), builder.Add("ref", "short", "pref", "lref", false, info)) + + table := []struct { + name string + writeBupID string + readBupID string + deets *details.Details + expectErr require.ErrorAssertionFunc + }{ + { + name: "good", + writeBupID: "squirrels", + readBupID: "squirrels", + deets: builder.Details(), + expectErr: require.NoError, + }, + { + name: "missing backup", + writeBupID: "chipmunks", + readBupID: "weasels", + deets: builder.Details(), + expectErr: require.Error, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + b = writeBackup( + t, + ctx, + suite.kw, + suite.sw, + tenantID, "snapID", test.writeBupID, + selectors.NewExchangeBackup([]string{"brunhilda"}).Selector, + test.deets, + &fault.Errors{}, + fault.New(true)) + ) + + rDeets, rBup, err := getBackupDetails(ctx, test.readBupID, tenantID, suite.kw, suite.sw, fault.New(true)) + test.expectErr(t, err) + + if err != nil { + return + } + + assert.Equal(t, b.DetailsID, rBup.DetailsID, "returned details ID matches") + assert.Equal(t, test.deets, rDeets, "returned details ID matches") + }) + } +} + +func (suite *RepositoryModelIntgSuite) TestGetBackupErrors() { + const ( + tenantID = "tenant" + failFast = true + ) + + var ( + err = clues.Wrap(assert.AnError, "wrap") + cec = err.Core() + item = fault.FileErr(err, "file-id", "file-name", map[string]any{"foo": "bar"}) + skip = fault.FileSkip(fault.SkipMalware, "s-file-id", "s-file-name", map[string]any{"foo": "bar"}) + info = details.ItemInfo{ + Folder: &details.FolderInfo{ + DisplayName: "test", + }, + } + ) + + builder := &details.Builder{} + + require.NoError(suite.T(), builder.Add("ref", "short", 
"pref", "lref", false, info)) + + table := []struct { + name string + writeBupID string + readBupID string + deets *details.Details + errors *fault.Errors + expectErrors *fault.Errors + expectErr require.ErrorAssertionFunc + }{ + { + name: "nil errors", + writeBupID: "error_marmots", + readBupID: "error_marmots", + deets: builder.Details(), + errors: nil, + expectErrors: &fault.Errors{}, + expectErr: require.NoError, + }, + { + name: "good", + writeBupID: "error_squirrels", + readBupID: "error_squirrels", + deets: builder.Details(), + errors: &fault.Errors{ + Failure: cec, + Recovered: []*clues.ErrCore{cec}, + Items: []fault.Item{*item}, + Skipped: []fault.Skipped{*skip}, + FailFast: failFast, + }, + expectErrors: &fault.Errors{ + Failure: cec, + Recovered: []*clues.ErrCore{cec}, + Items: []fault.Item{*item}, + Skipped: []fault.Skipped{*skip}, + FailFast: failFast, + }, + expectErr: require.NoError, + }, + { + name: "missing backup", + writeBupID: "error_chipmunks", + readBupID: "error_weasels", + deets: builder.Details(), + errors: nil, + expectErr: require.Error, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + b = writeBackup( + t, + ctx, + suite.kw, + suite.sw, + tenantID, "snapID", test.writeBupID, + selectors.NewExchangeBackup([]string{"brunhilda"}).Selector, + test.deets, + test.errors, + fault.New(failFast)) + ) + + rErrors, rBup, err := getBackupErrors(ctx, test.readBupID, tenantID, suite.kw, suite.sw, fault.New(failFast)) + test.expectErr(t, err) + + if err != nil { + return + } + + assert.Equal(t, b.StreamStoreID, rBup.StreamStoreID, "returned streamstore ID matches") + assert.Equal(t, test.expectErrors, rErrors, "retrieved errors match") + }) + } +} diff --git a/src/pkg/selectors/selectors_reduce_test.go b/src/pkg/selectors/selectors_reduce_test.go index a5155fdf8..6229dc164 100644 --- a/src/pkg/selectors/selectors_reduce_test.go +++ 
b/src/pkg/selectors/selectors_reduce_test.go @@ -10,9 +10,9 @@ import ( "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/details/testdata" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/selectors" - "github.com/alcionai/corso/src/pkg/selectors/testdata" ) type SelectorReduceSuite struct { diff --git a/src/pkg/store/backup.go b/src/pkg/store/backup.go index c04d0e290..0585944d4 100644 --- a/src/pkg/store/backup.go +++ b/src/pkg/store/backup.go @@ -81,16 +81,3 @@ func (w Wrapper) GetBackups( func (w Wrapper) DeleteBackup(ctx context.Context, backupID model.StableID) error { return w.Delete(ctx, model.BackupSchema, backupID) } - -// GetDetailsFromBackupID retrieves the backup.Details within the specified backup. -func (w Wrapper) GetDetailsIDFromBackupID( - ctx context.Context, - backupID model.StableID, -) (string, *backup.Backup, error) { - b, err := w.GetBackup(ctx, backupID) - if err != nil { - return "", nil, err - } - - return b.DetailsID, b, nil -} diff --git a/src/pkg/store/backup_test.go b/src/pkg/store/backup_test.go index dc46219ba..722b3d603 100644 --- a/src/pkg/store/backup_test.go +++ b/src/pkg/store/backup_test.go @@ -147,41 +147,3 @@ func (suite *StoreBackupUnitSuite) TestDeleteBackup() { }) } } - -func (suite *StoreBackupUnitSuite) TestGetDetailsIDFromBackupID() { - ctx, flush := tester.NewContext() - defer flush() - - table := []struct { - name string - mock *storeMock.MockModelStore - expect assert.ErrorAssertionFunc - }{ - { - name: "gets details from backup id", - mock: storeMock.NewMock(&bu, nil), - expect: assert.NoError, - }, - { - name: "errors", - mock: storeMock.NewMock(&bu, assert.AnError), - expect: assert.Error, - }, - } - for _, test := range table { - suite.Run(test.name, func() { - t := suite.T() - store := &store.Wrapper{Storer: test.mock} - - dResult, bResult, err 
:= store.GetDetailsIDFromBackupID(ctx, model.StableID(uuid.NewString())) - test.expect(t, err, clues.ToCore(err)) - - if err != nil { - return - } - - assert.Equal(t, bu.DetailsID, dResult) - assert.Equal(t, bu.ID, bResult.ID) - }) - } -}