From b95231d85fbb8d95fee3b45dd7a03be899c57284 Mon Sep 17 00:00:00 2001
From: Keepers
Date: Tue, 21 Feb 2023 21:51:06 -0700
Subject: [PATCH] add fault.tracker for error additions (#2510)

## Description

Realized we had a race condition: in an async runtime it's possible for an errs.Err()
to be returned by multiple functions, even though that Err() was only sourced by one
of them. Adding a tracker confines the returned error to the scope of the func that
produced it, so that only the error produced in the current iteration is returned.
(A minimal usage sketch follows the diff below.)

## Does this PR need a docs update or release note?

- [x] :no_entry: No

## Type of change

- [x] :broom: Tech Debt/Cleanup

## Issue(s)

* #1970

## Test Plan

- [x] :zap: Unit test
- [x] :green_heart: E2E
---
 src/internal/connector/discovery/api/users.go | 11 +++--
 .../connector/exchange/data_collections.go    |  7 +--
 .../connector/exchange/service_iterators.go   | 19 +++++---
 .../connector/exchange/service_restore.go     | 42 +++++++++--------
 src/internal/connector/graph_connector.go     |  8 ++--
 .../connector/sharepoint/api/pages.go         | 13 +++---
 .../connector/sharepoint/collection.go        | 22 +++++----
 .../connector/sharepoint/data_collections.go  | 33 ++++++++------
 src/internal/connector/sharepoint/list.go     | 27 ++++++-----
 src/internal/connector/sharepoint/restore.go  | 30 ++++++++-----
 src/internal/kopia/wrapper.go                 | 17 ++++---
 src/internal/operations/manifests.go          |  7 +--
 src/internal/operations/restore.go            |  9 ++--
 src/pkg/fault/example_fault_test.go           | 37 ++++++++++++++-
 src/pkg/fault/fault.go                        | 45 +++++++++++++++++++
 src/pkg/fault/fault_test.go                   | 26 +++++++++++
 16 files changed, 251 insertions(+), 102 deletions(-)

diff --git a/src/internal/connector/discovery/api/users.go b/src/internal/connector/discovery/api/users.go
index 3006c178b..05528eb2c 100644
--- a/src/internal/connector/discovery/api/users.go
+++ b/src/internal/connector/discovery/api/users.go
@@ -108,16 +108,19 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl
         return nil, clues.Wrap(err, "creating users iterator").WithClues(ctx).With(graph.ErrData(err)...)
     }

-    us := make([]models.Userable, 0)
+    var (
+        us = make([]models.Userable, 0)
+        et = errs.Tracker()
+    )

     iterator := func(item any) bool {
-        if errs.Err() != nil {
+        if et.Err() != nil {
             return false
         }

         u, err := validateUser(item)
         if err != nil {
-            errs.Add(clues.Wrap(err, "validating user").WithClues(ctx).With(graph.ErrData(err)...))
+            et.Add(clues.Wrap(err, "validating user").WithClues(ctx).With(graph.ErrData(err)...))
         } else {
             us = append(us, u)
         }
@@ -129,7 +132,7 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl
         return nil, clues.Wrap(err, "iterating all users").WithClues(ctx).With(graph.ErrData(err)...)
} - return us, errs.Err() + return us, et.Err() } func (c Users) GetByID(ctx context.Context, userID string) (models.Userable, error) { diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index 5becad190..1d62d4f56 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -178,6 +178,7 @@ func DataCollections( var ( user = selector.DiscreteOwner collections = []data.BackupCollection{} + et = errs.Tracker() ) cdps, err := parseMetadataCollections(ctx, metadata, errs) @@ -186,7 +187,7 @@ func DataCollections( } for _, scope := range eb.Scopes() { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -200,14 +201,14 @@ func DataCollections( su, errs) if err != nil { - errs.Add(err) + et.Add(err) continue } collections = append(collections, dcs...) } - return collections, nil, errs.Err() + return collections, nil, et.Err() } func getterByType(ac api.Client, category path.CategoryType) (addedAndRemovedItemIDsGetter, error) { diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 1ee6636fe..02f9d92b0 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -68,9 +68,11 @@ func filterContainersAndFillCollections( return err } + et := errs.Tracker() + for _, c := range resolver.Items() { - if errs.Err() != nil { - return errs.Err() + if et.Err() != nil { + return et.Err() } cID := *c.GetId() @@ -100,7 +102,7 @@ func filterContainersAndFillCollections( added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta) if err != nil { if !graph.IsErrDeletedInFlight(err) { - errs.Add(err) + et.Add(err) continue } @@ -155,8 +157,12 @@ func filterContainersAndFillCollections( // in the `previousPath` set, but does not exist in the current container // resolver (which contains all the resource owners' current containers). for id, p := range tombstones { + if et.Err() != nil { + return et.Err() + } + if collections[id] != nil { - errs.Add(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ctx)) + et.Add(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ctx)) continue } @@ -205,15 +211,14 @@ func filterContainersAndFillCollections( path.ExchangeService, qp.Category, entries, - statusUpdater, - ) + statusUpdater) if err != nil { return clues.Wrap(err, "making metadata collection") } collections["metadata"] = col - return errs.Err() + return et.Err() } // produces a set of id:path pairs from the deltapaths map. 
diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index 52dc80977..dff05be54 100644 --- a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -112,6 +112,7 @@ func RestoreExchangeEvent( ctx = clues.Add(ctx, "item_id", ptr.Val(event.GetId())) var ( + et = errs.Tracker() transformedEvent = support.ToEventSimplified(event) attached []models.Attachmentable ) @@ -139,19 +140,19 @@ func RestoreExchangeEvent( } for _, attach := range attached { - if errs.Err() != nil { + if et.Err() != nil { break } if err := uploadAttachment(ctx, uploader, attach); err != nil { - errs.Add(err) + et.Add(err) } } info := api.EventInfo(event) info.Size = int64(len(bits)) - return info, errs.Err() + return info, et.Err() } // RestoreMailMessage utility function to place an exchange.Mail @@ -255,17 +256,19 @@ func SendMailToBackStore( return clues.New("nil response from post").WithClues(ctx) } - id := ptr.Val(response.GetId()) - - uploader := &mailAttachmentUploader{ - userID: user, - folderID: destination, - itemID: id, - service: service, - } + var ( + et = errs.Tracker() + id = ptr.Val(response.GetId()) + uploader = &mailAttachmentUploader{ + userID: user, + folderID: destination, + itemID: id, + service: service, + } + ) for _, attachment := range attached { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -280,13 +283,13 @@ func SendMailToBackStore( continue } - errs.Add(errors.Wrap(err, "uploading mail attachment")) + et.Add(errors.Wrap(err, "uploading mail attachment")) break } } - return errs.Err() + return et.Err() } // RestoreExchangeDataCollections restores M365 objects in data.RestoreCollection to MSFT @@ -307,6 +310,7 @@ func RestoreExchangeDataCollections( userID string // TODO policy to be updated from external source after completion of refactoring policy = control.Copy + et = errs.Tracker() ) if len(dcs) > 0 { @@ -315,8 +319,8 @@ func RestoreExchangeDataCollections( } for _, dc := range dcs { - if errs.Err() != nil { - return nil, errs.Err() + if et.Err() != nil { + break } userCaches := directoryCaches[userID] @@ -333,7 +337,7 @@ func RestoreExchangeDataCollections( userCaches, errs) if err != nil { - errs.Add(clues.Wrap(err, "creating destination").WithClues(ctx)) + et.Add(clues.Wrap(err, "creating destination").WithClues(ctx)) continue } @@ -351,10 +355,10 @@ func RestoreExchangeDataCollections( support.Restore, len(dcs), metrics, - errs.Err(), + et.Err(), dest.ContainerName) - return status, errs.Err() + return status, et.Err() } // restoreCollection handles restoration of an individual collection. diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index a61253c0f..165420dd2 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -320,15 +320,17 @@ func getResources( return nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...) } + et := errs.Tracker() + callbackFunc := func(item any) bool { - if errs.Err() != nil { + if et.Err() != nil { return false } k, v, err := identify(item) if err != nil { if !errors.Is(err, errKnownSkippableCase) { - errs.Add(clues.Stack(err). + et.Add(clues.Stack(err). WithClues(ctx). With("query_url", gs.Adapter().GetBaseUrl())) } @@ -345,5 +347,5 @@ func getResources( return nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...) 
} - return resources, errs.Err() + return resources, et.Err() } diff --git a/src/internal/connector/sharepoint/api/pages.go b/src/internal/connector/sharepoint/api/pages.go index af3e9c31f..8f33760fb 100644 --- a/src/internal/connector/sharepoint/api/pages.go +++ b/src/internal/connector/sharepoint/api/pages.go @@ -34,7 +34,7 @@ func GetSitePages( col = make([]models.SitePageable, 0) semaphoreCh = make(chan struct{}, fetchChannelSize) opts = retrieveSitePageOptions() - err error + et = errs.Tracker() wg sync.WaitGroup m sync.Mutex ) @@ -49,7 +49,7 @@ func GetSitePages( } for _, entry := range pages { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -61,11 +61,14 @@ func GetSitePages( defer wg.Done() defer func() { <-semaphoreCh }() - var page models.SitePageable + var ( + page models.SitePageable + err error + ) page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts) if err != nil { - errs.Add(clues.Wrap(err, "fetching page").WithClues(ctx).With(graph.ErrData(err)...)) + et.Add(clues.Wrap(err, "fetching page").WithClues(ctx).With(graph.ErrData(err)...)) return } @@ -75,7 +78,7 @@ func GetSitePages( wg.Wait() - return col, errs.Err() + return col, et.Err() } // fetchPages utility function to return the tuple of item diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index 8cfe8731b..121f70d70 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -223,7 +223,10 @@ func (sc *Collection) retrieveLists( progress chan<- struct{}, errs *fault.Errors, ) (numMetrics, error) { - var metrics numMetrics + var ( + metrics numMetrics + et = errs.Tracker() + ) lists, err := loadSiteLists(ctx, sc.service, sc.fullPath.ResourceOwner(), sc.jobs, errs) if err != nil { @@ -234,13 +237,13 @@ func (sc *Collection) retrieveLists( // For each models.Listable, object is serialized and the metrics are collected. // The progress is objected via the passed in channel. for _, lst := range lists { - if errs.Err() != nil { + if et.Err() != nil { break } byteArray, err := serializeContent(wtr, lst) if err != nil { - errs.Add(clues.Wrap(err, "serializing list").WithClues(ctx)) + et.Add(clues.Wrap(err, "serializing list").WithClues(ctx)) continue } @@ -266,7 +269,7 @@ func (sc *Collection) retrieveLists( } } - return metrics, errs.Err() + return metrics, et.Err() } func (sc *Collection) retrievePages( @@ -275,7 +278,10 @@ func (sc *Collection) retrievePages( progress chan<- struct{}, errs *fault.Errors, ) (numMetrics, error) { - var metrics numMetrics + var ( + metrics numMetrics + et = errs.Tracker() + ) betaService := sc.betaService if betaService == nil { @@ -292,13 +298,13 @@ func (sc *Collection) retrievePages( // Pageable objects are not supported in v1.0 of msgraph at this time. 
// TODO: Verify Parsable interface supported with modified-Pageable for _, pg := range pages { - if errs.Err() != nil { + if et.Err() != nil { break } byteArray, err := serializeContent(wtr, pg) if err != nil { - errs.Add(clues.Wrap(err, "serializing page").WithClues(ctx)) + et.Add(clues.Wrap(err, "serializing page").WithClues(ctx)) continue } @@ -318,7 +324,7 @@ func (sc *Collection) retrievePages( } } - return metrics, errs.Err() + return metrics, et.Err() } func serializeContent(writer *kw.JsonSerializationWriter, obj absser.Parsable) ([]byte, error) { diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go index b941fd479..6a9373781 100644 --- a/src/internal/connector/sharepoint/data_collections.go +++ b/src/internal/connector/sharepoint/data_collections.go @@ -44,12 +44,13 @@ func DataCollections( } var ( + et = errs.Tracker() site = b.DiscreteOwner collections = []data.BackupCollection{} ) for _, scope := range b.Scopes() { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -73,7 +74,7 @@ func DataCollections( ctrlOpts, errs) if err != nil { - errs.Add(err) + et.Add(err) continue } @@ -88,7 +89,7 @@ func DataCollections( su, ctrlOpts) if err != nil { - errs.Add(err) + et.Add(err) continue } @@ -102,7 +103,7 @@ func DataCollections( ctrlOpts, errs) if err != nil { - errs.Add(err) + et.Add(err) continue } } @@ -111,7 +112,7 @@ func DataCollections( foldersComplete <- struct{}{} } - return collections, nil, errs.Err() + return collections, nil, et.Err() } func collectLists( @@ -124,7 +125,10 @@ func collectLists( ) ([]data.BackupCollection, error) { logger.Ctx(ctx).With("site", siteID).Debug("Creating SharePoint List Collections") - spcs := make([]data.BackupCollection, 0) + var ( + et = errs.Tracker() + spcs = make([]data.BackupCollection, 0) + ) lists, err := preFetchLists(ctx, serv, siteID) if err != nil { @@ -132,7 +136,7 @@ func collectLists( } for _, tuple := range lists { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -143,7 +147,7 @@ func collectLists( path.ListsCategory, false) if err != nil { - errs.Add(clues.Wrap(err, "creating list collection path").WithClues(ctx)) + et.Add(clues.Wrap(err, "creating list collection path").WithClues(ctx)) } collection := NewCollection(dir, serv, List, updater.UpdateStatus, ctrlOpts) @@ -152,7 +156,7 @@ func collectLists( spcs = append(spcs, collection) } - return spcs, errs.Err() + return spcs, et.Err() } // collectLibraries constructs a onedrive Collections struct and Get()s @@ -204,7 +208,10 @@ func collectPages( ) ([]data.BackupCollection, error) { logger.Ctx(ctx).Debug("creating SharePoint Pages collections") - spcs := make([]data.BackupCollection, 0) + var ( + et = errs.Tracker() + spcs = make([]data.BackupCollection, 0) + ) // make the betaClient // Need to receive From DataCollection Call @@ -221,7 +228,7 @@ func collectPages( } for _, tuple := range tuples { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -232,7 +239,7 @@ func collectPages( path.PagesCategory, false) if err != nil { - errs.Add(clues.Wrap(err, "creating page collection path").WithClues(ctx)) + et.Add(clues.Wrap(err, "creating page collection path").WithClues(ctx)) } collection := NewCollection(dir, serv, Pages, updater.UpdateStatus, ctrlOpts) @@ -242,7 +249,7 @@ func collectPages( spcs = append(spcs, collection) } - return spcs, errs.Err() + return spcs, et.Err() } type folderMatcher struct { diff --git a/src/internal/connector/sharepoint/list.go 
b/src/internal/connector/sharepoint/list.go index a892fd976..f753d847e 100644 --- a/src/internal/connector/sharepoint/list.go +++ b/src/internal/connector/sharepoint/list.go @@ -97,6 +97,7 @@ func loadSiteLists( var ( results = make([]models.Listable, 0) semaphoreCh = make(chan struct{}, fetchChannelSize) + et = errs.Tracker() wg sync.WaitGroup m sync.Mutex ) @@ -111,8 +112,8 @@ func loadSiteLists( } for _, listID := range listIDs { - if errs.Err() != nil { - return nil, errs.Err() + if et.Err() != nil { + break } semaphoreCh <- struct{}{} @@ -130,13 +131,13 @@ func loadSiteLists( entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil) if err != nil { - errs.Add(clues.Wrap(err, "getting site list").WithClues(ctx).With(graph.ErrData(err)...)) + et.Add(clues.Wrap(err, "getting site list").WithClues(ctx).With(graph.ErrData(err)...)) return } cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, id, errs) if err != nil { - errs.Add(clues.Wrap(err, "getting list contents")) + et.Add(clues.Wrap(err, "getting list contents")) return } @@ -149,7 +150,7 @@ func loadSiteLists( wg.Wait() - return results, errs.Err() + return results, et.Err() } // fetchListContents utility function to retrieve associated M365 relationships @@ -198,6 +199,7 @@ func fetchListItems( prefix = gs.Client().SitesById(siteID).ListsById(listID) builder = prefix.Items() itms = make([]models.ListItemable, 0) + et = errs.Tracker() ) for { @@ -211,7 +213,7 @@ func fetchListItems( } for _, itm := range resp.GetValue() { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -219,7 +221,7 @@ func fetchListItems( fields, err := newPrefix.Fields().Get(ctx, nil) if err != nil { - errs.Add(clues.Wrap(err, "getting list fields").WithClues(ctx).With(graph.ErrData(err)...)) + et.Add(clues.Wrap(err, "getting list fields").WithClues(ctx).With(graph.ErrData(err)...)) continue } @@ -235,7 +237,7 @@ func fetchListItems( builder = mssite.NewItemListsItemItemsRequestBuilder(*resp.GetOdataNextLink(), gs.Adapter()) } - return itms, errs.Err() + return itms, et.Err() } // fetchColumns utility function to return columns from a site. 
@@ -301,6 +303,7 @@ func fetchContentTypes( errs *fault.Errors, ) ([]models.ContentTypeable, error) { var ( + et = errs.Tracker() cTypes = make([]models.ContentTypeable, 0) builder = gs.Client().SitesById(siteID).ListsById(listID).ContentTypes() ) @@ -316,7 +319,7 @@ func fetchContentTypes( } for _, cont := range resp.GetValue() { - if errs.Err() != nil { + if et.Err() != nil { break } @@ -324,7 +327,7 @@ func fetchContentTypes( links, err := fetchColumnLinks(ctx, gs, siteID, listID, id) if err != nil { - errs.Add(err) + et.Add(err) continue } @@ -332,7 +335,7 @@ func fetchContentTypes( cs, err := fetchColumns(ctx, gs, siteID, listID, id) if err != nil { - errs.Add(err) + et.Add(err) continue } @@ -348,7 +351,7 @@ func fetchContentTypes( builder = mssite.NewItemListsItemContentTypesRequestBuilder(*resp.GetOdataNextLink(), gs.Adapter()) } - return cTypes, errs.Err() + return cTypes, et.Err() } func fetchColumnLinks( diff --git a/src/internal/connector/sharepoint/restore.go b/src/internal/connector/sharepoint/restore.go index a5a0076f2..174189179 100644 --- a/src/internal/connector/sharepoint/restore.go +++ b/src/internal/connector/sharepoint/restore.go @@ -226,13 +226,14 @@ func RestoreListCollection( directory = dc.FullPath() siteID = directory.ResourceOwner() items = dc.Items(ctx, errs) + et = errs.Tracker() ) trace.Log(ctx, "gc:sharepoint:restoreListCollection", directory.String()) for { - if errs.Err() != nil { - return metrics, errs.Err() + if et.Err() != nil { + break } select { @@ -252,7 +253,7 @@ func RestoreListCollection( siteID, restoreContainerName) if err != nil { - errs.Add(err) + et.Add(err) continue } @@ -260,7 +261,7 @@ func RestoreListCollection( itemPath, err := dc.FullPath().Append(itemData.UUID(), true) if err != nil { - errs.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) + et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) continue } @@ -275,6 +276,8 @@ func RestoreListCollection( metrics.Successes++ } } + + return metrics, et.Err() } // RestorePageCollection handles restoration of an individual site page collection. @@ -305,14 +308,15 @@ func RestorePageCollection( return metrics, clues.Wrap(err, "constructing graph client") } - service := discover.NewBetaService(adpt) - - // Restore items from collection - items := dc.Items(ctx, errs) + var ( + et = errs.Tracker() + service = discover.NewBetaService(adpt) + items = dc.Items(ctx, errs) + ) for { - if errs.Err() != nil { - return metrics, errs.Err() + if et.Err() != nil { + break } select { @@ -332,7 +336,7 @@ func RestorePageCollection( siteID, restoreContainerName) if err != nil { - errs.Add(err) + et.Add(err) continue } @@ -340,7 +344,7 @@ func RestorePageCollection( itemPath, err := dc.FullPath().Append(itemData.UUID(), true) if err != nil { - errs.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) + et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx)) continue } @@ -355,4 +359,6 @@ func RestorePageCollection( metrics.Successes++ } } + + return metrics, et.Err() } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index f40823a4c..732872c65 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -397,23 +397,26 @@ func (w Wrapper) RestoreMultipleItems( return nil, err } - // Maps short ID of parent path to data collection for that folder. - cols := map[string]*kopiaDataCollection{} + var ( + // Maps short ID of parent path to data collection for that folder. 
+        cols = map[string]*kopiaDataCollection{}
+        et   = errs.Tracker()
+    )

     for _, itemPath := range paths {
-        if errs.Err() != nil {
-            return nil, errs.Err()
+        if et.Err() != nil {
+            return nil, et.Err()
         }

         ds, err := getItemStream(ctx, itemPath, snapshotRoot, bcounter)
         if err != nil {
-            errs.Add(err)
+            et.Add(err)
             continue
         }

         parentPath, err := itemPath.Dir()
         if err != nil {
-            errs.Add(clues.Wrap(err, "making directory collection").WithClues(ctx))
+            et.Add(clues.Wrap(err, "making directory collection").WithClues(ctx))
             continue
         }

@@ -437,7 +440,7 @@ func (w Wrapper) RestoreMultipleItems(
         res = append(res, c)
     }

-    return res, errs.Err()
+    return res, et.Err()
 }

 // DeleteSnapshot removes the provided manifest from kopia.
diff --git a/src/internal/operations/manifests.go b/src/internal/operations/manifests.go
index 1460bb5a7..305443b07 100644
--- a/src/internal/operations/manifests.go
+++ b/src/internal/operations/manifests.go
@@ -139,10 +139,11 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs
     var (
         failed  bool
         reasons = map[string]manifest.ID{}
+        et      = errs.Tracker()
     )

     for _, man := range mans {
-        if errs.Err() != nil {
+        if et.Err() != nil {
             break
         }

@@ -162,7 +163,7 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs
         if b, ok := reasons[reasonKey]; ok {
             failed = true

-            errs.Add(clues.New("manifests have overlapping reasons").
+            et.Add(clues.New("manifests have overlapping reasons").
                 WithClues(ctx).
                 With("other_manifest_id", b))

@@ -177,7 +178,7 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs
         return clues.New("multiple base snapshots qualify").WithClues(ctx)
     }

-    return errs.Err()
+    return et.Err()
 }

 // collectMetadata retrieves all metadata files associated with the manifest.
diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go
index 1a65f406c..596735bf6 100644
--- a/src/internal/operations/restore.go
+++ b/src/internal/operations/restore.go
@@ -351,16 +351,17 @@ func formatDetailsForRestoration(
         fdsPaths  = fds.Paths()
         paths     = make([]path.Path, len(fdsPaths))
         shortRefs = make([]string, len(fdsPaths))
+        et        = errs.Tracker()
     )

     for i := range fdsPaths {
-        if errs.Err() != nil {
-            return nil, errs.Err()
+        if et.Err() != nil {
+            break
         }

         p, err := path.FromDataLayerPath(fdsPaths[i], true)
         if err != nil {
-            errs.Add(clues.
+            et.Add(clues.
                 Wrap(err, "parsing details path after reduction").
                 WithMap(clues.In(ctx)).
                 With("path", fdsPaths[i]))
@@ -385,5 +386,5 @@ func formatDetailsForRestoration(

     logger.Ctx(ctx).With("short_refs", shortRefs).Infof("found %d details entries to restore", len(shortRefs))

-    return paths, errs.Err()
+    return paths, et.Err()
 }
diff --git a/src/pkg/fault/example_fault_test.go b/src/pkg/fault/example_fault_test.go
index 04a125e50..904f27c63 100644
--- a/src/pkg/fault/example_fault_test.go
+++ b/src/pkg/fault/example_fault_test.go
@@ -224,8 +224,41 @@ func ExampleErrors_Errs() {
     // Err() is nil
 }

-// ExampleErrors_e2e showcases a more complex integration.
-func ExampleErrors_e2e() {
+func ExampleErrors_Tracker() {
+    // It is common for Corso to run operations in parallel,
+    // and for iterations to be nested within iterations. To
+    // avoid mistakenly returning an error that was sourced
+    // from some other async iteration, recoverable instances
+    // are aggregated into a Tracker.
+    errs := fault.New(false)
+    trkr := errs.Tracker()
+
+    err := func() error {
+        for i := range items {
+            if trkr.Err() != nil {
+                break
+            }
+
+            if err := getIthItem(i); err != nil {
+                // instead of calling errs.Add(err), we call the
+                // tracker's Add method. The error will still get
+                // added to the errs.Errs() set. But if this err
+                // causes the run to fail, only this tracker treats
+                // it as the causal failure.
+                trkr.Add(err)
+            }
+        }
+
+        return trkr.Err()
+    }()
+    if err != nil {
+        // handle the Err() that appeared in the tracker
+        fmt.Println("err occurred", errs.Err())
+    }
+}
+
+// Example_errors_e2e showcases a more complex integration.
+func Example_errors_e2e() {
     oper := newOperation()

     // imagine that we're a user, calling into corso SDK.
diff --git a/src/pkg/fault/fault.go b/src/pkg/fault/fault.go
index 71de706a7..b67cb530a 100644
--- a/src/pkg/fault/fault.go
+++ b/src/pkg/fault/fault.go
@@ -124,3 +124,48 @@ func (e *Errors) addErr(err error) *Errors {

     return e
 }
+
+// ---------------------------------------------------------------------------
+// Iteration Tracker
+// ---------------------------------------------------------------------------
+
+// Tracker constructs a new errors tracker for aggregating errors
+// in a single iteration loop. Trackers shouldn't be passed down
+// to other funcs, and the function that spawned the tracker should
+// always return `tracker.Err()` to ensure that hard failures are
+// propagated upstream.
+func (e *Errors) Tracker() *tracker {
+    return &tracker{
+        mu:   &sync.Mutex{},
+        errs: e,
+    }
+}
+
+type tracker struct {
+    mu      *sync.Mutex
+    errs    *Errors
+    current error
+}
+
+func (e *tracker) Add(err error) {
+    if err == nil {
+        return
+    }
+
+    e.mu.Lock()
+    defer e.mu.Unlock()
+
+    if e.current == nil && e.errs.failFast {
+        e.current = err
+    }
+
+    e.errs.Add(err)
+}
+
+// Err returns the primary error in the tracker. Will be nil if the
+// original Errors is set to bestEffort handling. Does not return the
+// underlying Errors.Err(). Should be called as the return value of
+// any func which created a new tracker.
+func (e *tracker) Err() error {
+    return e.current
+}
diff --git a/src/pkg/fault/fault_test.go b/src/pkg/fault/fault_test.go
index 4a995581e..9b68797af 100644
--- a/src/pkg/fault/fault_test.go
+++ b/src/pkg/fault/fault_test.go
@@ -251,3 +251,29 @@ func (suite *FaultErrorsUnitSuite) TestUnmarshalLegacy() {
     err = json.Unmarshal(jsonStr, &um)
     require.NoError(t, err)
 }
+
+func (suite *FaultErrorsUnitSuite) TestTracker() {
+    t := suite.T()
+
+    be := fault.New(false)
+
+    ba := be.Tracker()
+    assert.NoError(t, ba.Err())
+    assert.Empty(t, be.Errs())
+
+    ba.Add(assert.AnError)
+    assert.NoError(t, ba.Err())
+    assert.NoError(t, be.Err())
+    assert.NotEmpty(t, be.Errs())
+
+    fe := fault.New(true)
+
+    fa := fe.Tracker()
+    assert.NoError(t, fa.Err())
+    assert.Empty(t, fe.Errs())
+
+    fa.Add(assert.AnError)
+    assert.Error(t, fa.Err())
+    assert.Error(t, fe.Err())
+    assert.NotEmpty(t, fe.Errs())
+}
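
For reviewers: below is a consolidated sketch of the pattern this patch applies across the connector packages, using only the fault API added here (`fault.New`, `Errors.Tracker`, `tracker.Add`/`Err`). The item IDs, the `fetchItem` helper, and the import path are illustrative assumptions rather than code from this PR; the point it demonstrates is that each func owns its tracker and returns `et.Err()` instead of the shared `errs.Err()`.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/alcionai/corso/src/pkg/fault" // import path assumed from this repo's layout
)

// fetchItem is a hypothetical stand-in for a per-item call (e.g. a graph
// request) that can fail recoverably.
func fetchItem(id string) error {
	if id == "bad" {
		return fmt.Errorf("fetching item %s", id)
	}

	return nil
}

// getAll mirrors the pattern used throughout this patch: the func that owns
// the iteration also owns a tracker, adds recoverable errors to it, and
// returns tracker.Err() instead of errs.Err(), so a failure recorded by some
// other caller or goroutine is never reported as this func's failure.
func getAll(ids []string, errs *fault.Errors) ([]string, error) {
	var (
		et  = errs.Tracker()
		out = make([]string, 0, len(ids))
		wg  sync.WaitGroup
		mu  sync.Mutex
	)

	for _, id := range ids {
		if et.Err() != nil {
			break
		}

		wg.Add(1)

		go func(id string) {
			defer wg.Done()

			if err := fetchItem(id); err != nil {
				// tracker.Add is mutex-guarded, so concurrent adds are safe.
				// The error lands in errs.Errs() either way; it only becomes
				// this func's causal failure when errs is fail-fast.
				et.Add(err)
				return
			}

			mu.Lock()
			defer mu.Unlock()

			out = append(out, id)
		}(id)
	}

	wg.Wait()

	return out, et.Err()
}

func main() {
	// best-effort mode: recoverable errors are collected but don't halt the run.
	errs := fault.New(false)

	out, err := getAll([]string{"a", "bad", "c"}, errs)
	fmt.Println(out, err, errs.Errs())
}
```

The same reasoning covers the nested case: an inner helper that receives `errs` spawns its own tracker, so the error it returns can't be confused with one added by a sibling iteration.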
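
And a smaller sketch of how `tracker.Err()` behaves under the two fault modes, mirroring the `TestTracker` assertions above. Again the import path is an assumption from the repo layout, and the expected results are annotated as comments rather than guaranteed output.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/alcionai/corso/src/pkg/fault" // import path assumed from this repo's layout
)

func main() {
	// best-effort: Add() records the error, but neither the tracker nor the
	// parent Errors reports a hard failure.
	be := fault.New(false)
	bt := be.Tracker()
	bt.Add(errors.New("recoverable"))
	fmt.Println(bt.Err(), be.Err(), be.Errs()) // expect: nil, nil, one recorded error

	// fail-fast: the first Add() becomes both the tracker's causal failure and
	// the parent Errors' failure.
	ff := fault.New(true)
	ft := ff.Tracker()
	ft.Add(errors.New("fatal"))
	fmt.Println(ft.Err(), ff.Err()) // expect: both non-nil
}
```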