diff --git a/src/cli/backup/exchange.go b/src/cli/backup/exchange.go index b6bffef4c..558f800b6 100644 --- a/src/cli/backup/exchange.go +++ b/src/cli/backup/exchange.go @@ -314,8 +314,8 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error { bups, ferrs := r.Backups(ctx, bIDs) // TODO: print/log recoverable errors - if ferrs.Err() != nil { - return Only(ctx, errors.Wrap(ferrs.Err(), "Unable to retrieve backup results from storage")) + if ferrs.Failure() != nil { + return Only(ctx, errors.Wrap(ferrs.Failure(), "Unable to retrieve backup results from storage")) } backup.PrintAll(ctx, bups) @@ -492,7 +492,7 @@ func detailsExchangeCmd(cmd *cobra.Command, args []string) error { // runDetailsExchangeCmd actually performs the lookup in backup details. // the fault.Errors return is always non-nil. Callers should check if -// errs.Err() == nil. +// errs.Failure() == nil. func runDetailsExchangeCmd( ctx context.Context, r repository.BackupGetter, @@ -505,12 +505,12 @@ func runDetailsExchangeCmd( d, _, errs := r.BackupDetails(ctx, backupID) // TODO: log/track recoverable errors - if errs.Err() != nil { - if errors.Is(errs.Err(), data.ErrNotFound) { + if errs.Failure() != nil { + if errors.Is(errs.Failure(), data.ErrNotFound) { return nil, errors.Errorf("No backup exists with the id %s", backupID) } - return nil, errors.Wrap(errs.Err(), "Failed to get backup details in the repository") + return nil, errors.Wrap(errs.Failure(), "Failed to get backup details in the repository") } sel := utils.IncludeExchangeRestoreDataSelectors(opts) diff --git a/src/cli/backup/exchange_integration_test.go b/src/cli/backup/exchange_integration_test.go index c24a04173..58b0c70da 100644 --- a/src/cli/backup/exchange_integration_test.go +++ b/src/cli/backup/exchange_integration_test.go @@ -288,8 +288,8 @@ func (suite *PreparedBackupExchangeIntegrationSuite) SetupSuite() { require.NoError(t, err, "retrieving recent backup by ID") require.Equal(t, bIDs, string(b.ID), "repo backup matches results id") _, b, errs := suite.repo.BackupDetails(ctx, bIDs) - require.NoError(t, errs.Err(), "retrieving recent backup details by ID") - require.Empty(t, errs.Errs(), "retrieving recent backup details by ID") + require.NoError(t, errs.Failure(), "retrieving recent backup details by ID") + require.Empty(t, errs.Recovered(), "retrieving recent backup details by ID") require.Equal(t, bIDs, string(b.ID), "repo details matches results id") suite.backupOps[set] = string(b.ID) @@ -397,8 +397,8 @@ func (suite *PreparedBackupExchangeIntegrationSuite) TestExchangeDetailsCmd() { // fetch the details from the repo first deets, _, errs := suite.repo.BackupDetails(ctx, string(bID)) - require.NoError(t, errs.Err()) - require.Empty(t, errs.Errs()) + require.NoError(t, errs.Failure()) + require.Empty(t, errs.Recovered()) cmd := tester.StubRootCmd( "backup", "details", "exchange", diff --git a/src/cli/backup/onedrive.go b/src/cli/backup/onedrive.go index 64f328c7d..f3fabd63f 100644 --- a/src/cli/backup/onedrive.go +++ b/src/cli/backup/onedrive.go @@ -237,8 +237,8 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error { bups, ferrs := r.Backups(ctx, bIDs) // TODO: print/log recoverable errors - if ferrs.Err() != nil { - return Only(ctx, errors.Wrap(ferrs.Err(), "Unable to retrieve backup results from storage")) + if ferrs.Failure() != nil { + return Only(ctx, errors.Wrap(ferrs.Failure(), "Unable to retrieve backup results from storage")) } backup.PrintAll(ctx, bups) @@ -384,7 +384,7 @@ func detailsOneDriveCmd(cmd *cobra.Command, args []string) error { // runDetailsOneDriveCmd actually performs the lookup in backup details. // the fault.Errors return is always non-nil. Callers should check if -// errs.Err() == nil. +// errs.Failure() == nil. func runDetailsOneDriveCmd( ctx context.Context, r repository.BackupGetter, @@ -397,12 +397,12 @@ func runDetailsOneDriveCmd( d, _, errs := r.BackupDetails(ctx, backupID) // TODO: log/track recoverable errors - if errs.Err() != nil { - if errors.Is(errs.Err(), data.ErrNotFound) { + if errs.Failure() != nil { + if errors.Is(errs.Failure(), data.ErrNotFound) { return nil, errors.Errorf("no backup exists with the id %s", backupID) } - return nil, errors.Wrap(errs.Err(), "Failed to get backup details in the repository") + return nil, errors.Wrap(errs.Failure(), "Failed to get backup details in the repository") } sel := utils.IncludeOneDriveRestoreDataSelectors(opts) diff --git a/src/cli/backup/sharepoint.go b/src/cli/backup/sharepoint.go index 2b64b524e..00239d8b1 100644 --- a/src/cli/backup/sharepoint.go +++ b/src/cli/backup/sharepoint.go @@ -257,8 +257,8 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error { bups, ferrs := r.Backups(ctx, bIDs) // TODO: print/log recoverable errors - if ferrs.Err() != nil { - return Only(ctx, errors.Wrap(ferrs.Err(), "Unable to retrieve backup results from storage")) + if ferrs.Failure() != nil { + return Only(ctx, errors.Wrap(ferrs.Failure(), "Unable to retrieve backup results from storage")) } backup.PrintAll(ctx, bups) @@ -512,7 +512,7 @@ func detailsSharePointCmd(cmd *cobra.Command, args []string) error { // runDetailsSharePointCmd actually performs the lookup in backup details. // the fault.Errors return is always non-nil. Callers should check if -// errs.Err() == nil. +// errs.Failure() == nil. func runDetailsSharePointCmd( ctx context.Context, r repository.BackupGetter, @@ -525,12 +525,12 @@ func runDetailsSharePointCmd( d, _, errs := r.BackupDetails(ctx, backupID) // TODO: log/track recoverable errors - if errs.Err() != nil { - if errors.Is(errs.Err(), data.ErrNotFound) { + if errs.Failure() != nil { + if errors.Is(errs.Failure(), data.ErrNotFound) { return nil, errors.Errorf("no backup exists with the id %s", backupID) } - return nil, errors.Wrap(errs.Err(), "Failed to get backup details in the repository") + return nil, errors.Wrap(errs.Failure(), "Failed to get backup details in the repository") } sel := utils.IncludeSharePointRestoreDataSelectors(opts) diff --git a/src/cli/restore/exchange_integration_test.go b/src/cli/restore/exchange_integration_test.go index ce72f4ee4..4e63b4c79 100644 --- a/src/cli/restore/exchange_integration_test.go +++ b/src/cli/restore/exchange_integration_test.go @@ -110,8 +110,8 @@ func (suite *RestoreExchangeIntegrationSuite) SetupSuite() { require.NoError(t, err, "retrieving recent backup by ID") _, _, errs := suite.repo.BackupDetails(ctx, string(bop.Results.BackupID)) - require.NoError(t, errs.Err(), "retrieving recent backup details by ID") - require.Empty(t, errs.Errs(), "retrieving recent backup details by ID") + require.NoError(t, errs.Failure(), "retrieving recent backup details by ID") + require.Empty(t, errs.Recovered(), "retrieving recent backup details by ID") } } diff --git a/src/cli/utils/testdata/opts.go b/src/cli/utils/testdata/opts.go index bcb05ca4d..c280a04d7 100644 --- a/src/cli/utils/testdata/opts.go +++ b/src/cli/utils/testdata/opts.go @@ -501,7 +501,7 @@ func (MockBackupGetter) Backup( func (MockBackupGetter) Backups( context.Context, []model.StableID, -) ([]*backup.Backup, *fault.Errors) { +) ([]*backup.Backup, *fault.Bus) { return nil, fault.New(false).Fail(errors.New("unexpected call to mock")) } @@ -515,7 +515,7 @@ func (MockBackupGetter) BackupsByTag( func (bg *MockBackupGetter) BackupDetails( ctx context.Context, backupID string, -) (*details.Details, *backup.Backup, *fault.Errors) { +) (*details.Details, *backup.Backup, *fault.Bus) { if bg == nil { return testdata.GetDetailsSet(), nil, fault.New(true) } diff --git a/src/cmd/factory/impl/common.go b/src/cmd/factory/impl/common.go index ff7ed093e..a347d94db 100644 --- a/src/cmd/factory/impl/common.go +++ b/src/cmd/factory/impl/common.go @@ -53,7 +53,7 @@ func generateAndRestoreItems( howMany int, dbf dataBuilderFunc, opts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) (*details.Details, error) { items := make([]item, 0, howMany) diff --git a/src/cmd/factory/impl/exchange.go b/src/cmd/factory/impl/exchange.go index 7e7418982..67fa6f46a 100644 --- a/src/cmd/factory/impl/exchange.go +++ b/src/cmd/factory/impl/exchange.go @@ -79,7 +79,7 @@ func handleExchangeEmailFactory(cmd *cobra.Command, args []string) error { } log := logger.Ctx(ctx) - for _, e := range errs.Errs() { + for _, e := range errs.Recovered() { log.Errorw(e.Error(), clues.InErr(err).Slice()...) } @@ -126,7 +126,7 @@ func handleExchangeCalendarEventFactory(cmd *cobra.Command, args []string) error } log := logger.Ctx(ctx) - for _, e := range errs.Errs() { + for _, e := range errs.Recovered() { log.Errorw(e.Error(), clues.InErr(err).Slice()...) } @@ -178,7 +178,7 @@ func handleExchangeContactFactory(cmd *cobra.Command, args []string) error { } log := logger.Ctx(ctx) - for _, e := range errs.Errs() { + for _, e := range errs.Recovered() { log.Errorw(e.Error(), clues.InErr(err).Slice()...) } diff --git a/src/cmd/getM365/getItem.go b/src/cmd/getM365/getItem.go index 8507d65b2..8846adc76 100644 --- a/src/cmd/getM365/getItem.go +++ b/src/cmd/getM365/getItem.go @@ -93,7 +93,7 @@ func runDisplayM365JSON( ctx context.Context, creds account.M365Config, user, itemID string, - errs *fault.Errors, + errs *fault.Bus, ) error { var ( bs []byte @@ -143,7 +143,7 @@ type itemer interface { GetItem( ctx context.Context, user, itemID string, - errs *fault.Errors, + errs *fault.Bus, ) (serialization.Parsable, *details.ExchangeInfo, error) Serialize( ctx context.Context, @@ -156,7 +156,7 @@ func getItem( ctx context.Context, itm itemer, user, itemID string, - errs *fault.Errors, + errs *fault.Bus, ) ([]byte, error) { sp, _, err := itm.GetItem(ctx, user, itemID, errs) if err != nil { diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index f8d0d3036..5968bb23c 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -37,7 +37,7 @@ func (gc *GraphConnector) DataCollections( sels selectors.Selector, metadata []data.RestoreCollection, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String())) defer end() @@ -198,7 +198,7 @@ func (gc *GraphConnector) RestoreDataCollections( dest control.RestoreDestination, opts control.Options, dcs []data.RestoreCollection, - errs *fault.Errors, + errs *fault.Bus, ) (*details.Details, error) { ctx, end := D.Span(ctx, "connector:restore") defer end() diff --git a/src/internal/connector/discovery/api/users.go b/src/internal/connector/discovery/api/users.go index 05528eb2c..05c1f27f0 100644 --- a/src/internal/connector/discovery/api/users.go +++ b/src/internal/connector/discovery/api/users.go @@ -86,7 +86,7 @@ func userOptions(fs *string) *users.UsersRequestBuilderGetRequestConfiguration { } // GetAll retrieves all users. -func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userable, error) { +func (c Users) GetAll(ctx context.Context, errs *fault.Bus) ([]models.Userable, error) { service, err := c.service() if err != nil { return nil, err @@ -110,17 +110,17 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl var ( us = make([]models.Userable, 0) - et = errs.Tracker() + el = errs.Local() ) iterator := func(item any) bool { - if et.Err() != nil { + if el.Failure() != nil { return false } u, err := validateUser(item) if err != nil { - et.Add(clues.Wrap(err, "validating user").WithClues(ctx).With(graph.ErrData(err)...)) + el.AddRecoverable(clues.Wrap(err, "validating user").WithClues(ctx).With(graph.ErrData(err)...)) } else { us = append(us, u) } @@ -132,7 +132,7 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl return nil, clues.Wrap(err, "iterating all users").WithClues(ctx).With(graph.ErrData(err)...) } - return us, et.Err() + return us, el.Failure() } func (c Users) GetByID(ctx context.Context, userID string) (models.Userable, error) { diff --git a/src/internal/connector/discovery/discovery.go b/src/internal/connector/discovery/discovery.go index a8da177e1..ea71eb2bb 100644 --- a/src/internal/connector/discovery/discovery.go +++ b/src/internal/connector/discovery/discovery.go @@ -17,7 +17,7 @@ import ( // --------------------------------------------------------------------------- type getAller interface { - GetAll(context.Context, *fault.Errors) ([]models.Userable, error) + GetAll(context.Context, *fault.Bus) ([]models.Userable, error) } type getter interface { @@ -38,7 +38,7 @@ type getWithInfoer interface { // --------------------------------------------------------------------------- // Users fetches all users in the tenant. -func Users(ctx context.Context, ga getAller, errs *fault.Errors) ([]models.Userable, error) { +func Users(ctx context.Context, ga getAller, errs *fault.Bus) ([]models.Userable, error) { return ga.GetAll(ctx, errs) } diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index 0051afe4b..9658db809 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -72,7 +72,7 @@ func (c Contacts) DeleteContainer( func (c Contacts) GetItem( ctx context.Context, user, itemID string, - _ *fault.Errors, // no attachments to iterate over, so this goes unused + _ *fault.Bus, // no attachments to iterate over, so this goes unused ) (serialization.Parsable, *details.ExchangeInfo, error) { cont, err := c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil) if err != nil { @@ -109,7 +109,7 @@ func (c Contacts) EnumerateContainers( ctx context.Context, userID, baseDirID string, fn func(graph.CacheFolder) error, - errs *fault.Errors, + errs *fault.Bus, ) error { service, err := c.service() if err != nil { @@ -126,14 +126,14 @@ func (c Contacts) EnumerateContainers( With("options_fields", fields) } - et := errs.Tracker() + el := errs.Local() builder := service.Client(). UsersById(userID). ContactFoldersById(baseDirID). ChildFolders() for { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -143,12 +143,12 @@ func (c Contacts) EnumerateContainers( } for _, fold := range resp.GetValue() { - if et.Err() != nil { - break + if el.Failure() != nil { + return el.Failure() } if err := checkIDAndName(fold); err != nil { - et.Add(clues.Stack(err). + el.AddRecoverable(clues.Stack(err). WithClues(ctx). With(graph.ErrData(err)...). Label(fault.LabelForceNoBackupCreation)) @@ -163,7 +163,7 @@ func (c Contacts) EnumerateContainers( temp := graph.NewCacheFolder(fold, nil, nil) if err := fn(temp); err != nil { - et.Add(clues.Stack(err). + el.AddRecoverable(clues.Stack(err). WithClues(fctx). With(graph.ErrData(err)...). Label(fault.LabelForceNoBackupCreation)) @@ -180,7 +180,7 @@ func (c Contacts) EnumerateContainers( builder = users.NewItemContactFoldersItemChildFoldersRequestBuilder(link, service.Adapter()) } - return et.Err() + return el.Failure() } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index d3be3af46..9d373d548 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -96,7 +96,7 @@ func (c Events) GetContainerByID( func (c Events) GetItem( ctx context.Context, user, itemID string, - errs *fault.Errors, + errs *fault.Bus, ) (serialization.Parsable, *details.ExchangeInfo, error) { var ( err error @@ -141,7 +141,7 @@ func (c Events) EnumerateContainers( ctx context.Context, userID, baseDirID string, fn func(graph.CacheFolder) error, - errs *fault.Errors, + errs *fault.Bus, ) error { service, err := c.service() if err != nil { @@ -153,11 +153,11 @@ func (c Events) EnumerateContainers( return clues.Wrap(err, "setting calendar options").WithClues(ctx).With(graph.ErrData(err)...) } - et := errs.Tracker() + el := errs.Local() builder := service.Client().UsersById(userID).Calendars() for { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -167,13 +167,13 @@ func (c Events) EnumerateContainers( } for _, cal := range resp.GetValue() { - if et.Err() != nil { + if el.Failure() != nil { break } cd := CalendarDisplayable{Calendarable: cal} if err := checkIDAndName(cd); err != nil { - et.Add(clues.Stack(err). + el.AddRecoverable(clues.Stack(err). WithClues(ctx). With(graph.ErrData(err)...). Label(fault.LabelForceNoBackupCreation)) @@ -191,7 +191,7 @@ func (c Events) EnumerateContainers( path.Builder{}.Append(ptr.Val(cd.GetId())), // storage path path.Builder{}.Append(ptr.Val(cd.GetDisplayName()))) // display location if err := fn(temp); err != nil { - et.Add(clues.Stack(err). + el.AddRecoverable(clues.Stack(err). WithClues(fctx). With(graph.ErrData(err)...). Label(fault.LabelForceNoBackupCreation)) @@ -208,7 +208,7 @@ func (c Events) EnumerateContainers( builder = users.NewItemCalendarsRequestBuilder(link, service.Adapter()) } - return et.Err() + return el.Failure() } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index a29aa11fb..baf1d5834 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -124,7 +124,7 @@ func (c Mail) GetContainerByID( func (c Mail) GetItem( ctx context.Context, user, itemID string, - errs *fault.Errors, + errs *fault.Bus, ) (serialization.Parsable, *details.ExchangeInfo, error) { mail, err := c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil) if err != nil { @@ -164,21 +164,21 @@ func (c Mail) EnumerateContainers( ctx context.Context, userID, baseDirID string, fn func(graph.CacheFolder) error, - errs *fault.Errors, + errs *fault.Bus, ) error { service, err := c.service() if err != nil { return clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...) } - et := errs.Tracker() + el := errs.Local() builder := service.Client(). UsersById(userID). MailFolders(). Delta() for { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -188,7 +188,7 @@ func (c Mail) EnumerateContainers( } for _, v := range resp.GetValue() { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -199,7 +199,7 @@ func (c Mail) EnumerateContainers( temp := graph.NewCacheFolder(v, nil, nil) if err := fn(temp); err != nil { - et.Add(clues.Stack(err). + el.AddRecoverable(clues.Stack(err). WithClues(fctx). With(graph.ErrData(err)...). Label(fault.LabelForceNoBackupCreation)) @@ -216,7 +216,7 @@ func (c Mail) EnumerateContainers( builder = users.NewItemMailFoldersDeltaRequestBuilder(link, service.Adapter()) } - return et.Err() + return el.Failure() } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/contact_folder_cache.go b/src/internal/connector/exchange/contact_folder_cache.go index 6b6d01513..624503563 100644 --- a/src/internal/connector/exchange/contact_folder_cache.go +++ b/src/internal/connector/exchange/contact_folder_cache.go @@ -47,7 +47,7 @@ func (cfc *contactFolderCache) populateContactRoot( // as of (Oct-07-2022) func (cfc *contactFolderCache) Populate( ctx context.Context, - errs *fault.Errors, + errs *fault.Bus, baseID string, baseContainerPather ...string, ) error { diff --git a/src/internal/connector/exchange/container_resolver.go b/src/internal/connector/exchange/container_resolver.go index 706e4784b..93c0a156d 100644 --- a/src/internal/connector/exchange/container_resolver.go +++ b/src/internal/connector/exchange/container_resolver.go @@ -28,7 +28,7 @@ type containersEnumerator interface { ctx context.Context, userID, baseDirID string, fn func(graph.CacheFolder) error, - errs *fault.Errors, + errs *fault.Bus, ) error } diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index 1d62d4f56..bb84116af 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -64,7 +64,7 @@ type DeltaPath struct { func parseMetadataCollections( ctx context.Context, colls []data.RestoreCollection, - errs *fault.Errors, + errs *fault.Bus, ) (CatDeltaPaths, error) { // cdp stores metadata cdp := CatDeltaPaths{ @@ -168,7 +168,7 @@ func DataCollections( acct account.M365Config, su support.StatusUpdater, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { eb, err := selector.ToExchangeBackup() if err != nil { @@ -178,7 +178,7 @@ func DataCollections( var ( user = selector.DiscreteOwner collections = []data.BackupCollection{} - et = errs.Tracker() + el = errs.Local() ) cdps, err := parseMetadataCollections(ctx, metadata, errs) @@ -187,7 +187,7 @@ func DataCollections( } for _, scope := range eb.Scopes() { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -201,14 +201,14 @@ func DataCollections( su, errs) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } collections = append(collections, dcs...) } - return collections, nil, et.Err() + return collections, nil, el.Failure() } func getterByType(ac api.Client, category path.CategoryType) (addedAndRemovedItemIDsGetter, error) { @@ -235,7 +235,7 @@ func createCollections( dps DeltaPaths, ctrlOpts control.Options, su support.StatusUpdater, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, error) { var ( allCollections = make([]data.BackupCollection, 0) diff --git a/src/internal/connector/exchange/event_calendar_cache.go b/src/internal/connector/exchange/event_calendar_cache.go index b7dd1ded4..236406755 100644 --- a/src/internal/connector/exchange/event_calendar_cache.go +++ b/src/internal/connector/exchange/event_calendar_cache.go @@ -62,7 +62,7 @@ func (ecc *eventCalendarCache) populateEventRoot(ctx context.Context) error { // @param baseID: ignored. Present to conform to interface func (ecc *eventCalendarCache) Populate( ctx context.Context, - errs *fault.Errors, + errs *fault.Bus, baseID string, baseContainerPath ...string, ) error { diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index a1082f7c1..8d2381633 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -45,7 +45,7 @@ type itemer interface { GetItem( ctx context.Context, user, itemID string, - errs *fault.Errors, + errs *fault.Bus, ) (serialization.Parsable, *details.ExchangeInfo, error) Serialize( ctx context.Context, @@ -127,7 +127,7 @@ func NewCollection( // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel -func (col *Collection) Items(ctx context.Context, errs *fault.Errors) <-chan data.Stream { +func (col *Collection) Items(ctx context.Context, errs *fault.Bus) <-chan data.Stream { go col.streamItems(ctx, errs) return col.data } @@ -163,7 +163,7 @@ func (col Collection) DoNotMergeItems() bool { // streamItems is a utility function that uses col.collectionType to be able to serialize // all the M365IDs defined in the added field. data channel is closed by this function -func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) { +func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { var ( success int64 totalBytes int64 @@ -177,7 +177,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) { ) defer func() { - col.finishPopulation(ctx, int(success), totalBytes, errs.Err()) + col.finishPopulation(ctx, int(success), totalBytes, errs.Failure()) }() if len(col.added)+len(col.removed) > 0 { @@ -226,7 +226,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) { // add any new items for id := range col.added { - if errs.Err() != nil { + if errs.Failure() != nil { break } @@ -253,7 +253,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) { atomic.AddInt64(&success, 1) log.With("err", err).Infow("item not found", clues.InErr(err).Slice()...) } else { - errs.Add(clues.Wrap(err, "fetching item").Label(fault.LabelForceNoBackupCreation)) + errs.AddRecoverable(clues.Wrap(err, "fetching item").Label(fault.LabelForceNoBackupCreation)) } return @@ -261,7 +261,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) { data, err := col.items.Serialize(ctx, item, user, id) if err != nil { - errs.Add(clues.Wrap(err, "serializing item").Label(fault.LabelForceNoBackupCreation)) + errs.AddRecoverable(clues.Wrap(err, "serializing item").Label(fault.LabelForceNoBackupCreation)) return } @@ -291,7 +291,7 @@ func getItemWithRetries( ctx context.Context, userID, itemID string, items itemer, - errs *fault.Errors, + errs *fault.Bus, ) (serialization.Parsable, *details.ExchangeInfo, error) { item, info, err := items.GetItem(ctx, userID, itemID, errs) if err != nil { diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index b556f7a06..4c75e773d 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -30,7 +30,7 @@ type mockItemer struct { func (mi *mockItemer) GetItem( context.Context, string, string, - *fault.Errors, + *fault.Bus, ) (serialization.Parsable, *details.ExchangeInfo, error) { mi.getCount++ return nil, nil, mi.getErr diff --git a/src/internal/connector/exchange/mail_folder_cache.go b/src/internal/connector/exchange/mail_folder_cache.go index 90a070919..91ca208ce 100644 --- a/src/internal/connector/exchange/mail_folder_cache.go +++ b/src/internal/connector/exchange/mail_folder_cache.go @@ -72,7 +72,7 @@ func (mc *mailFolderCache) populateMailRoot(ctx context.Context) error { // for the base container in the cache. func (mc *mailFolderCache) Populate( ctx context.Context, - errs *fault.Errors, + errs *fault.Bus, baseID string, baseContainerPath ...string, ) error { diff --git a/src/internal/connector/exchange/service_functions.go b/src/internal/connector/exchange/service_functions.go index 875ed7e66..07467a59e 100644 --- a/src/internal/connector/exchange/service_functions.go +++ b/src/internal/connector/exchange/service_functions.go @@ -36,7 +36,7 @@ func createService(credentials account.M365Config) (*graph.Service, error) { func PopulateExchangeContainerResolver( ctx context.Context, qp graph.QueryParams, - errs *fault.Errors, + errs *fault.Bus, ) (graph.ContainerResolver, error) { var ( res graph.ContainerResolver diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index bc0398fbc..106c70c8d 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -39,7 +39,7 @@ func filterContainersAndFillCollections( scope selectors.ExchangeScope, dps DeltaPaths, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) error { var ( // folder ID -> delta url or folder path lookups @@ -68,11 +68,11 @@ func filterContainersAndFillCollections( return err } - et := errs.Tracker() + el := errs.Local() for _, c := range resolver.Items() { - if et.Err() != nil { - return et.Err() + if el.Failure() != nil { + return el.Failure() } cID := *c.GetId() @@ -102,7 +102,7 @@ func filterContainersAndFillCollections( added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta) if err != nil { if !graph.IsErrDeletedInFlight(err) { - et.Add(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) continue } @@ -157,12 +157,12 @@ func filterContainersAndFillCollections( // in the `previousPath` set, but does not exist in the current container // resolver (which contains all the resource owners' current containers). for id, p := range tombstones { - if et.Err() != nil { - return et.Err() + if el.Failure() != nil { + return el.Failure() } if collections[id] != nil { - et.Add(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ctx)) continue } @@ -218,7 +218,7 @@ func filterContainersAndFillCollections( collections["metadata"] = col - return et.Err() + return el.Failure() } // produces a set of id:path pairs from the deltapaths map. diff --git a/src/internal/connector/exchange/service_iterators_test.go b/src/internal/connector/exchange/service_iterators_test.go index 80311fbbc..b9755432d 100644 --- a/src/internal/connector/exchange/service_iterators_test.go +++ b/src/internal/connector/exchange/service_iterators_test.go @@ -91,8 +91,8 @@ func (m mockResolver) DestinationNameToID(dest string) string { return m.added[d func (m mockResolver) IDToPath(context.Context, string, bool) (*path.Builder, *path.Builder, error) { return nil, nil, nil } -func (m mockResolver) PathInCache(string) (string, bool) { return "", false } -func (m mockResolver) Populate(context.Context, *fault.Errors, string, ...string) error { return nil } +func (m mockResolver) PathInCache(string) (string, bool) { return "", false } +func (m mockResolver) Populate(context.Context, *fault.Bus, string, ...string) error { return nil } // --------------------------------------------------------------------------- // tests diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index dff05be54..fb945ec54 100644 --- a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -37,7 +37,7 @@ func RestoreExchangeObject( policy control.CollisionPolicy, service graph.Servicer, destination, user string, - errs *fault.Errors, + errs *fault.Bus, ) (*details.ExchangeInfo, error) { if policy != control.Copy { return nil, clues.Wrap(clues.New(policy.String()), "policy not supported for Exchange restore").WithClues(ctx) @@ -102,7 +102,7 @@ func RestoreExchangeEvent( service graph.Servicer, cp control.CollisionPolicy, destination, user string, - errs *fault.Errors, + errs *fault.Bus, ) (*details.ExchangeInfo, error) { event, err := support.CreateEventFromBytes(bits) if err != nil { @@ -112,7 +112,7 @@ func RestoreExchangeEvent( ctx = clues.Add(ctx, "item_id", ptr.Val(event.GetId())) var ( - et = errs.Tracker() + el = errs.Local() transformedEvent = support.ToEventSimplified(event) attached []models.Attachmentable ) @@ -140,19 +140,19 @@ func RestoreExchangeEvent( } for _, attach := range attached { - if et.Err() != nil { + if el.Failure() != nil { break } if err := uploadAttachment(ctx, uploader, attach); err != nil { - et.Add(err) + el.AddRecoverable(err) } } info := api.EventInfo(event) info.Size = int64(len(bits)) - return info, et.Err() + return info, el.Failure() } // RestoreMailMessage utility function to place an exchange.Mail @@ -167,7 +167,7 @@ func RestoreMailMessage( service graph.Servicer, cp control.CollisionPolicy, destination, user string, - errs *fault.Errors, + errs *fault.Bus, ) (*details.ExchangeInfo, error) { // Creates messageable object from original bytes originalMessage, err := support.CreateMessageFromBytes(bits) @@ -240,7 +240,7 @@ func SendMailToBackStore( service graph.Servicer, user, destination string, message models.Messageable, - errs *fault.Errors, + errs *fault.Bus, ) error { attached := message.GetAttachments() @@ -257,7 +257,7 @@ func SendMailToBackStore( } var ( - et = errs.Tracker() + el = errs.Local() id = ptr.Val(response.GetId()) uploader = &mailAttachmentUploader{ userID: user, @@ -268,7 +268,7 @@ func SendMailToBackStore( ) for _, attachment := range attached { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -283,13 +283,13 @@ func SendMailToBackStore( continue } - et.Add(errors.Wrap(err, "uploading mail attachment")) + el.AddRecoverable(errors.Wrap(err, "uploading mail attachment")) break } } - return et.Err() + return el.Failure() } // RestoreExchangeDataCollections restores M365 objects in data.RestoreCollection to MSFT @@ -302,7 +302,7 @@ func RestoreExchangeDataCollections( dest control.RestoreDestination, dcs []data.RestoreCollection, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) (*support.ConnectorOperationStatus, error) { var ( directoryCaches = make(map[string]map[path.CategoryType]graph.ContainerResolver) @@ -310,7 +310,7 @@ func RestoreExchangeDataCollections( userID string // TODO policy to be updated from external source after completion of refactoring policy = control.Copy - et = errs.Tracker() + el = errs.Local() ) if len(dcs) > 0 { @@ -319,7 +319,7 @@ func RestoreExchangeDataCollections( } for _, dc := range dcs { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -337,7 +337,7 @@ func RestoreExchangeDataCollections( userCaches, errs) if err != nil { - et.Add(clues.Wrap(err, "creating destination").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "creating destination").WithClues(ctx)) continue } @@ -355,10 +355,10 @@ func RestoreExchangeDataCollections( support.Restore, len(dcs), metrics, - et.Err(), + el.Failure(), dest.ContainerName) - return status, et.Err() + return status, el.Failure() } // restoreCollection handles restoration of an individual collection. @@ -369,7 +369,7 @@ func restoreCollection( folderID string, policy control.CollisionPolicy, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) (support.CollectionMetrics, bool) { ctx, end := D.Span(ctx, "gc:exchange:restoreCollection", D.Label("path", dc.FullPath())) defer end() @@ -400,11 +400,11 @@ func restoreCollection( for { select { case <-ctx.Done(): - errs.Add(clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx)) + errs.AddRecoverable(clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx)) return metrics, true case itemData, ok := <-items: - if !ok || errs.Err() != nil { + if !ok || errs.Failure() != nil { return metrics, false } @@ -416,7 +416,7 @@ func restoreCollection( _, err := buf.ReadFrom(itemData.ToReader()) if err != nil { - errs.Add(clues.Wrap(err, "reading item bytes").WithClues(ictx)) + errs.AddRecoverable(clues.Wrap(err, "reading item bytes").WithClues(ictx)) continue } @@ -432,7 +432,7 @@ func restoreCollection( user, errs) if err != nil { - errs.Add(err) + errs.AddRecoverable(err) continue } @@ -441,7 +441,7 @@ func restoreCollection( itemPath, err := dc.FullPath().Append(itemData.UUID(), true) if err != nil { - errs.Add(clues.Wrap(err, "building full path with item").WithClues(ctx)) + errs.AddRecoverable(clues.Wrap(err, "building full path with item").WithClues(ctx)) continue } @@ -476,7 +476,7 @@ func CreateContainerDestination( directory path.Path, destination string, caches map[path.CategoryType]graph.ContainerResolver, - errs *fault.Errors, + errs *fault.Bus, ) (string, error) { var ( newCache = false @@ -589,7 +589,7 @@ func establishMailRestoreLocation( mfc graph.ContainerResolver, user string, isNewCache bool, - errs *fault.Errors, + errs *fault.Bus, ) (string, error) { // Process starts with the root folder in order to recreate // the top-level folder with the same tactic @@ -648,7 +648,7 @@ func establishContactsRestoreLocation( cfc graph.ContainerResolver, user string, isNewCache bool, - errs *fault.Errors, + errs *fault.Bus, ) (string, error) { cached, ok := cfc.PathInCache(folders[0]) if ok { @@ -684,7 +684,7 @@ func establishEventsRestoreLocation( ecc graph.ContainerResolver, // eventCalendarCache user string, isNewCache bool, - errs *fault.Errors, + errs *fault.Bus, ) (string, error) { // Need to prefix with the "Other Calendars" folder so lookup happens properly. cached, ok := ecc.PathInCache(folders[0]) diff --git a/src/internal/connector/graph/cache_container.go b/src/internal/connector/graph/cache_container.go index 89b40c25a..224b3d027 100644 --- a/src/internal/connector/graph/cache_container.go +++ b/src/internal/connector/graph/cache_container.go @@ -65,7 +65,7 @@ type ContainerResolver interface { // @param ctx is necessary param for Graph API tracing // @param baseFolderID represents the M365ID base that the resolver will // conclude its search. Default input is "". - Populate(ctx context.Context, errs *fault.Errors, baseFolderID string, baseContainerPather ...string) error + Populate(ctx context.Context, errs *fault.Bus, baseFolderID string, baseContainerPather ...string) error // PathInCache performs a look up of a path reprensentation // and returns the m365ID of directory iff the pathString diff --git a/src/internal/connector/graph/metadata_collection.go b/src/internal/connector/graph/metadata_collection.go index cbb72250a..478e61653 100644 --- a/src/internal/connector/graph/metadata_collection.go +++ b/src/internal/connector/graph/metadata_collection.go @@ -134,7 +134,7 @@ func (md MetadataCollection) DoNotMergeItems() bool { func (md MetadataCollection) Items( ctx context.Context, - _ *fault.Errors, // not used, just here for interface compliance + _ *fault.Bus, // not used, just here for interface compliance ) <-chan data.Stream { res := make(chan data.Stream) diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index da6beea23..b52813568 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -68,7 +68,7 @@ func NewGraphConnector( itemClient *http.Client, acct account.Account, r resource, - errs *fault.Errors, + errs *fault.Bus, ) (*GraphConnector, error) { m365, err := acct.M365Config() if err != nil { @@ -129,7 +129,7 @@ func (gc *GraphConnector) createService() (*graph.Service, error) { // setTenantUsers queries the M365 to identify the users in the // workspace. The users field is updated during this method // iff the returned error is nil -func (gc *GraphConnector) setTenantUsers(ctx context.Context, errs *fault.Errors) error { +func (gc *GraphConnector) setTenantUsers(ctx context.Context, errs *fault.Bus) error { ctx, end := D.Span(ctx, "gc:setTenantUsers") defer end() @@ -150,7 +150,7 @@ func (gc *GraphConnector) setTenantUsers(ctx context.Context, errs *fault.Errors // setTenantSites queries the M365 to identify the sites in the // workspace. The sites field is updated during this method // iff the returned error is nil. -func (gc *GraphConnector) setTenantSites(ctx context.Context, errs *fault.Errors) error { +func (gc *GraphConnector) setTenantSites(ctx context.Context, errs *fault.Bus) error { gc.Sites = map[string]string{} ctx, end := D.Span(ctx, "gc:setTenantSites") @@ -222,7 +222,7 @@ func (gc *GraphConnector) GetSiteIDs() []string { func (gc *GraphConnector) UnionSiteIDsAndWebURLs( ctx context.Context, ids, urls []string, - errs *fault.Errors, + errs *fault.Bus, ) ([]string, error) { if len(gc.Sites) == 0 { if err := gc.setTenantSites(ctx, errs); err != nil { @@ -308,7 +308,7 @@ func getResources( query func(context.Context, graph.Servicer) (serialization.Parsable, error), parser func(parseNode serialization.ParseNode) (serialization.Parsable, error), identify func(any) (string, string, error), - errs *fault.Errors, + errs *fault.Bus, ) (map[string]string, error) { resources := map[string]string{} @@ -324,17 +324,17 @@ func getResources( return nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...) } - et := errs.Tracker() + el := errs.Local() callbackFunc := func(item any) bool { - if et.Err() != nil { + if el.Failure() != nil { return false } k, v, err := identify(item) if err != nil { if !errors.Is(err, errKnownSkippableCase) { - et.Add(clues.Stack(err). + el.AddRecoverable(clues.Stack(err). WithClues(ctx). With("query_url", gs.Adapter().GetBaseUrl())) } @@ -351,5 +351,5 @@ func getResources( return nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...) } - return resources, et.Err() + return resources, el.Failure() } diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 4718c2a29..b8c86ebc8 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -111,7 +111,7 @@ func (medc MockExchangeDataCollection) DoNotMergeItems() bool { return med // channel is closed when there are no more items available. func (medc *MockExchangeDataCollection) Items( ctx context.Context, - _ *fault.Errors, // unused + _ *fault.Bus, // unused ) <-chan data.Stream { res := make(chan data.Stream) diff --git a/src/internal/connector/mockconnector/mock_data_list.go b/src/internal/connector/mockconnector/mock_data_list.go index 8fe9d06ab..6eaa5deab 100644 --- a/src/internal/connector/mockconnector/mock_data_list.go +++ b/src/internal/connector/mockconnector/mock_data_list.go @@ -48,7 +48,7 @@ func (mlc *MockListCollection) PreviousPath() path.Path { func (mlc *MockListCollection) Items( ctx context.Context, - _ *fault.Errors, // unused + _ *fault.Bus, // unused ) <-chan data.Stream { res := make(chan data.Stream) diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 34c0152fa..4caf0b8bc 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -165,7 +165,7 @@ func (oc *Collection) IsEmpty() bool { // Items() returns the channel containing M365 Exchange objects func (oc *Collection) Items( ctx context.Context, - errs *fault.Errors, // TODO: currently unused while onedrive isn't up to date with clues/fault + errs *fault.Bus, // TODO: currently unused while onedrive isn't up to date with clues/fault ) <-chan data.Stream { go oc.populateItems(ctx, errs) return oc.data @@ -241,7 +241,7 @@ func (od *Item) ModTime() time.Time { // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item -func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) { +func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { var ( byteCount int64 itemsRead int64 @@ -249,7 +249,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) { itemsFound int64 dirsFound int64 wg sync.WaitGroup - et = errs.Tracker() + el = errs.Local() ) // Retrieve the OneDrive folder path to set later in @@ -272,7 +272,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) { defer close(semaphoreCh) for _, item := range oc.driveItems { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -323,7 +323,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) { oc.ctrl.ToggleFeatures.EnablePermissionsBackup) if err != nil { - et.Add(clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Wrap(err, "getting item metadata").Label(fault.LabelForceNoBackupCreation)) return } } @@ -370,7 +370,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) { // check for errors following retries if err != nil { - et.Add(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) return nil, err } @@ -443,7 +443,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Errors) { wg.Wait() - oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, et.Err()) + oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, el.Failure()) } func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64, err error) { diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index 9f0a6c6e4..2899e83fc 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -119,7 +119,7 @@ func NewCollections( func deserializeMetadata( ctx context.Context, cols []data.RestoreCollection, - errs *fault.Errors, + errs *fault.Bus, ) (map[string]string, map[string]map[string]string, error) { logger.Ctx(ctx).Infow( "deserialzing previous backup metadata", @@ -128,11 +128,11 @@ func deserializeMetadata( var ( prevDeltas = map[string]string{} prevFolders = map[string]map[string]string{} - et = errs.Tracker() + el = errs.Local() ) for _, col := range cols { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -186,7 +186,7 @@ func deserializeMetadata( err = clues.Stack(err).WithClues(ictx) - et.Add(err) + el.AddRecoverable(err) logger.Ctx(ictx). With("err", err). Errorw("deserializing base backup metadata", clues.InErr(err).Slice()...) @@ -216,7 +216,7 @@ func deserializeMetadata( } } - return prevDeltas, prevFolders, et.Err() + return prevDeltas, prevFolders, el.Failure() } var errExistingMapping = clues.New("mapping already exists for same drive ID") @@ -257,7 +257,7 @@ func deserializeMap[T any](reader io.ReadCloser, alreadyFound map[string]T) erro func (c *Collections) Get( ctx context.Context, prevMetadata []data.RestoreCollection, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { prevDeltas, oldPathsByDriveID, err := deserializeMetadata(ctx, prevMetadata, errs) if err != nil { @@ -599,12 +599,12 @@ func (c *Collections) UpdateCollections( excluded map[string]struct{}, itemCollection map[string]string, invalidPrevDelta bool, - errs *fault.Errors, + errs *fault.Bus, ) error { - et := errs.Tracker() + el := errs.Local() for _, item := range items { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -632,7 +632,9 @@ func (c *Collections) UpdateCollections( collectionPath, err := c.getCollectionPath(driveID, item) if err != nil { - return clues.Stack(err).WithClues(ictx) + el.AddRecoverable(clues.Stack(err). + WithClues(ictx). + Label(fault.LabelForceNoBackupCreation)) } // Skip items that don't match the folder selectors we were given. @@ -650,7 +652,7 @@ func (c *Collections) UpdateCollections( if ok { prevPath, err = path.FromDataLayerPath(prevPathStr, false) if err != nil { - et.Add(clues.Wrap(err, "invalid previous path"). + el.AddRecoverable(clues.Wrap(err, "invalid previous path"). WithClues(ictx). With("path_string", prevPathStr)) } @@ -754,7 +756,7 @@ func (c *Collections) UpdateCollections( } } - return et.Err() + return el.Failure() } func shouldSkipDrive(ctx context.Context, drivePath path.Path, m folderMatcher, driveName string) bool { diff --git a/src/internal/connector/onedrive/collections_test.go b/src/internal/connector/onedrive/collections_test.go index 165394508..954bffc11 100644 --- a/src/internal/connector/onedrive/collections_test.go +++ b/src/internal/connector/onedrive/collections_test.go @@ -2046,7 +2046,7 @@ func (suite *OneDriveCollectionsSuite) TestCollectItems() { excluded map[string]struct{}, itemCollection map[string]string, doNotMergeItems bool, - errs *fault.Errors, + errs *fault.Bus, ) error { return nil } diff --git a/src/internal/connector/onedrive/data_collections.go b/src/internal/connector/onedrive/data_collections.go index aaac83f3c..31ed31f8d 100644 --- a/src/internal/connector/onedrive/data_collections.go +++ b/src/internal/connector/onedrive/data_collections.go @@ -38,7 +38,7 @@ func DataCollections( service graph.Servicer, su support.StatusUpdater, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { odb, err := selector.ToOneDriveBackup() if err != nil { @@ -46,7 +46,7 @@ func DataCollections( } var ( - et = errs.Tracker() + el = errs.Local() user = selector.DiscreteOwner collections = []data.BackupCollection{} allExcludes = map[string]struct{}{} @@ -54,7 +54,7 @@ func DataCollections( // for each scope that includes oneDrive items, get all for _, scope := range odb.Scopes() { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -72,7 +72,7 @@ func DataCollections( odcs, excludes, err := nc.Get(ctx, metadata, errs) if err != nil { - et.Add(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) } collections = append(collections, odcs...) @@ -80,5 +80,5 @@ func DataCollections( maps.Copy(allExcludes, excludes) } - return collections, allExcludes, et.Err() + return collections, allExcludes, el.Failure() } diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go index 0ef7ed965..bcc78b124 100644 --- a/src/internal/connector/onedrive/drive.go +++ b/src/internal/connector/onedrive/drive.go @@ -150,7 +150,7 @@ type itemCollector func( excluded map[string]struct{}, fileCollectionMap map[string]string, validPrevDelta bool, - errs *fault.Errors, + errs *fault.Bus, ) error type itemPager interface { @@ -196,7 +196,7 @@ func collectItems( collector itemCollector, oldPaths map[string]string, prevDelta string, - errs *fault.Errors, + errs *fault.Bus, ) (DeltaUpdate, map[string]string, map[string]struct{}, error) { var ( newDeltaURL = "" @@ -360,7 +360,7 @@ func GetAllFolders( gs graph.Servicer, pager drivePager, prefix string, - errs *fault.Errors, + errs *fault.Bus, ) ([]*Displayable, error) { drives, err := drives(ctx, pager, true) if err != nil { @@ -369,11 +369,11 @@ func GetAllFolders( var ( folders = map[string]*Displayable{} - et = errs.Tracker() + el = errs.Local() ) for _, d := range drives { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -392,7 +392,7 @@ func GetAllFolders( excluded map[string]struct{}, itemCollection map[string]string, doNotMergeItems bool, - errs *fault.Errors, + errs *fault.Bus, ) error { for _, item := range items { // Skip the root item. @@ -425,7 +425,7 @@ func GetAllFolders( _, _, _, err = collectItems(ictx, defaultItemPager(gs, id, ""), id, name, collector, map[string]string{}, "", errs) if err != nil { - et.Add(clues.Wrap(err, "enumerating items in drive")) + el.AddRecoverable(clues.Wrap(err, "enumerating items in drive")) } } @@ -435,7 +435,7 @@ func GetAllFolders( res = append(res, f) } - return res, et.Err() + return res, el.Failure() } func DeleteItem( diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index f386aa118..fca6a5614 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -75,7 +75,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { excluded map[string]struct{}, itemCollection map[string]string, doNotMergeItems bool, - errs *fault.Errors, + errs *fault.Bus, ) error { for _, item := range items { if item.GetFile() != nil { diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go index 759edb53b..28631a362 100644 --- a/src/internal/connector/onedrive/restore.go +++ b/src/internal/connector/onedrive/restore.go @@ -39,7 +39,7 @@ func RestoreCollections( opts control.Options, dcs []data.RestoreCollection, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) (*support.ConnectorOperationStatus, error) { var ( restoreMetrics support.CollectionMetrics @@ -63,13 +63,13 @@ func RestoreCollections( }) var ( - et = errs.Tracker() + el = errs.Local() parentPermissions = map[string][]UserPermission{} ) // Iterate through the data collections and restore the contents of each for _, dc := range dcs { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -95,7 +95,7 @@ func RestoreCollections( opts.RestorePermissions, errs) if err != nil { - et.Add(err) + el.AddRecoverable(err) } for k, v := range folderPerms { @@ -114,10 +114,10 @@ func RestoreCollections( support.Restore, len(dcs), restoreMetrics, - et.Err(), + el.Failure(), dest.ContainerName) - return status, et.Err() + return status, el.Failure() } // RestoreCollection handles restoration of an individual collection. @@ -135,7 +135,7 @@ func RestoreCollection( deets *details.Builder, permissionIDMappings map[string]string, restorePerms bool, - errs *fault.Errors, + errs *fault.Bus, ) (support.CollectionMetrics, map[string][]UserPermission, map[string]string, error) { var ( metrics = support.CollectionMetrics{} @@ -195,12 +195,12 @@ func RestoreCollection( } var ( - et = errs.Tracker() + el = errs.Local() items = dc.Items(ctx, errs) ) for { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -215,7 +215,7 @@ func RestoreCollection( itemPath, err := dc.FullPath().Append(itemData.UUID(), true) if err != nil { - et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "appending item to full path").WithClues(ctx)) continue } @@ -262,7 +262,7 @@ func RestoreCollection( } if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -292,7 +292,7 @@ func RestoreCollection( meta, err := getMetadata(metaReader) if err != nil { - et.Add(clues.Wrap(err, "getting directory metadata").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "getting directory metadata").WithClues(ctx)) continue } @@ -315,7 +315,7 @@ func RestoreCollection( copyBuffer, source) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -331,7 +331,7 @@ func RestoreCollection( } } - return metrics, folderPerms, permissionIDMappings, et.Err() + return metrics, folderPerms, permissionIDMappings, el.Failure() } type fileFetcher interface { diff --git a/src/internal/connector/sharepoint/api/pages.go b/src/internal/connector/sharepoint/api/pages.go index 136c7c750..168202533 100644 --- a/src/internal/connector/sharepoint/api/pages.go +++ b/src/internal/connector/sharepoint/api/pages.go @@ -30,13 +30,13 @@ func GetSitePages( serv *discover.BetaService, siteID string, pages []string, - errs *fault.Errors, + errs *fault.Bus, ) ([]models.SitePageable, error) { var ( col = make([]models.SitePageable, 0) semaphoreCh = make(chan struct{}, fetchChannelSize) opts = retrieveSitePageOptions() - et = errs.Tracker() + el = errs.Local() wg sync.WaitGroup m sync.Mutex ) @@ -51,7 +51,7 @@ func GetSitePages( } for _, entry := range pages { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -70,7 +70,7 @@ func GetSitePages( page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts) if err != nil { - et.Add(clues.Wrap(err, "fetching page").WithClues(ctx).With(graph.ErrData(err)...)) + el.AddRecoverable(clues.Wrap(err, "fetching page").WithClues(ctx).With(graph.ErrData(err)...)) return } @@ -80,7 +80,7 @@ func GetSitePages( wg.Wait() - return col, et.Err() + return col, el.Failure() } // GetSite returns a minimal Site with the SiteID and the WebURL diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index 8a2b11bab..e3e7db661 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -112,7 +112,7 @@ func (sc Collection) DoNotMergeItems() bool { func (sc *Collection) Items( ctx context.Context, - errs *fault.Errors, + errs *fault.Bus, ) <-chan data.Stream { go sc.populate(ctx, errs) return sc.data @@ -183,7 +183,7 @@ func (sc *Collection) finishPopulation( } // populate utility function to retrieve data from back store for a given collection -func (sc *Collection) populate(ctx context.Context, errs *fault.Errors) { +func (sc *Collection) populate(ctx context.Context, errs *fault.Bus) { var ( metrics numMetrics writer = kw.NewJsonSerializationWriter() @@ -221,11 +221,11 @@ func (sc *Collection) retrieveLists( ctx context.Context, wtr *kw.JsonSerializationWriter, progress chan<- struct{}, - errs *fault.Errors, + errs *fault.Bus, ) (numMetrics, error) { var ( metrics numMetrics - et = errs.Tracker() + el = errs.Local() ) lists, err := loadSiteLists(ctx, sc.service, sc.fullPath.ResourceOwner(), sc.jobs, errs) @@ -237,13 +237,13 @@ func (sc *Collection) retrieveLists( // For each models.Listable, object is serialized and the metrics are collected. // The progress is objected via the passed in channel. for _, lst := range lists { - if et.Err() != nil { + if el.Failure() != nil { break } byteArray, err := serializeContent(wtr, lst) if err != nil { - et.Add(clues.Wrap(err, "serializing list").WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Wrap(err, "serializing list").WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) continue } @@ -269,18 +269,18 @@ func (sc *Collection) retrieveLists( } } - return metrics, et.Err() + return metrics, el.Failure() } func (sc *Collection) retrievePages( ctx context.Context, wtr *kw.JsonSerializationWriter, progress chan<- struct{}, - errs *fault.Errors, + errs *fault.Bus, ) (numMetrics, error) { var ( metrics numMetrics - et = errs.Tracker() + el = errs.Local() ) betaService := sc.betaService @@ -305,13 +305,13 @@ func (sc *Collection) retrievePages( // Pageable objects are not supported in v1.0 of msgraph at this time. // TODO: Verify Parsable interface supported with modified-Pageable for _, pg := range pages { - if et.Err() != nil { + if el.Failure() != nil { break } byteArray, err := serializeContent(wtr, pg) if err != nil { - et.Add(clues.Wrap(err, "serializing page").WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Wrap(err, "serializing page").WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) continue } @@ -331,7 +331,7 @@ func (sc *Collection) retrievePages( } } - return metrics, et.Err() + return metrics, el.Failure() } func serializeContent(writer *kw.JsonSerializationWriter, obj absser.Parsable) ([]byte, error) { diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go index 67c08d423..469822eea 100644 --- a/src/internal/connector/sharepoint/data_collections.go +++ b/src/internal/connector/sharepoint/data_collections.go @@ -36,7 +36,7 @@ func DataCollections( serv graph.Servicer, su statusUpdater, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { b, err := selector.ToSharePointBackup() if err != nil { @@ -44,13 +44,13 @@ func DataCollections( } var ( - et = errs.Tracker() + el = errs.Local() site = b.DiscreteOwner collections = []data.BackupCollection{} ) for _, scope := range b.Scopes() { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -74,7 +74,7 @@ func DataCollections( ctrlOpts, errs) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -90,7 +90,7 @@ func DataCollections( ctrlOpts, errs) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -104,7 +104,7 @@ func DataCollections( ctrlOpts, errs) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } } @@ -113,7 +113,7 @@ func DataCollections( foldersComplete <- struct{}{} } - return collections, nil, et.Err() + return collections, nil, el.Failure() } func collectLists( @@ -122,12 +122,12 @@ func collectLists( tenantID, siteID string, updater statusUpdater, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, error) { logger.Ctx(ctx).With("site", siteID).Debug("Creating SharePoint List Collections") var ( - et = errs.Tracker() + el = errs.Local() spcs = make([]data.BackupCollection, 0) ) @@ -137,7 +137,7 @@ func collectLists( } for _, tuple := range lists { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -148,7 +148,7 @@ func collectLists( path.ListsCategory, false) if err != nil { - et.Add(clues.Wrap(err, "creating list collection path").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "creating list collection path").WithClues(ctx)) } collection := NewCollection(dir, serv, List, updater.UpdateStatus, ctrlOpts) @@ -157,7 +157,7 @@ func collectLists( spcs = append(spcs, collection) } - return spcs, et.Err() + return spcs, el.Failure() } // collectLibraries constructs a onedrive Collections struct and Get()s @@ -170,7 +170,7 @@ func collectLibraries( scope selectors.SharePointScope, updater statusUpdater, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { logger.Ctx(ctx).Debug("creating SharePoint Library collections") @@ -206,12 +206,12 @@ func collectPages( siteID string, updater statusUpdater, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, error) { logger.Ctx(ctx).Debug("creating SharePoint Pages collections") var ( - et = errs.Tracker() + el = errs.Local() spcs = make([]data.BackupCollection, 0) ) @@ -230,7 +230,7 @@ func collectPages( } for _, tuple := range tuples { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -241,7 +241,7 @@ func collectPages( path.PagesCategory, false) if err != nil { - et.Add(clues.Wrap(err, "creating page collection path").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "creating page collection path").WithClues(ctx)) } collection := NewCollection(dir, serv, Pages, updater.UpdateStatus, ctrlOpts) @@ -251,7 +251,7 @@ func collectPages( spcs = append(spcs, collection) } - return spcs, et.Err() + return spcs, el.Failure() } type folderMatcher struct { diff --git a/src/internal/connector/sharepoint/list.go b/src/internal/connector/sharepoint/list.go index f753d847e..79989983c 100644 --- a/src/internal/connector/sharepoint/list.go +++ b/src/internal/connector/sharepoint/list.go @@ -92,12 +92,12 @@ func loadSiteLists( gs graph.Servicer, siteID string, listIDs []string, - errs *fault.Errors, + errs *fault.Bus, ) ([]models.Listable, error) { var ( results = make([]models.Listable, 0) semaphoreCh = make(chan struct{}, fetchChannelSize) - et = errs.Tracker() + el = errs.Local() wg sync.WaitGroup m sync.Mutex ) @@ -112,7 +112,7 @@ func loadSiteLists( } for _, listID := range listIDs { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -131,13 +131,13 @@ func loadSiteLists( entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil) if err != nil { - et.Add(clues.Wrap(err, "getting site list").WithClues(ctx).With(graph.ErrData(err)...)) + el.AddRecoverable(clues.Wrap(err, "getting site list").WithClues(ctx).With(graph.ErrData(err)...)) return } cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, id, errs) if err != nil { - et.Add(clues.Wrap(err, "getting list contents")) + el.AddRecoverable(clues.Wrap(err, "getting list contents")) return } @@ -150,7 +150,7 @@ func loadSiteLists( wg.Wait() - return results, et.Err() + return results, el.Failure() } // fetchListContents utility function to retrieve associated M365 relationships @@ -160,7 +160,7 @@ func fetchListContents( ctx context.Context, service graph.Servicer, siteID, listID string, - errs *fault.Errors, + errs *fault.Bus, ) ( []models.ColumnDefinitionable, []models.ContentTypeable, @@ -193,17 +193,17 @@ func fetchListItems( ctx context.Context, gs graph.Servicer, siteID, listID string, - errs *fault.Errors, + errs *fault.Bus, ) ([]models.ListItemable, error) { var ( prefix = gs.Client().SitesById(siteID).ListsById(listID) builder = prefix.Items() itms = make([]models.ListItemable, 0) - et = errs.Tracker() + el = errs.Local() ) for { - if errs.Err() != nil { + if errs.Failure() != nil { break } @@ -213,7 +213,7 @@ func fetchListItems( } for _, itm := range resp.GetValue() { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -221,7 +221,7 @@ func fetchListItems( fields, err := newPrefix.Fields().Get(ctx, nil) if err != nil { - et.Add(clues.Wrap(err, "getting list fields").WithClues(ctx).With(graph.ErrData(err)...)) + el.AddRecoverable(clues.Wrap(err, "getting list fields").WithClues(ctx).With(graph.ErrData(err)...)) continue } @@ -237,7 +237,7 @@ func fetchListItems( builder = mssite.NewItemListsItemItemsRequestBuilder(*resp.GetOdataNextLink(), gs.Adapter()) } - return itms, et.Err() + return itms, el.Failure() } // fetchColumns utility function to return columns from a site. @@ -300,16 +300,16 @@ func fetchContentTypes( ctx context.Context, gs graph.Servicer, siteID, listID string, - errs *fault.Errors, + errs *fault.Bus, ) ([]models.ContentTypeable, error) { var ( - et = errs.Tracker() + el = errs.Local() cTypes = make([]models.ContentTypeable, 0) builder = gs.Client().SitesById(siteID).ListsById(listID).ContentTypes() ) for { - if errs.Err() != nil { + if errs.Failure() != nil { break } @@ -319,7 +319,7 @@ func fetchContentTypes( } for _, cont := range resp.GetValue() { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -327,7 +327,7 @@ func fetchContentTypes( links, err := fetchColumnLinks(ctx, gs, siteID, listID, id) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -335,7 +335,7 @@ func fetchContentTypes( cs, err := fetchColumns(ctx, gs, siteID, listID, id) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -351,7 +351,7 @@ func fetchContentTypes( builder = mssite.NewItemListsItemContentTypesRequestBuilder(*resp.GetOdataNextLink(), gs.Adapter()) } - return cTypes, et.Err() + return cTypes, el.Failure() } func fetchColumnLinks( diff --git a/src/internal/connector/sharepoint/restore.go b/src/internal/connector/sharepoint/restore.go index 3eeec9f4b..890dd23a4 100644 --- a/src/internal/connector/sharepoint/restore.go +++ b/src/internal/connector/sharepoint/restore.go @@ -46,7 +46,7 @@ func RestoreCollections( dest control.RestoreDestination, dcs []data.RestoreCollection, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) (*support.ConnectorOperationStatus, error) { var ( err error @@ -215,7 +215,7 @@ func RestoreListCollection( dc data.RestoreCollection, restoreContainerName string, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) (support.CollectionMetrics, error) { ctx, end := D.Span(ctx, "gc:sharepoint:restoreListCollection", D.Label("path", dc.FullPath())) defer end() @@ -225,13 +225,13 @@ func RestoreListCollection( directory = dc.FullPath() siteID = directory.ResourceOwner() items = dc.Items(ctx, errs) - et = errs.Tracker() + el = errs.Local() ) trace.Log(ctx, "gc:sharepoint:restoreListCollection", directory.String()) for { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -252,7 +252,7 @@ func RestoreListCollection( siteID, restoreContainerName) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -260,7 +260,7 @@ func RestoreListCollection( itemPath, err := dc.FullPath().Append(itemData.UUID(), true) if err != nil { - et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "appending item to full path").WithClues(ctx)) continue } @@ -276,7 +276,7 @@ func RestoreListCollection( } } - return metrics, et.Err() + return metrics, el.Failure() } // RestorePageCollection handles restoration of an individual site page collection. @@ -289,7 +289,7 @@ func RestorePageCollection( dc data.RestoreCollection, restoreContainerName string, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) (support.CollectionMetrics, error) { var ( metrics = support.CollectionMetrics{} @@ -308,13 +308,13 @@ func RestorePageCollection( } var ( - et = errs.Tracker() + el = errs.Local() service = discover.NewBetaService(adpt) items = dc.Items(ctx, errs) ) for { - if et.Err() != nil { + if el.Failure() != nil { break } @@ -335,7 +335,7 @@ func RestorePageCollection( siteID, restoreContainerName) if err != nil { - et.Add(err) + el.AddRecoverable(err) continue } @@ -343,7 +343,7 @@ func RestorePageCollection( itemPath, err := dc.FullPath().Append(itemData.UUID(), true) if err != nil { - et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) + el.AddRecoverable(clues.Wrap(err, "appending item to full path").WithClues(ctx)) continue } @@ -359,5 +359,5 @@ func RestorePageCollection( } } - return metrics, et.Err() + return metrics, el.Failure() } diff --git a/src/internal/data/data_collection.go b/src/internal/data/data_collection.go index f32f17135..ff11d4e1a 100644 --- a/src/internal/data/data_collection.go +++ b/src/internal/data/data_collection.go @@ -33,7 +33,7 @@ type Collection interface { // Each returned struct contains the next item in the collection // The channel is closed when there are no more items in the collection or if // an unrecoverable error caused an early termination in the sender. - Items(ctx context.Context, errs *fault.Errors) <-chan Stream + Items(ctx context.Context, errs *fault.Bus) <-chan Stream // FullPath returns a path struct that acts as a metadata tag for this // Collection. FullPath() path.Path diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index d01eb10e5..9c2ebf5c7 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -26,7 +26,7 @@ type kopiaDataCollection struct { func (kdc *kopiaDataCollection) Items( ctx context.Context, - _ *fault.Errors, // unused, just matching the interface + _ *fault.Bus, // unused, just matching the interface ) <-chan data.Stream { res := make(chan data.Stream) diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index 93ed6c503..d2537aad2 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -139,7 +139,7 @@ type corsoProgress struct { toMerge map[string]PrevRefs mu sync.RWMutex totalBytes int64 - errs *fault.Errors + errs *fault.Bus } // Kopia interface function used as a callback when kopia finishes processing a @@ -169,7 +169,7 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) { // never had to materialize their details in-memory. if d.info == nil { if d.prevPath == nil { - cp.errs.Add(clues.New("item sourced from previous backup with no previous path"). + cp.errs.AddRecoverable(clues.New("item sourced from previous backup with no previous path"). With( "service", d.repoPath.Service().String(), "category", d.repoPath.Category().String(), @@ -263,7 +263,7 @@ func (cp *corsoProgress) CachedFile(fname string, size int64) { func (cp *corsoProgress) Error(relpath string, err error, isIgnored bool) { defer cp.UploadProgress.Error(relpath, err, isIgnored) - cp.errs.Add(clues.Wrap(err, "kopia reported error"). + cp.errs.AddRecoverable(clues.Wrap(err, "kopia reported error"). With("is_ignored", isIgnored, "relative_path", relpath). Label(fault.LabelForceNoBackupCreation)) } @@ -335,7 +335,7 @@ func collectionEntries( itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true) if err != nil { err = errors.Wrap(err, "getting full item path") - progress.errs.Add(err) + progress.errs.AddRecoverable(err) logger.Ctx(ctx).With("err", err).Errorw("getting full item path", clues.InErr(err).Slice()...) diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index 0b33e8db6..e2a769908 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -518,7 +518,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileCachedNoPrevPathErrors() { assert.Empty(t, cp.pending) assert.Empty(t, bd.Details().Entries) - assert.Error(t, cp.errs.Err()) + assert.Error(t, cp.errs.Failure()) } func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchyNewItem() { diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 40871cd42..ba4d5f267 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -134,7 +134,7 @@ func (w Wrapper) BackupCollections( globalExcludeSet map[string]struct{}, tags map[string]string, buildTreeWithBase bool, - errs *fault.Errors, + errs *fault.Bus, ) (*BackupStats, *details.Builder, map[string]PrevRefs, error) { if w.c == nil { return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx) @@ -168,8 +168,7 @@ func (w Wrapper) BackupCollections( base, collections, globalExcludeSet, - progress, - ) + progress) if err != nil { return nil, nil, nil, errors.Wrap(err, "building kopia directories") } @@ -184,7 +183,7 @@ func (w Wrapper) BackupCollections( return nil, nil, nil, err } - return s, progress.deets, progress.toMerge, progress.errs.Err() + return s, progress.deets, progress.toMerge, progress.errs.Failure() } func (w Wrapper) makeSnapshotWithRoot( @@ -383,7 +382,7 @@ func (w Wrapper) RestoreMultipleItems( snapshotID string, paths []path.Path, bcounter ByteCounter, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.RestoreCollection, error) { ctx, end := D.Span(ctx, "kopia:restoreMultipleItems") defer end() @@ -400,23 +399,23 @@ func (w Wrapper) RestoreMultipleItems( var ( // Maps short ID of parent path to data collection for that folder. cols = map[string]*kopiaDataCollection{} - et = errs.Tracker() + el = errs.Local() ) for _, itemPath := range paths { - if et.Err() != nil { - return nil, et.Err() + if el.Failure() != nil { + return nil, el.Failure() } ds, err := getItemStream(ctx, itemPath, snapshotRoot, bcounter) if err != nil { - et.Add(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) + el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) continue } parentPath, err := itemPath.Dir() if err != nil { - et.Add(clues.Wrap(err, "making directory collection"). + el.AddRecoverable(clues.Wrap(err, "making directory collection"). WithClues(ctx). Label(fault.LabelForceNoBackupCreation)) @@ -443,7 +442,7 @@ func (w Wrapper) RestoreMultipleItems( res = append(res, c) } - return res, et.Err() + return res, el.Failure() } // DeleteSnapshot removes the provided manifest from kopia. diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index baa7a5d9b..8d57c6db0 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -394,7 +394,7 @@ type mockBackupCollection struct { streams []data.Stream } -func (c *mockBackupCollection) Items(context.Context, *fault.Errors) <-chan data.Stream { +func (c *mockBackupCollection) Items(context.Context, *fault.Bus) <-chan data.Stream { res := make(chan data.Stream) go func() { diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index b990c3d4c..bb08dff79 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -100,7 +100,7 @@ type backupStats struct { } type detailsWriter interface { - WriteBackupDetails(context.Context, *details.Details, *fault.Errors) (string, error) + WriteBackupDetails(context.Context, *details.Details, *fault.Bus) (string, error) } // --------------------------------------------------------------------------- @@ -166,12 +166,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { With("err", err). Errorw("doing backup", clues.InErr(err).Slice()...) op.Errors.Fail(errors.Wrap(err, "doing backup")) - opStats.readErr = op.Errors.Err() + opStats.readErr = op.Errors.Failure() } // TODO: the consumer (sdk or cli) should run this, not operations. - recoverableCount := len(op.Errors.Errs()) - for i, err := range op.Errors.Errs() { + recoverableCount := len(op.Errors.Recovered()) + for i, err := range op.Errors.Recovered() { logger.Ctx(ctx). With("error", err). With(clues.InErr(err).Slice()...). @@ -185,14 +185,14 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { err = op.persistResults(startTime, &opStats) if err != nil { op.Errors.Fail(errors.Wrap(err, "persisting backup results")) - opStats.writeErr = op.Errors.Err() + opStats.writeErr = op.Errors.Failure() - return op.Errors.Err() + return op.Errors.Failure() } // force exit without backup in certain cases. // see: https://github.com/alcionai/corso/pull/2510#discussion_r1113532530 - for _, e := range op.Errors.Errs() { + for _, e := range op.Errors.Recovered() { if clues.HasLabel(e, fault.LabelForceNoBackupCreation) { logger.Ctx(ctx). With("error", e). @@ -200,7 +200,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { Infow("completed backup; conditional error forcing exit without model persistence", "results", op.Results) - return op.Errors.Fail(errors.Wrap(e, "forced backup")).Err() + return op.Errors.Fail(errors.Wrap(e, "forced backup")).Failure() } } @@ -212,9 +212,9 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { deets.Details()) if err != nil { op.Errors.Fail(errors.Wrap(err, "persisting backup")) - opStats.writeErr = op.Errors.Err() + opStats.writeErr = op.Errors.Failure() - return op.Errors.Err() + return op.Errors.Failure() } logger.Ctx(ctx).Infow("completed backup", "results", op.Results) @@ -322,7 +322,7 @@ func produceBackupDataCollections( sel selectors.Selector, metadata []data.RestoreCollection, ctrlOpts control.Options, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.BackupCollection, map[string]struct{}, error) { complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Discovering items to backup")) defer func() { @@ -346,7 +346,7 @@ type backuper interface { excluded map[string]struct{}, tags map[string]string, buildTreeWithBase bool, - errs *fault.Errors, + errs *fault.Bus, ) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) } @@ -405,7 +405,7 @@ func consumeBackupDataCollections( excludes map[string]struct{}, backupID model.StableID, isIncremental bool, - errs *fault.Errors, + errs *fault.Bus, ) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) { complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Backing up data")) defer func() { @@ -514,7 +514,7 @@ func mergeDetails( mans []*kopia.ManifestEntry, shortRefsFromPrevBackup map[string]kopia.PrevRefs, deets *details.Builder, - errs *fault.Errors, + errs *fault.Bus, ) error { // Don't bother loading any of the base details if there's nothing we need to // merge. diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 021a377a3..d31a7d2e2 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -154,8 +154,8 @@ func runAndCheckBackup( assert.Less(t, int64(0), bo.Results.BytesRead, "bytes read") assert.Less(t, int64(0), bo.Results.BytesUploaded, "bytes uploaded") assert.Equal(t, 1, bo.Results.ResourceOwners, "count of resource owners") - assert.NoError(t, bo.Errors.Err(), "incremental non-recoverable error") - assert.Empty(t, bo.Errors.Errs(), "incremental recoverable/iteration errors") + assert.NoError(t, bo.Errors.Failure(), "incremental non-recoverable error") + assert.Empty(t, bo.Errors.Recovered(), "incremental recoverable/iteration errors") assert.NoError(t, bo.Results.ReadErrors, "errors reading data") assert.NoError(t, bo.Results.WriteErrors, "errors writing data") assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events") @@ -626,8 +626,8 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { assert.Greater(t, bo.Results.BytesRead, incBO.Results.BytesRead, "incremental bytes read") assert.Greater(t, bo.Results.BytesUploaded, incBO.Results.BytesUploaded, "incremental bytes uploaded") assert.Equal(t, bo.Results.ResourceOwners, incBO.Results.ResourceOwners, "incremental backup resource owner") - assert.NoError(t, incBO.Errors.Err(), "incremental non-recoverable error") - assert.Empty(t, incBO.Errors.Errs(), "count incremental recoverable/iteration errors") + assert.NoError(t, incBO.Errors.Failure(), "incremental non-recoverable error") + assert.Empty(t, incBO.Errors.Recovered(), "count incremental recoverable/iteration errors") assert.NoError(t, incBO.Results.ReadErrors, "incremental read errors") assert.NoError(t, incBO.Results.WriteErrors, "incremental write errors") assert.Equal(t, 1, incMB.TimesCalled[events.BackupStart], "incremental backup-start events") @@ -1057,8 +1057,8 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() { // +4 on read/writes to account for metadata: 1 delta and 1 path for each type. assert.Equal(t, test.itemsWritten+4, incBO.Results.ItemsWritten, "incremental items written") assert.Equal(t, test.itemsRead+4, incBO.Results.ItemsRead, "incremental items read") - assert.NoError(t, incBO.Errors.Err(), "incremental non-recoverable error") - assert.Empty(t, incBO.Errors.Errs(), "incremental recoverable/iteration errors") + assert.NoError(t, incBO.Errors.Failure(), "incremental non-recoverable error") + assert.Empty(t, incBO.Errors.Recovered(), "incremental recoverable/iteration errors") assert.NoError(t, incBO.Results.ReadErrors, "incremental read errors") assert.NoError(t, incBO.Results.WriteErrors, "incremental write errors") assert.Equal(t, 1, incMB.TimesCalled[events.BackupStart], "incremental backup-start events") diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 7ccdeaeb0..6e4fc8ad5 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -63,7 +63,7 @@ func (mr *mockRestorer) RestoreMultipleItems( snapshotID string, paths []path.Path, bc kopia.ByteCounter, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.RestoreCollection, error) { mr.gotPaths = append(mr.gotPaths, paths...) @@ -99,7 +99,7 @@ func (mbu mockBackuper) BackupCollections( excluded map[string]struct{}, tags map[string]string, buildTreeWithBase bool, - errs *fault.Errors, + errs *fault.Bus, ) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) { if mbu.checkFunc != nil { mbu.checkFunc(bases, cs, tags, buildTreeWithBase) @@ -117,7 +117,7 @@ type mockDetailsReader struct { func (mdr mockDetailsReader) ReadBackupDetails( ctx context.Context, detailsID string, - errs *fault.Errors, + errs *fault.Bus, ) (*details.Details, error) { r := mdr.entries[detailsID] diff --git a/src/internal/operations/common.go b/src/internal/operations/common.go index 18266a734..4b9caeb54 100644 --- a/src/internal/operations/common.go +++ b/src/internal/operations/common.go @@ -13,7 +13,7 @@ import ( ) type detailsReader interface { - ReadBackupDetails(ctx context.Context, detailsID string, errs *fault.Errors) (*details.Details, error) + ReadBackupDetails(ctx context.Context, detailsID string, errs *fault.Bus) (*details.Details, error) } func getBackupAndDetailsFromID( @@ -21,7 +21,7 @@ func getBackupAndDetailsFromID( backupID model.StableID, ms *store.Wrapper, detailsStore detailsReader, - errs *fault.Errors, + errs *fault.Bus, ) (*backup.Backup, *details.Details, error) { dID, bup, err := ms.GetDetailsIDFromBackupID(ctx, backupID) if err != nil { diff --git a/src/internal/operations/manifests.go b/src/internal/operations/manifests.go index 305443b07..e625cd09d 100644 --- a/src/internal/operations/manifests.go +++ b/src/internal/operations/manifests.go @@ -45,7 +45,7 @@ func produceManifestsAndMetadata( reasons []kopia.Reason, tenantID string, getMetadata bool, - errs *fault.Errors, + errs *fault.Bus, ) ([]*kopia.ManifestEntry, []data.RestoreCollection, bool, error) { var ( metadataFiles = graph.AllMetadataFileNames() @@ -70,7 +70,7 @@ func produceManifestsAndMetadata( // // TODO(ashmrtn): This may need updating if we start sourcing item backup // details from previous snapshots when using kopia-assisted incrementals. - if err := verifyDistinctBases(ctx, ms, errs); err != nil { + if err := verifyDistinctBases(ctx, ms); err != nil { logger.Ctx(ctx).With("error", err).Infow( "base snapshot collision, falling back to full backup", clues.In(ctx).Slice()...) @@ -135,18 +135,10 @@ func produceManifestsAndMetadata( // of manifests, that each manifest's Reason (owner, service, category) is only // included once. If a reason is duplicated by any two manifests, an error is // returned. -func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs *fault.Errors) error { - var ( - failed bool - reasons = map[string]manifest.ID{} - et = errs.Tracker() - ) +func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry) error { + reasons := map[string]manifest.ID{} for _, man := range mans { - if et.Err() != nil { - break - } - // Incomplete snapshots are used only for kopia-assisted incrementals. The // fact that we need this check here makes it seem like this should live in // the kopia code. However, keeping it here allows for better debugging as @@ -161,24 +153,16 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs reasonKey := reason.ResourceOwner + reason.Service.String() + reason.Category.String() if b, ok := reasons[reasonKey]; ok { - failed = true - - et.Add(clues.New("manifests have overlapping reasons"). + return clues.New("manifests have overlapping reasons"). WithClues(ctx). - With("other_manifest_id", b)) - - continue + With("other_manifest_id", b) } reasons[reasonKey] = man.ID } } - if failed { - return clues.New("multiple base snapshots qualify").WithClues(ctx) - } - - return et.Err() + return nil } // collectMetadata retrieves all metadata files associated with the manifest. @@ -188,7 +172,7 @@ func collectMetadata( man *kopia.ManifestEntry, fileNames []string, tenantID string, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.RestoreCollection, error) { paths := []path.Path{} diff --git a/src/internal/operations/manifests_test.go b/src/internal/operations/manifests_test.go index 62878fcaa..ba0859c39 100644 --- a/src/internal/operations/manifests_test.go +++ b/src/internal/operations/manifests_test.go @@ -53,7 +53,7 @@ type mockColl struct { p path.Path } -func (mc mockColl) Items(context.Context, *fault.Errors) <-chan data.Stream { +func (mc mockColl) Items(context.Context, *fault.Bus) <-chan data.Stream { return nil } @@ -393,7 +393,7 @@ func (suite *OperationsManifestsUnitSuite) TestVerifyDistinctBases() { ctx, flush := tester.NewContext() defer flush() - err := verifyDistinctBases(ctx, test.mans, fault.New(true)) + err := verifyDistinctBases(ctx, test.mans) test.expect(suite.T(), err) }) } @@ -837,7 +837,7 @@ func (suite *BackupManifestSuite) TestBackupOperation_VerifyDistinctBases() { ctx, flush := tester.NewContext() defer flush() - test.errCheck(suite.T(), verifyDistinctBases(ctx, test.input, fault.New(true))) + test.errCheck(suite.T(), verifyDistinctBases(ctx, test.input)) }) } } diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index ba68a3bdd..1e32e6258 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -55,7 +55,7 @@ const ( type operation struct { CreatedAt time.Time `json:"createdAt"` - Errors *fault.Errors `json:"errors"` + Errors *fault.Bus `json:"errors"` Options control.Options `json:"options"` Status opStatus `json:"status"` @@ -100,7 +100,7 @@ func connectToM365( ctx context.Context, sel selectors.Selector, acct account.Account, - errs *fault.Errors, + errs *fault.Bus, ) (*connector.GraphConnector, error) { complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Connecting to M365")) defer func() { diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 596735bf6..16bf315a9 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -104,7 +104,7 @@ type restorer interface { snapshotID string, paths []path.Path, bc kopia.ByteCounter, - errs *fault.Errors, + errs *fault.Bus, ) ([]data.RestoreCollection, error) } @@ -153,12 +153,12 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De With("err", err). Errorw("doing restore", clues.InErr(err).Slice()...) op.Errors.Fail(errors.Wrap(err, "doing restore")) - opStats.readErr = op.Errors.Err() + opStats.readErr = op.Errors.Failure() } // TODO: the consumer (sdk or cli) should run this, not operations. - recoverableCount := len(op.Errors.Errs()) - for i, err := range op.Errors.Errs() { + recoverableCount := len(op.Errors.Recovered()) + for i, err := range op.Errors.Recovered() { logger.Ctx(ctx). With("error", err). With(clues.InErr(err).Slice()...). @@ -172,9 +172,9 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De err = op.persistResults(ctx, start, &opStats) if err != nil { op.Errors.Fail(errors.Wrap(err, "persisting restore results")) - opStats.writeErr = op.Errors.Err() + opStats.writeErr = op.Errors.Failure() - return nil, op.Errors.Err() + return nil, op.Errors.Failure() } logger.Ctx(ctx).Infow("completed restore", "results", op.Results) @@ -340,7 +340,7 @@ func formatDetailsForRestoration( ctx context.Context, sel selectors.Selector, deets *details.Details, - errs *fault.Errors, + errs *fault.Bus, ) ([]path.Path, error) { fds, err := sel.Reduce(ctx, deets, errs) if err != nil { @@ -351,17 +351,17 @@ func formatDetailsForRestoration( fdsPaths = fds.Paths() paths = make([]path.Path, len(fdsPaths)) shortRefs = make([]string, len(fdsPaths)) - et = errs.Tracker() + el = errs.Local() ) for i := range fdsPaths { - if et.Err() != nil { + if el.Failure() != nil { break } p, err := path.FromDataLayerPath(fdsPaths[i], true) if err != nil { - et.Add(clues. + el.AddRecoverable(clues. Wrap(err, "parsing details path after reduction"). WithMap(clues.In(ctx)). With("path", fdsPaths[i])) @@ -386,5 +386,5 @@ func formatDetailsForRestoration( logger.Ctx(ctx).With("short_refs", shortRefs).Infof("found %d details entries to restore", len(shortRefs)) - return paths, et.Err() + return paths, el.Failure() } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index cb72cd34a..f11e0ec4a 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -290,7 +290,6 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { ds, err := ro.Run(ctx) require.NoError(t, err, "restoreOp.Run()") - require.Empty(t, ro.Errors.Errs(), "restoreOp.Run() recoverable errors") require.NotEmpty(t, ro.Results, "restoreOp results") require.NotNil(t, ds, "restored details") assert.Equal(t, ro.Status, Completed, "restoreOp status") @@ -299,8 +298,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { assert.Less(t, 0, ro.Results.ItemsWritten, "restored items written") assert.Less(t, int64(0), ro.Results.BytesRead, "bytes read") assert.Equal(t, 1, ro.Results.ResourceOwners, "resource Owners") - assert.NoError(t, ro.Errors.Err(), "non-recoverable error") - assert.Empty(t, ro.Errors.Errs(), "recoverable errors") + assert.NoError(t, ro.Errors.Failure(), "non-recoverable error") + assert.Empty(t, ro.Errors.Recovered(), "recoverable errors") assert.NoError(t, ro.Results.ReadErrors, "errors while reading restore data") assert.NoError(t, ro.Results.WriteErrors, "errors while writing restore data") assert.Equal(t, suite.numItems, ro.Results.ItemsWritten, "backup and restore wrote the same num of items") diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index e5cd2c1ce..a9c7beeb5 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -47,7 +47,7 @@ const ( func (ss *streamStore) WriteBackupDetails( ctx context.Context, backupDetails *details.Details, - errs *fault.Errors, + errs *fault.Bus, ) (string, error) { // construct the path of the container for the `details` item p, err := path.Builder{}. @@ -95,7 +95,7 @@ func (ss *streamStore) WriteBackupDetails( func (ss *streamStore) ReadBackupDetails( ctx context.Context, detailsID string, - errs *fault.Errors, + errs *fault.Bus, ) (*details.Details, error) { // construct the path for the `details` item detailsPath, err := path.Builder{}. @@ -195,7 +195,7 @@ func (dc *streamCollection) DoNotMergeItems() bool { // Items() always returns a channel with a single data.Stream // representing the object to be persisted -func (dc *streamCollection) Items(context.Context, *fault.Errors) <-chan data.Stream { +func (dc *streamCollection) Items(context.Context, *fault.Bus) <-chan data.Stream { items := make(chan data.Stream, 1) defer close(items) items <- dc.item diff --git a/src/pkg/backup/backup.go b/src/pkg/backup/backup.go index cbf15bb6a..ae61a8e24 100644 --- a/src/pkg/backup/backup.go +++ b/src/pkg/backup/backup.go @@ -37,8 +37,8 @@ type Backup struct { Version int `json:"version"` // Errors contains all errors aggregated during a backup operation. - Errors fault.ErrorsData `json:"errors"` - ErrorCount int `json:"errorCount"` + Errors fault.Errors `json:"errors"` + ErrorCount int `json:"errorCount"` // stats are embedded so that the values appear as top-level properties stats.Errs // Deprecated, replaced with Errors. @@ -55,12 +55,12 @@ func New( selector selectors.Selector, rw stats.ReadWrites, se stats.StartAndEndTime, - errs *fault.Errors, + errs *fault.Bus, ) *Backup { - errData := errs.Data() + errData := errs.Errors() - errCount := len(errs.Data().Errs) - if errData.Err != nil { + errCount := len(errs.Errors().Recovered) + if errData.Failure != nil { errCount++ } @@ -169,12 +169,12 @@ func (b Backup) countErrors() int { } // future tracking - if b.Errors.Err != nil || len(b.Errors.Errs) > 0 { - if b.Errors.Err != nil { + if b.Errors.Failure != nil || len(b.Errors.Recovered) > 0 { + if b.Errors.Failure != nil { errCount++ } - errCount += len(b.Errors.Errs) + errCount += len(b.Errors.Recovered) } return errCount diff --git a/src/pkg/backup/backup_test.go b/src/pkg/backup/backup_test.go index 6fb4536d4..4b85922a1 100644 --- a/src/pkg/backup/backup_test.go +++ b/src/pkg/backup/backup_test.go @@ -41,8 +41,8 @@ func stubBackup(t time.Time) backup.Backup { DetailsID: "details", Status: "status", Selector: sel.Selector, - Errors: fault.ErrorsData{ - Errs: []error{errors.New("read"), errors.New("write")}, + Errors: fault.Errors{ + Recovered: []error{errors.New("read"), errors.New("write")}, }, Errs: stats.Errs{ ReadErrors: errors.New("1"), diff --git a/src/pkg/fault/example_fault_test.go b/src/pkg/fault/example_fault_test.go index 904f27c63..67ddcc4b4 100644 --- a/src/pkg/fault/example_fault_test.go +++ b/src/pkg/fault/example_fault_test.go @@ -21,18 +21,18 @@ type mockController struct { errors any } -func connectClient() error { return nil } -func dependencyCall() error { return nil } -func getIthItem(i int) error { return nil } -func getData() ([]string, error) { return nil, nil } -func storeData([]string, *fault.Errors) {} +func connectClient() error { return nil } +func dependencyCall() error { return nil } +func getIthItem(i int) error { return nil } +func getData() ([]string, error) { return nil, nil } +func storeData([]string, *fault.Bus) {} type mockOper struct { - Errors *fault.Errors + Errors *fault.Bus } -func newOperation() mockOper { return mockOper{fault.New(true)} } -func (m mockOper) Run() *fault.Errors { return m.Errors } +func newOperation() mockOper { return mockOper{fault.New(true)} } +func (m mockOper) Run() *fault.Bus { return m.Errors } type mockDepenedency struct{} @@ -47,44 +47,40 @@ var dependency = mockDepenedency{} // --------------------------------------------------------------------------- // ExampleNew highlights assumptions and best practices -// for generating fault.Errors structs. +// for generating fault.Bus structs. func ExampleNew() { - // fault.Errors should only be generated during the construction of - // another controller, such as a new Backup or Restore Operations. - // Configurations like failFast are set during construction. - // - // Generating new fault.Errors structs outside of an operation - // controller is a smell, and should be avoided. If you need - // to aggregate errors, you should accept an interface and pass - // an fault.Errors instance into it. + // New fault.Bus instances should only get generated during initialization. + // Such as when starting up a new Backup or Restore Operation. + // Configuration (eg: failFast) is set during construction and cannot + // be updated. ctrl = mockController{ errors: fault.New(false), } } -// ExampleErrors_Fail describes the assumptions and best practices +// ExampleBus_Fail describes the assumptions and best practices // for setting the Failure error. -func ExampleErrors_Fail() { +func ExampleBus_Fail() { errs := fault.New(false) // Fail() is used to record non-recoverable errors. // // Fail() should only get called in the last step before returning - // a fault.Errors from a controller. In all other cases, you - // should simply return an error and expect the upstream controller - // to call Fail() for you. - topLevelHandler := func(errs *fault.Errors) *fault.Errors { + // a fault.Bus from a controller. In all other cases, you + // can stick to standard golang error handling and expect some upstream + // controller to call Fail() for you (if necessary). + topLevelHandler := func(errs *fault.Bus) *fault.Bus { if err := connectClient(); err != nil { return errs.Fail(err) } return errs } - if errs := topLevelHandler(errs); errs.Err() != nil { - fmt.Println(errs.Err()) + if errs := topLevelHandler(errs); errs.Failure() != nil { + fmt.Println(errs.Failure()) } - // Only the topmost func in the stack should set the Fail() err. + // Only the top-most func in the stack should set the failure. // IE: Fail() is not Wrap(). In lower levels, errors should get // wrapped and returned like normal, and only handled by fault // at the end. @@ -102,22 +98,30 @@ func ExampleErrors_Fail() { } } -// ExampleErrors_Add describes the assumptions and best practices +// ExampleBus_AddRecoverable describes the assumptions and best practices // for aggregating iterable or recoverable errors. -func ExampleErrors_Add() { +func ExampleBus_AddRecoverable() { errs := fault.New(false) - // Add() is used to record any recoverable error. + // AddRecoverable() is used to record any recoverable error. // - // Add() should only get called as the last error handling step - // within a loop or stream. In all other cases, you can return - // an error like normal and expect the upstream point of iteration - // to call Add() for you. + // What counts as a recoverable error? That's up to the given + // implementation. Normally, it's an inability to process one + // of many items within an iteration (ex: couldn't download 1 of + // 1000 emails). But just because an error occurred during a loop + // doesn't mean it's recoverable, ex: a failure to retrieve the next + // page when accumulating a batch of resources isn't usually + // recoverable. The choice is always up to the function at hand. + // + // AddRecoverable() should only get called as the top-most location + // of error handling within the recoverable process. Child functions + // should stick to normal golang error handling and expect the upstream + // controller to call AddRecoverable() for you. for i := range items { clientBasedGetter := func(i int) error { if err := getIthItem(i); err != nil { - // lower level calls don't Add to the fault.Errors. - // they handl errors like normal. + // lower level calls don't AddRecoverable to the fault.Bus. + // they stick to normal golang error handling. return errors.Wrap(err, "dependency") } @@ -126,154 +130,158 @@ func ExampleErrors_Add() { if err := clientBasedGetter(i); err != nil { // Here at the top of the loop is the correct place - // to Add an error using fault. - errs.Add(err) + // to aggregate the error using fault. + // Side note: technically, you should use a local bus + // here (see below) instead of errs. + errs.AddRecoverable(err) } } - // Iteration should exit anytime the primary error in fault is - // non-nil. fault.Errors does not expose the failFast flag - // directly. Instead, errors from Add() will automatically - // promote to the Err() value. Therefore, loops only ned to - // check the errs.Err(). If it is non-nil, then the loop should break. + // Iteration should exit anytime the fault failure is non-nil. + // fault.Bus does not expose the failFast flag directly. Instead, + // when failFast is true, errors from AddRecoverable() automatically + // promote to the Failure() spot. Recoverable handling only needs to + // check the errs.Failure(). If it is non-nil, then the loop should break. for i := range items { - if errs.Err() != nil { - // if failFast == true errs.Add() was called, + if errs.Failure() != nil { + // if failFast == true errs.AddRecoverable() was called, // we'll catch the error here. break } if err := getIthItem(i); err != nil { - errs.Add(err) + errs.AddRecoverable(err) } } } -// ExampleErrors_Err describes retrieving the non-recoverable error. -func ExampleErrors_Err() { +// ExampleBus_Failure describes retrieving the non-recoverable error. +func ExampleBus_Failure() { errs := fault.New(false) errs.Fail(errors.New("catastrophe")) - // Err() returns the primary failure. - err := errs.Err() + // Failure() returns the primary failure. + err := errs.Failure() fmt.Println(err) // if multiple Failures occur, each one after the first gets - // added to the Errs slice. + // added to the Recoverable slice as an overflow measure. errs.Fail(errors.New("another catastrophe")) - errSl := errs.Errs() + errSl := errs.Recovered() for _, e := range errSl { fmt.Println(e) } - // If Err() is nil, then you can assume the operation completed. + // If Failure() is nil, then you can assume the operation completed. // A complete operation is not necessarily an error-free operation. - // Recoverable errors may still have been added using Add(err). - // - // Even if Err() is nil, Errs() can be non-empty. + // Recoverable errors may still have been added using AddRecoverable(err). // Make sure you check both. - errs = fault.New(true) - - // If failFast is set to true, then the first error Add()ed gets + // If failFast is set to true, then the first recoerable error Added gets // promoted to the Err() position. - - errs.Add(errors.New("not catastrophic, but still becomes the Err()")) - err = errs.Err() + errs = fault.New(true) + errs.AddRecoverable(errors.New("not catastrophic, but still becomes the Failure()")) + err = errs.Failure() fmt.Println(err) // Output: catastrophe // another catastrophe - // not catastrophic, but still becomes the Err() + // not catastrophic, but still becomes the Failure() } -// ExampleErrors_Errs describes retrieving individual errors. -func ExampleErrors_Errs() { +// ExampleBus_Recovered describes the errors that processing was able to +// recover from and continue. +func ExampleErrors_Recovered() { errs := fault.New(false) - errs.Add(errors.New("not catastrophic")) - errs.Add(errors.New("something unwanted")) + errs.AddRecoverable(errors.New("not catastrophic")) + errs.AddRecoverable(errors.New("something unwanted")) - // Errs() gets the slice of all recoverable errors Add()ed during - // the run, but which did not force the process to exit. + // Recovered() gets the slice of all recoverable errors added during + // the run, but which did not cause a failure. // - // Errs() only needs to be investigated by the end user at the - // conclusion of an operation. Checking Errs() within lower- - // layer code is a smell. Funcs should return a standard error, - // or errs.Err(), if they need upstream handlers to handle the errors. - errSl := errs.Errs() + // Recovered() should never be investigated during lower level processing. + // Implementation only ever needs to check Failure(). If an error didn't + // promote to the Failure slot, then it should be ignored. + // + // The end user, at the conclusion of an operation, is the intended recipient + // of the Recovered error slice. After returning to the interface layer + // (the CLI or SDK), it's the job of the end user at that location to + // iterate through those errors and record them as wanted. + errSl := errs.Recovered() for _, err := range errSl { fmt.Println(err) } - // One or more errors in errs.Errs() does not necessarily mean the - // process failed. You can have non-zero Errs() but a nil Err(). - if errs.Err() == nil { - fmt.Println("Err() is nil") + // One or more errors in errs.Recovered() does not necessarily mean the + // process failed. You can have non-zero Recovered() but a nil Failure(). + if errs.Failure() == nil { + fmt.Println("Failure() is nil") } - // If Errs() is nil, then you can assume that no recoverable or - // iteration-based errors occurred. But that does not necessarily + // Inversely, if Recovered() is nil, then you can assume that no recoverable + // or iteration-based errors occurred. But that does not necessarily // mean the operation was able to complete. // - // Even if Errs() contains zero items, Err() can be non-nil. + // Even if Recovered() contains zero items, Err() can be non-nil. // Make sure you check both. // Output: not catastrophic // something unwanted - // Err() is nil + // Failure() is nil } -func ExampleErrors_Tracker() { +func ExampleBus_Local() { // It is common for Corso to run operations in parallel, // and for iterations to be nested within iterations. To // avoid mistakenly returning an error that was sourced // from some other async iteration, recoverable instances - // are aggrgated into a Tracker. + // are aggrgated into a Local. errs := fault.New(false) - trkr := errs.Tracker() + el := errs.Local() err := func() error { for i := range items { - if trkr.Err() != nil { + if el.Failure() != nil { break } if err := getIthItem(i); err != nil { - // instead of calling errs.Add(err), we call the - // trackers Add method. The error will still get - // added to the errs.Errs() set. But if this err - // causes the run to fail, only this tracker treats + // instead of calling errs.AddRecoverable(err), we call the + // local bus's Add method. The error will still get + // added to the errs.Recovered() set. But if this err + // causes the run to fail, only this local bus treats // it as the causal failure. - trkr.Add(err) + el.AddRecoverable(err) } } - return trkr.Err() + return el.Failure() }() if err != nil { - // handle the Err() that appeared in the tracker - fmt.Println("err occurred", errs.Err()) + // handle the Failure() that appeared in the local bus. + fmt.Println("failure occurred", errs.Failure()) } } -// ExampleErrorsE2e showcases a more complex integration. -func Example_errors_e2e() { +// ExampleE2e showcases a more complex integration. +func Example_e2e() { oper := newOperation() // imagine that we're a user, calling into corso SDK. // (fake funcs used here to minimize example bloat) // // The operation is our controller, we expect it to - // generate a new fault.Errors when constructed, and + // generate a new fault.Bus when constructed, and // to return that struct when we call Run() errs := oper.Run() // Let's investigate what went on inside. Since we're at - // the top of our controller, and returning a fault.Errors, + // the top of our controller, and returning a fault.Bus, // all the error handlers set the Fail() case. + /* Run() */ - func() *fault.Errors { + func() *fault.Bus { if err := connectClient(); err != nil { // Fail() here; we're top level in the controller // and this is a non-recoverable issue @@ -297,12 +305,13 @@ func Example_errors_e2e() { // What about the lower level handling? storeData didn't // return an error, so what's happening there? + /* storeData */ - func(data []any, errs *fault.Errors) { + err := func(data []any, errs *fault.Bus) error { // this is downstream in our code somewhere storer := func(a any) error { if err := dependencyCall(); err != nil { - // we're not passing in or calling fault.Errors here, + // we're not passing in or calling fault.Bus here, // because this isn't the iteration handler, it's just // a regular error. return errors.Wrap(err, "dependency") @@ -311,36 +320,48 @@ func Example_errors_e2e() { return nil } + el := errs.Local() + for _, d := range data { - if errs.Err() != nil { + if el.Failure() != nil { break } if err := storer(d); err != nil { // Since we're at the top of the iteration, we need - // to add each error to the fault.Errors struct. - errs.Add(err) + // to add each error to the fault.localBus struct. + el.AddRecoverable(err) } } - }(nil, nil) - // then at the end of the oper.Run, we investigate the results. - if errs.Err() != nil { - // handle the primary error - fmt.Println("err occurred", errs.Err()) + // at the end of the func, we need to return local.Failure() + // just in case the local bus promoted an error to the failure + // position. If we don't return it like normal error handling, + // then we'll lose scope of that error. + return el.Failure() + }(nil, nil) + if err != nil { + fmt.Println("errored", err) } - for _, err := range errs.Errs() { + // At the end of the oper.Run, when returning to the interface + // layer, we investigate the results. + if errs.Failure() != nil { + // handle the primary error + fmt.Println("err occurred", errs.Failure()) + } + + for _, err := range errs.Recovered() { // handle each recoverable error fmt.Println("recoverable err occurred", err) } } -// ExampleErrors_Err_return showcases when to return err or nil vs errs.Err() -func ExampleErrors_Err_return() { - // The general rule of thumb is to always handle the error directly - // by returning err, or nil, or any variety of extension (wrap, - // stack, clues, etc). +// ExampleBus_Failure_return showcases when to return an error or +// nil vs errs.Failure() vs *fault.Bus +func ExampleErrors_Failure_return() { + // The general rule of thumb is stick to standard golang error + // handling whenever possible. fn := func() error { if err := dependency.do(); err != nil { return errors.Wrap(err, "direct") @@ -352,26 +373,47 @@ func ExampleErrors_Err_return() { fmt.Println(err) } - // The exception is if you're handling recoverable errors. Those - // funcs should always return errs.Err(), in case a recoverable - // error happened on the last round of iteration. - fn2 := func(todo []string, errs *fault.Errors) error { + // The first exception is if you're handling recoverable errors. Recoverable + // error handling should create a local bus instance, and return localBus.Failure() + // so that the immediate upstream caller can be made aware of the current failure. + fn2 := func(todo []string, errs *fault.Bus) error { for range todo { - if errs.Err() != nil { - return errs.Err() + if errs.Failure() != nil { + return errs.Failure() } if err := dependency.do(); err != nil { - errs.Add(errors.Wrap(err, "recoverable")) + errs.AddRecoverable(errors.Wrap(err, "recoverable")) } } - return errs.Err() + return errs.Failure() } if err := fn2([]string{"a"}, fault.New(true)); err != nil { fmt.Println(err) } + // The second exception is if you're returning at the interface layer. + // In that case, you're expected to return the fault.Bus itself, so that + // callers can review the fault data. + operationFn := func(errs *fault.Bus) *fault.Bus { + if _, err := getData(); err != nil { + return errs.Fail(err) + } + + return errs + } + + fbus := operationFn(fault.New(true)) + + if fbus.Failure() != nil { + fmt.Println("failure", fbus.Failure()) + } + + for _, err := range fbus.Recovered() { + fmt.Println("recovered", err) + } + // Output: direct: caught one // recoverable: caught one } diff --git a/src/pkg/fault/fault.go b/src/pkg/fault/fault.go index b13f6916d..52ad235b8 100644 --- a/src/pkg/fault/fault.go +++ b/src/pkg/fault/fault.go @@ -6,21 +6,21 @@ import ( "golang.org/x/exp/slices" ) -type Errors struct { +type Bus struct { mu *sync.Mutex - // err identifies non-recoverable errors. This includes - // non-start cases (ex: cannot connect to client), hard- - // stop issues (ex: credentials expired) or conscious exit - // cases (ex: iteration error + failFast config). - err error + // Failure probably identifies errors that were added to the bus + // or localBus via AddRecoverable, but which were promoted + // to the failure position due to failFast=true configuration. + // Alternatively, the process controller might have set failure + // by calling Fail(err). + failure error - // errs is the accumulation of recoverable or iterated - // errors. Eg: if a process is retrieving N items, and - // 1 of the items fails to be retrieved, but the rest of - // them succeed, we'd expect to see 1 error added to this - // slice. - errs []error + // recoverable is the accumulation of recoverable errors. + // Eg: if a process is retrieving N items, and 1 of the + // items fails to be retrieved, but the rest of them succeed, + // we'd expect to see 1 error added to this slice. + recoverable []error // if failFast is true, the first errs addition will // get promoted to the err value. This signifies a @@ -29,51 +29,71 @@ type Errors struct { failFast bool } -// ErrorsData provides the errors data alone, without sync +// Errors provides the errors data alone, without sync // controls, allowing the data to be persisted. -type ErrorsData struct { - Err error `json:"-"` - Errs []error `json:"-"` - FailFast bool `json:"failFast"` +type Errors struct { + // Failure identifies a non-recoverable error. This includes + // non-start cases (ex: cannot connect to client), hard- + // stop issues (ex: credentials expired) or conscious exit + // cases (ex: iteration error + failFast config). + Failure error `json:"failure"` + + // Recovered errors accumulate through a runtime under + // best-effort processing conditions. They imply that an + // error occurred, but the process was able to move on and + // complete afterwards. + // Eg: if a process is retrieving N items, and 1 of the + // items fails to be retrieved, but the rest of them succeed, + // we'd expect to see 1 error added to this slice. + Recovered []error `json:"-"` + + // If FailFast is true, then the first Recoverable error will + // promote to the Failure spot, causing processing to exit. + FailFast bool `json:"failFast"` } // New constructs a new error with default values in place. -func New(failFast bool) *Errors { - return &Errors{ - mu: &sync.Mutex{}, - errs: []error{}, - failFast: failFast, +func New(failFast bool) *Bus { + return &Bus{ + mu: &sync.Mutex{}, + recoverable: []error{}, + failFast: failFast, } } -// Err returns the primary error. If not nil, this +// Failure returns the primary error. If not nil, this // indicates the operation exited prior to completion. -func (e *Errors) Err() error { - return e.err +func (e *Bus) Failure() error { + return e.failure } -// Errs returns the slice of recoverable and -// iterated errors. -func (e *Errors) Errs() []error { - return e.errs +// Recovered returns the slice of errors that occurred in +// recoverable points of processing. This is often during +// iteration where a single failure (ex: retrieving an item), +// doesn't require the entire process to end. +func (e *Bus) Recovered() []error { + return e.recoverable } -// Data returns the plain set of error data -// without any sync properties. -func (e *Errors) Data() ErrorsData { - return ErrorsData{ - Err: e.err, - Errs: slices.Clone(e.errs), - FailFast: e.failFast, +// Errors returns the plain record of errors that were aggregated +// within a fult Bus. +func (e *Bus) Errors() Errors { + return Errors{ + Failure: e.failure, + Recovered: slices.Clone(e.recoverable), + FailFast: e.failFast, } } -// TODO: introduce Failer interface - -// Fail sets the non-recoverable error (ie: errors.err) -// in the errors struct. If a non-recoverable error is -// already present, the error gets added to the errs slice. -func (e *Errors) Fail(err error) *Errors { +// Fail sets the non-recoverable error (ie: bus.failure) +// in the bus. If a failure error is already present, +// the error gets added to the recoverable slice for +// purposes of tracking. +// +// TODO: Return Data, not Bus. The consumers of a failure +// should care about the state of data, not the communication +// pattern. +func (e *Bus) Fail(err error) *Bus { if err == nil { return e } @@ -81,28 +101,33 @@ func (e *Errors) Fail(err error) *Errors { e.mu.Lock() defer e.mu.Unlock() - return e.setErr(err) + return e.setFailure(err) } -// setErr handles setting errors.err. Sync locking gets +// setErr handles setting bus.failure. Sync locking gets // handled upstream of this call. -func (e *Errors) setErr(err error) *Errors { - if e.err == nil { - e.err = err +func (e *Bus) setFailure(err error) *Bus { + if e.failure == nil { + e.failure = err return e } - e.errs = append(e.errs, err) + // technically not a recoverable error: we're using the + // recoverable slice as an overflow container here to + // ensure everything is tracked. + e.recoverable = append(e.recoverable, err) return e } -// Add appends the error to the slice of recoverable and -// iterated errors (ie: errors.errs). If failFast is true, -// the first Added error will get copied to errors.err, -// causing the errors struct to identify as non-recoverably -// failed. -func (e *Errors) Add(err error) *Errors { +// AddRecoverable appends the error to the slice of recoverable +// errors (ie: bus.recoverable). If failFast is true, the first +// added error will get copied to bus.failure, causing the bus +// to identify as non-recoverably failed. +// +// TODO: nil return, not Bus, since we don't want people to return +// from errors.AddRecoverable(). +func (e *Bus) AddRecoverable(err error) *Bus { if err == nil { return e } @@ -110,44 +135,44 @@ func (e *Errors) Add(err error) *Errors { e.mu.Lock() defer e.mu.Unlock() - return e.addErr(err) + return e.addRecoverableErr(err) } // addErr handles adding errors to errors.errs. Sync locking // gets handled upstream of this call. -func (e *Errors) addErr(err error) *Errors { - if e.err == nil && e.failFast { - e.setErr(err) +func (e *Bus) addRecoverableErr(err error) *Bus { + if e.failure == nil && e.failFast { + e.setFailure(err) } - e.errs = append(e.errs, err) + e.recoverable = append(e.recoverable, err) return e } // --------------------------------------------------------------------------- -// Iteration Tracker +// Local aggregator // --------------------------------------------------------------------------- -// Tracker constructs a new errors tracker for aggregating errors -// in a single iteration loop. Trackers shouldn't be passed down -// to other funcs, and the function that spawned the tracker should -// always return `tracker.Err()` to ensure that hard failures are -// propagated upstream. -func (e *Errors) Tracker() *tracker { - return &tracker{ - mu: &sync.Mutex{}, - errs: e, +// Local constructs a new local bus to handle error aggregation in a +// constrained scope. Local busses shouldn't be passed down to other +// funcs, and the function that spawned the local bus should always +// return `local.Failure()` to ensure that hard failures are propagated +// back upstream. +func (e *Bus) Local() *localBus { + return &localBus{ + mu: &sync.Mutex{}, + bus: e, } } -type tracker struct { +type localBus struct { mu *sync.Mutex - errs *Errors + bus *Bus current error } -func (e *tracker) Add(err error) { +func (e *localBus) AddRecoverable(err error) { if err == nil { return } @@ -155,18 +180,18 @@ func (e *tracker) Add(err error) { e.mu.Lock() defer e.mu.Unlock() - if e.current == nil && e.errs.failFast { + if e.current == nil && e.bus.failFast { e.current = err } - e.errs.Add(err) + e.bus.AddRecoverable(err) } -// Err returns the primary error in the tracker. Will be nil if the -// original Errors is set to bestEffort handling. Does not return the -// underlying Errors.Err(). Should be called as the return value of -// any func which created a new tracker. -func (e *tracker) Err() error { +// Failure returns the failure that happened within the local bus. +// It does not return the underlying bus.Failure(), only the failure +// that was recorded within the local bus instance. This error should +// get returned by any func which created a local bus. +func (e *localBus) Failure() error { return e.current } diff --git a/src/pkg/fault/fault_test.go b/src/pkg/fault/fault_test.go index 9b68797af..c7d14a957 100644 --- a/src/pkg/fault/fault_test.go +++ b/src/pkg/fault/fault_test.go @@ -75,16 +75,16 @@ func (suite *FaultErrorsUnitSuite) TestErr() { suite.T().Run(test.name, func(t *testing.T) { n := fault.New(test.failFast) require.NotNil(t, n) - require.NoError(t, n.Err()) - require.Empty(t, n.Errs()) + require.NoError(t, n.Failure()) + require.Empty(t, n.Recovered()) e := n.Fail(test.fail) require.NotNil(t, e) - e = n.Add(test.add) + e = n.AddRecoverable(test.add) require.NotNil(t, e) - test.expect(t, n.Err()) + test.expect(t, n.Failure()) }) } } @@ -94,16 +94,16 @@ func (suite *FaultErrorsUnitSuite) TestFail() { n := fault.New(false) require.NotNil(t, n) - require.NoError(t, n.Err()) - require.Empty(t, n.Errs()) + require.NoError(t, n.Failure()) + require.Empty(t, n.Recovered()) n.Fail(assert.AnError) - assert.Error(t, n.Err()) - assert.Empty(t, n.Errs()) + assert.Error(t, n.Failure()) + assert.Empty(t, n.Recovered()) n.Fail(assert.AnError) - assert.Error(t, n.Err()) - assert.NotEmpty(t, n.Errs()) + assert.Error(t, n.Failure()) + assert.NotEmpty(t, n.Recovered()) } func (suite *FaultErrorsUnitSuite) TestErrs() { @@ -154,10 +154,10 @@ func (suite *FaultErrorsUnitSuite) TestErrs() { e := n.Fail(test.fail) require.NotNil(t, e) - e = n.Add(test.add) + e = n.AddRecoverable(test.add) require.NotNil(t, e) - test.expect(t, n.Errs()) + test.expect(t, n.Recovered()) }) } } @@ -168,16 +168,16 @@ func (suite *FaultErrorsUnitSuite) TestAdd() { n := fault.New(true) require.NotNil(t, n) - n.Add(assert.AnError) - assert.Error(t, n.Err()) - assert.Len(t, n.Errs(), 1) + n.AddRecoverable(assert.AnError) + assert.Error(t, n.Failure()) + assert.Len(t, n.Recovered(), 1) - n.Add(assert.AnError) - assert.Error(t, n.Err()) - assert.Len(t, n.Errs(), 2) + n.AddRecoverable(assert.AnError) + assert.Error(t, n.Failure()) + assert.Len(t, n.Recovered(), 2) } -func (suite *FaultErrorsUnitSuite) TestData() { +func (suite *FaultErrorsUnitSuite) TestErrors() { t := suite.T() // not fail-fast @@ -185,12 +185,12 @@ func (suite *FaultErrorsUnitSuite) TestData() { require.NotNil(t, n) n.Fail(errors.New("fail")) - n.Add(errors.New("1")) - n.Add(errors.New("2")) + n.AddRecoverable(errors.New("1")) + n.AddRecoverable(errors.New("2")) - d := n.Data() - assert.Equal(t, n.Err(), d.Err) - assert.ElementsMatch(t, n.Errs(), d.Errs) + d := n.Errors() + assert.Equal(t, n.Failure(), d.Failure) + assert.ElementsMatch(t, n.Recovered(), d.Recovered) assert.False(t, d.FailFast) // fail-fast @@ -198,12 +198,12 @@ func (suite *FaultErrorsUnitSuite) TestData() { require.NotNil(t, n) n.Fail(errors.New("fail")) - n.Add(errors.New("1")) - n.Add(errors.New("2")) + n.AddRecoverable(errors.New("1")) + n.AddRecoverable(errors.New("2")) - d = n.Data() - assert.Equal(t, n.Err(), d.Err) - assert.ElementsMatch(t, n.Errs(), d.Errs) + d = n.Errors() + assert.Equal(t, n.Failure(), d.Failure) + assert.ElementsMatch(t, n.Recovered(), d.Recovered) assert.True(t, d.FailFast) } @@ -214,17 +214,13 @@ func (suite *FaultErrorsUnitSuite) TestMarshalUnmarshal() { n := fault.New(false) require.NotNil(t, n) - n.Add(errors.New("1")) - n.Add(errors.New("2")) + n.AddRecoverable(errors.New("1")) + n.AddRecoverable(errors.New("2")) - data := n.Data() - - jsonStr, err := json.Marshal(data) + bs, err := json.Marshal(n.Errors()) require.NoError(t, err) - um := fault.ErrorsData{} - - err = json.Unmarshal(jsonStr, &um) + err = json.Unmarshal(bs, &fault.Errors{}) require.NoError(t, err) } @@ -246,7 +242,7 @@ func (suite *FaultErrorsUnitSuite) TestUnmarshalLegacy() { t.Logf("jsonStr is %s\n", jsonStr) - um := fault.ErrorsData{} + um := fault.Errors{} err = json.Unmarshal(jsonStr, &um) require.NoError(t, err) @@ -255,25 +251,25 @@ func (suite *FaultErrorsUnitSuite) TestUnmarshalLegacy() { func (suite *FaultErrorsUnitSuite) TestTracker() { t := suite.T() - be := fault.New(false) + eb := fault.New(false) - ba := be.Tracker() - assert.NoError(t, ba.Err()) - assert.Empty(t, be.Errs()) + lb := eb.Local() + assert.NoError(t, lb.Failure()) + assert.Empty(t, eb.Recovered()) - ba.Add(assert.AnError) - assert.NoError(t, ba.Err()) - assert.NoError(t, be.Err()) - assert.NotEmpty(t, be.Errs()) + lb.AddRecoverable(assert.AnError) + assert.NoError(t, lb.Failure()) + assert.NoError(t, eb.Failure()) + assert.NotEmpty(t, eb.Recovered()) - fe := fault.New(true) + ebt := fault.New(true) - fa := fe.Tracker() - assert.NoError(t, fa.Err()) - assert.Empty(t, fe.Errs()) + lbt := ebt.Local() + assert.NoError(t, lbt.Failure()) + assert.Empty(t, ebt.Recovered()) - fa.Add(assert.AnError) - assert.Error(t, fa.Err()) - assert.Error(t, fe.Err()) - assert.NotEmpty(t, fe.Errs()) + lbt.AddRecoverable(assert.AnError) + assert.Error(t, lbt.Failure()) + assert.Error(t, ebt.Failure()) + assert.NotEmpty(t, ebt.Recovered()) } diff --git a/src/pkg/repository/loadtest/repository_load_test.go b/src/pkg/repository/loadtest/repository_load_test.go index a080d916c..b29c00dcb 100644 --- a/src/pkg/repository/loadtest/repository_load_test.go +++ b/src/pkg/repository/loadtest/repository_load_test.go @@ -185,8 +185,8 @@ func runBackupLoadTest( assert.Less(t, 0, b.Results.ItemsWritten, "items written") assert.Less(t, int64(0), b.Results.BytesUploaded, "bytes uploaded") assert.Equal(t, len(users), b.Results.ResourceOwners, "resource owners") - assert.NoError(t, b.Errors.Err(), "non-recoverable error") - assert.Empty(t, b.Errors.Errs(), "recoverable errors") + assert.NoError(t, b.Errors.Failure(), "non-recoverable error") + assert.Empty(t, b.Errors.Recovered(), "recoverable errors") assert.NoError(t, b.Results.ReadErrors, "read errors") assert.NoError(t, b.Results.WriteErrors, "write errors") }) @@ -242,7 +242,7 @@ func runBackupDetailsLoadTest( t.Run("backup_details_"+name, func(t *testing.T) { var ( - errs *fault.Errors + errs *fault.Bus b *backup.Backup ds *details.Details labels = pprof.Labels("details_load_test", name) @@ -252,8 +252,8 @@ func runBackupDetailsLoadTest( ds, b, errs = r.BackupDetails(ctx, backupID) }) - require.NoError(t, errs.Err(), "retrieving details in backup "+backupID) - require.Empty(t, errs.Errs(), "retrieving details in backup "+backupID) + require.NoError(t, errs.Failure(), "retrieving details in backup "+backupID) + require.Empty(t, errs.Recovered(), "retrieving details in backup "+backupID) require.NotNil(t, ds, "backup details must exist") require.NotNil(t, b, "backup must exist") @@ -294,8 +294,8 @@ func doRestoreLoadTest( assert.Less(t, 0, r.Results.ItemsRead, "items read") assert.Less(t, 0, r.Results.ItemsWritten, "items written") assert.Equal(t, len(users), r.Results.ResourceOwners, "resource owners") - assert.NoError(t, r.Errors.Err(), "non-recoverable error") - assert.Empty(t, r.Errors.Errs(), "recoverable errors") + assert.NoError(t, r.Errors.Failure(), "non-recoverable error") + assert.Empty(t, r.Errors.Recovered(), "recoverable errors") assert.NoError(t, r.Results.ReadErrors, "read errors") assert.NoError(t, r.Results.WriteErrors, "write errors") assert.Equal(t, expectItemCount, r.Results.ItemsWritten, "backup and restore wrote the same count of items") diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index a28f7f2fb..947ce6de0 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -35,12 +35,12 @@ var ErrorRepoAlreadyExists = errors.New("a repository was already initialized wi // repository. type BackupGetter interface { Backup(ctx context.Context, id model.StableID) (*backup.Backup, error) - Backups(ctx context.Context, ids []model.StableID) ([]*backup.Backup, *fault.Errors) + Backups(ctx context.Context, ids []model.StableID) ([]*backup.Backup, *fault.Bus) BackupsByTag(ctx context.Context, fs ...store.FilterOption) ([]*backup.Backup, error) BackupDetails( ctx context.Context, backupID string, - ) (*details.Details, *backup.Backup, *fault.Errors) + ) (*details.Details, *backup.Backup, *fault.Bus) } type Repository interface { @@ -314,7 +314,7 @@ func (r repository) Backup(ctx context.Context, id model.StableID) (*backup.Back // BackupsByID lists backups by ID. Returns as many backups as possible with // errors for the backups it was unable to retrieve. -func (r repository) Backups(ctx context.Context, ids []model.StableID) ([]*backup.Backup, *fault.Errors) { +func (r repository) Backups(ctx context.Context, ids []model.StableID) ([]*backup.Backup, *fault.Bus) { var ( bups []*backup.Backup errs = fault.New(false) @@ -324,7 +324,7 @@ func (r repository) Backups(ctx context.Context, ids []model.StableID) ([]*backu for _, id := range ids { b, err := sw.GetBackup(ctx, id) if err != nil { - errs.Add(clues.Stack(err).With("backup_id", id)) + errs.AddRecoverable(clues.Stack(err).With("backup_id", id)) } bups = append(bups, b) @@ -343,7 +343,7 @@ func (r repository) BackupsByTag(ctx context.Context, fs ...store.FilterOption) func (r repository) BackupDetails( ctx context.Context, backupID string, -) (*details.Details, *backup.Backup, *fault.Errors) { +) (*details.Details, *backup.Backup, *fault.Bus) { sw := store.NewKopiaStore(r.modelStore) errs := fault.New(false) diff --git a/src/pkg/selectors/exchange.go b/src/pkg/selectors/exchange.go index d47e67b3f..d048bab97 100644 --- a/src/pkg/selectors/exchange.go +++ b/src/pkg/selectors/exchange.go @@ -719,7 +719,7 @@ func (s ExchangeScope) setDefaults() { func (s exchange) Reduce( ctx context.Context, deets *details.Details, - errs *fault.Errors, + errs *fault.Bus, ) *details.Details { return reduce[ExchangeScope]( ctx, diff --git a/src/pkg/selectors/onedrive.go b/src/pkg/selectors/onedrive.go index 5fe942518..f9cc037ea 100644 --- a/src/pkg/selectors/onedrive.go +++ b/src/pkg/selectors/onedrive.go @@ -498,7 +498,7 @@ func (s OneDriveScope) DiscreteCopy(user string) OneDriveScope { func (s oneDrive) Reduce( ctx context.Context, deets *details.Details, - errs *fault.Errors, + errs *fault.Bus, ) *details.Details { return reduce[OneDriveScope]( ctx, diff --git a/src/pkg/selectors/scopes.go b/src/pkg/selectors/scopes.go index ff24ca845..6b7d45c89 100644 --- a/src/pkg/selectors/scopes.go +++ b/src/pkg/selectors/scopes.go @@ -288,7 +288,7 @@ func reduce[T scopeT, C categoryT]( deets *details.Details, s Selector, dataCategories map[path.CategoryType]C, - errs *fault.Errors, + errs *fault.Bus, ) *details.Details { ctx, end := D.Span(ctx, "selectors:reduce") defer end() @@ -314,7 +314,7 @@ func reduce[T scopeT, C categoryT]( for _, ent := range deets.Items() { repoPath, err := path.FromDataLayerPath(ent.RepoRef, true) if err != nil { - errs.Add(clues.Wrap(err, "transforming repoRef to path").WithClues(ctx)) + errs.AddRecoverable(clues.Wrap(err, "transforming repoRef to path").WithClues(ctx)) continue } @@ -326,7 +326,7 @@ func reduce[T scopeT, C categoryT]( if len(ent.LocationRef) > 0 { pb, err := path.Builder{}.SplitUnescapeAppend(ent.LocationRef) if err != nil { - errs.Add(clues.Wrap(err, "transforming locationRef to path").WithClues(ctx)) + errs.AddRecoverable(clues.Wrap(err, "transforming locationRef to path").WithClues(ctx)) continue } @@ -338,7 +338,7 @@ func reduce[T scopeT, C categoryT]( repoPath.Category(), true) if err != nil { - errs.Add(clues.Wrap(err, "transforming locationRef to path").WithClues(ctx)) + errs.AddRecoverable(clues.Wrap(err, "transforming locationRef to path").WithClues(ctx)) continue } } diff --git a/src/pkg/selectors/scopes_test.go b/src/pkg/selectors/scopes_test.go index e8b4a4cc0..1a6dfc102 100644 --- a/src/pkg/selectors/scopes_test.go +++ b/src/pkg/selectors/scopes_test.go @@ -284,7 +284,7 @@ func (suite *SelectorScopesSuite) TestReduce() { dataCats, errs) require.NotNil(t, result) - require.NoError(t, errs.Err(), "no recoverable errors") + require.NoError(t, errs.Failure(), "no recoverable errors") assert.Len(t, result.Entries, test.expectLen) }) } diff --git a/src/pkg/selectors/selectors.go b/src/pkg/selectors/selectors.go index 41bfecbcd..21be47d37 100644 --- a/src/pkg/selectors/selectors.go +++ b/src/pkg/selectors/selectors.go @@ -70,7 +70,7 @@ var ( const All = "All" type Reducer interface { - Reduce(context.Context, *details.Details, *fault.Errors) *details.Details + Reduce(context.Context, *details.Details, *fault.Bus) *details.Details } // selectorResourceOwners aggregates all discrete path category types described @@ -240,7 +240,7 @@ func (s Selector) PathService() path.ServiceType { func (s Selector) Reduce( ctx context.Context, deets *details.Details, - errs *fault.Errors, + errs *fault.Bus, ) (*details.Details, error) { r, err := selectorAsIface[Reducer](s) if err != nil { diff --git a/src/pkg/selectors/sharepoint.go b/src/pkg/selectors/sharepoint.go index cfefeea82..4da71852f 100644 --- a/src/pkg/selectors/sharepoint.go +++ b/src/pkg/selectors/sharepoint.go @@ -570,7 +570,7 @@ func (s SharePointScope) DiscreteCopy(site string) SharePointScope { func (s sharePoint) Reduce( ctx context.Context, deets *details.Details, - errs *fault.Errors, + errs *fault.Bus, ) *details.Details { return reduce[SharePointScope]( ctx, diff --git a/src/pkg/services/m365/m365.go b/src/pkg/services/m365/m365.go index 3e4361bfa..c05f2e4d7 100644 --- a/src/pkg/services/m365/m365.go +++ b/src/pkg/services/m365/m365.go @@ -31,12 +31,12 @@ func UsersCompat(ctx context.Context, acct account.Account) ([]*User, error) { return nil, err } - return users, errs.Err() + return users, errs.Failure() } // Users returns a list of users in the specified M365 tenant // TODO: Implement paging support -func Users(ctx context.Context, acct account.Account, errs *fault.Errors) ([]*User, error) { +func Users(ctx context.Context, acct account.Account, errs *fault.Bus) ([]*User, error) { gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, connector.Users, errs) if err != nil { return nil, errors.Wrap(err, "initializing M365 graph connection") @@ -61,7 +61,7 @@ func Users(ctx context.Context, acct account.Account, errs *fault.Errors) ([]*Us return ret, nil } -func UserIDs(ctx context.Context, acct account.Account, errs *fault.Errors) ([]string, error) { +func UserIDs(ctx context.Context, acct account.Account, errs *fault.Bus) ([]string, error) { users, err := Users(ctx, acct, errs) if err != nil { return nil, err @@ -77,7 +77,7 @@ func UserIDs(ctx context.Context, acct account.Account, errs *fault.Errors) ([]s // UserPNs retrieves all user principleNames in the tenant. Principle Names // can be used analogous userIDs in graph API queries. -func UserPNs(ctx context.Context, acct account.Account, errs *fault.Errors) ([]string, error) { +func UserPNs(ctx context.Context, acct account.Account, errs *fault.Bus) ([]string, error) { users, err := Users(ctx, acct, errs) if err != nil { return nil, err @@ -92,7 +92,7 @@ func UserPNs(ctx context.Context, acct account.Account, errs *fault.Errors) ([]s } // SiteURLs returns a list of SharePoint site WebURLs in the specified M365 tenant -func SiteURLs(ctx context.Context, acct account.Account, errs *fault.Errors) ([]string, error) { +func SiteURLs(ctx context.Context, acct account.Account, errs *fault.Bus) ([]string, error) { gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, connector.Sites, errs) if err != nil { return nil, errors.Wrap(err, "initializing M365 graph connection") @@ -102,7 +102,7 @@ func SiteURLs(ctx context.Context, acct account.Account, errs *fault.Errors) ([] } // SiteURLs returns a list of SharePoint sites IDs in the specified M365 tenant -func SiteIDs(ctx context.Context, acct account.Account, errs *fault.Errors) ([]string, error) { +func SiteIDs(ctx context.Context, acct account.Account, errs *fault.Bus) ([]string, error) { gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, connector.Sites, errs) if err != nil { return nil, errors.Wrap(err, "initializing graph connection")