add fault.tracker for error additions (#2510)

## Description

Realized we had a race condition: in an async
runtime, the same errs.Err() can be returned from
multiple functions, even though only one of them
actually produced the error.  Adding a tracker
scopes the returned error to the func that spawned
it, so each func returns only the errors produced
within its own iteration.
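
The pattern, as a minimal sketch (the package wrapper, import path, `getAll`, and `getItem` are illustrative stand-ins, not the production code):

```go
package example

import "github.com/alcionai/corso/src/pkg/fault"

// getItem stands in for any fallible per-item operation.
func getItem(id string) (string, error) { return id, nil }

// getAll sketches the new pattern: recoverable errors inside the
// loop go through a tracker scoped to this func, not the shared
// fault.Errors.
func getAll(ids []string, errs *fault.Errors) ([]string, error) {
	var (
		out = make([]string, 0, len(ids))
		et  = errs.Tracker() // scoped to this func only
	)

	for _, id := range ids {
		// et.Err() reports only errors added through this tracker, so a
		// concurrent failure recorded on errs by some other func can't
		// end this loop early or get returned as this func's failure.
		if et.Err() != nil {
			break
		}

		v, err := getItem(id)
		if err != nil {
			// still aggregated into errs; additionally marked as this
			// iteration's causal failure under fail-fast handling.
			et.Add(err)
			continue
		}

		out = append(out, v)
	}

	return out, et.Err()
}
```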

## Does this PR need a docs update or release note?

- [x]  No 

## Type of change

- [x] 🧹 Tech Debt/Cleanup

## Issue(s)

* #1970

## Test Plan

- [x]  Unit test
- [x] 💚 E2E
Keepers 2023-02-21 21:51:06 -07:00 committed by GitHub
parent 9dab7c0010
commit b95231d85f
16 changed files with 251 additions and 102 deletions

View File

@@ -108,16 +108,19 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl
 		return nil, clues.Wrap(err, "creating users iterator").WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
-	us := make([]models.Userable, 0)
+	var (
+		us = make([]models.Userable, 0)
+		et = errs.Tracker()
+	)
 
 	iterator := func(item any) bool {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			return false
 		}
 
 		u, err := validateUser(item)
 		if err != nil {
-			errs.Add(clues.Wrap(err, "validating user").WithClues(ctx).With(graph.ErrData(err)...))
+			et.Add(clues.Wrap(err, "validating user").WithClues(ctx).With(graph.ErrData(err)...))
 		} else {
 			us = append(us, u)
 		}
@@ -129,7 +132,7 @@ func (c Users) GetAll(ctx context.Context, errs *fault.Errors) ([]models.Userabl
 		return nil, clues.Wrap(err, "iterating all users").WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
-	return us, errs.Err()
+	return us, et.Err()
 }
 
 func (c Users) GetByID(ctx context.Context, userID string) (models.Userable, error) {

View File

@@ -178,6 +178,7 @@ func DataCollections(
 	var (
 		user        = selector.DiscreteOwner
 		collections = []data.BackupCollection{}
+		et          = errs.Tracker()
 	)
 
 	cdps, err := parseMetadataCollections(ctx, metadata, errs)
@@ -186,7 +187,7 @@ func DataCollections(
 	}
 
 	for _, scope := range eb.Scopes() {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -200,14 +201,14 @@ func DataCollections(
 			su,
 			errs)
 		if err != nil {
-			errs.Add(err)
+			et.Add(err)
 			continue
 		}
 
 		collections = append(collections, dcs...)
 	}
 
-	return collections, nil, errs.Err()
+	return collections, nil, et.Err()
 }
 
 func getterByType(ac api.Client, category path.CategoryType) (addedAndRemovedItemIDsGetter, error) {

View File

@@ -68,9 +68,11 @@ func filterContainersAndFillCollections(
 		return err
 	}
 
+	et := errs.Tracker()
+
 	for _, c := range resolver.Items() {
-		if errs.Err() != nil {
-			return errs.Err()
+		if et.Err() != nil {
+			return et.Err()
 		}
 
 		cID := *c.GetId()
@@ -100,7 +102,7 @@ func filterContainersAndFillCollections(
 		added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta)
 		if err != nil {
 			if !graph.IsErrDeletedInFlight(err) {
-				errs.Add(err)
+				et.Add(err)
 				continue
 			}
@@ -155,8 +157,12 @@ func filterContainersAndFillCollections(
 	// in the `previousPath` set, but does not exist in the current container
 	// resolver (which contains all the resource owners' current containers).
 	for id, p := range tombstones {
+		if et.Err() != nil {
+			return et.Err()
+		}
+
 		if collections[id] != nil {
-			errs.Add(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ctx))
+			et.Add(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ctx))
 			continue
 		}
@@ -205,15 +211,14 @@ func filterContainersAndFillCollections(
 		path.ExchangeService,
 		qp.Category,
 		entries,
-		statusUpdater,
-	)
+		statusUpdater)
 	if err != nil {
 		return clues.Wrap(err, "making metadata collection")
 	}
 
 	collections["metadata"] = col
 
-	return errs.Err()
+	return et.Err()
 }
 
 // produces a set of id:path pairs from the deltapaths map.

View File

@@ -112,6 +112,7 @@ func RestoreExchangeEvent(
 	ctx = clues.Add(ctx, "item_id", ptr.Val(event.GetId()))
 
 	var (
+		et               = errs.Tracker()
 		transformedEvent = support.ToEventSimplified(event)
 		attached         []models.Attachmentable
 	)
@@ -139,19 +140,19 @@ func RestoreExchangeEvent(
 	}
 
 	for _, attach := range attached {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
 
 		if err := uploadAttachment(ctx, uploader, attach); err != nil {
-			errs.Add(err)
+			et.Add(err)
 		}
 	}
 
 	info := api.EventInfo(event)
 	info.Size = int64(len(bits))
 
-	return info, errs.Err()
+	return info, et.Err()
 }
 
 // RestoreMailMessage utility function to place an exchange.Mail
@@ -255,17 +256,19 @@ func SendMailToBackStore(
 		return clues.New("nil response from post").WithClues(ctx)
 	}
 
-	id := ptr.Val(response.GetId())
-
-	uploader := &mailAttachmentUploader{
-		userID:   user,
-		folderID: destination,
-		itemID:   id,
-		service:  service,
-	}
+	var (
+		et       = errs.Tracker()
+		id       = ptr.Val(response.GetId())
+		uploader = &mailAttachmentUploader{
+			userID:   user,
+			folderID: destination,
+			itemID:   id,
+			service:  service,
+		}
+	)
 
 	for _, attachment := range attached {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -280,13 +283,13 @@ func SendMailToBackStore(
 				continue
 			}
 
-			errs.Add(errors.Wrap(err, "uploading mail attachment"))
+			et.Add(errors.Wrap(err, "uploading mail attachment"))
 			break
 		}
 	}
 
-	return errs.Err()
+	return et.Err()
 }
 
 // RestoreExchangeDataCollections restores M365 objects in data.RestoreCollection to MSFT
@@ -307,6 +310,7 @@ func RestoreExchangeDataCollections(
 		userID string
 		// TODO policy to be updated from external source after completion of refactoring
 		policy = control.Copy
+		et     = errs.Tracker()
 	)
 
 	if len(dcs) > 0 {
@@ -315,8 +319,8 @@ func RestoreExchangeDataCollections(
 	}
 
 	for _, dc := range dcs {
-		if errs.Err() != nil {
-			return nil, errs.Err()
+		if et.Err() != nil {
+			break
 		}
 
 		userCaches := directoryCaches[userID]
@@ -333,7 +337,7 @@ func RestoreExchangeDataCollections(
 			userCaches,
 			errs)
 		if err != nil {
-			errs.Add(clues.Wrap(err, "creating destination").WithClues(ctx))
+			et.Add(clues.Wrap(err, "creating destination").WithClues(ctx))
 			continue
 		}
@@ -351,10 +355,10 @@ func RestoreExchangeDataCollections(
 		support.Restore,
 		len(dcs),
 		metrics,
-		errs.Err(),
+		et.Err(),
 		dest.ContainerName)
 
-	return status, errs.Err()
+	return status, et.Err()
 }
 
 // restoreCollection handles restoration of an individual collection.

View File

@@ -320,15 +320,17 @@ func getResources(
 		return nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
+	et := errs.Tracker()
+
 	callbackFunc := func(item any) bool {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			return false
 		}
 
 		k, v, err := identify(item)
 		if err != nil {
 			if !errors.Is(err, errKnownSkippableCase) {
-				errs.Add(clues.Stack(err).
+				et.Add(clues.Stack(err).
 					WithClues(ctx).
 					With("query_url", gs.Adapter().GetBaseUrl()))
 			}
@@ -345,5 +347,5 @@ func getResources(
 		return nil, clues.Stack(err).WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
-	return resources, errs.Err()
+	return resources, et.Err()
 }

View File

@@ -34,7 +34,7 @@ func GetSitePages(
 		col         = make([]models.SitePageable, 0)
 		semaphoreCh = make(chan struct{}, fetchChannelSize)
 		opts        = retrieveSitePageOptions()
-		err         error
+		et          = errs.Tracker()
 		wg          sync.WaitGroup
 		m           sync.Mutex
 	)
@@ -49,7 +49,7 @@ func GetSitePages(
 	}
 
 	for _, entry := range pages {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -61,11 +61,14 @@ func GetSitePages(
 			defer wg.Done()
 			defer func() { <-semaphoreCh }()
 
-			var page models.SitePageable
+			var (
+				page models.SitePageable
+				err  error
+			)
 
 			page, err = serv.Client().SitesById(siteID).PagesById(pageID).Get(ctx, opts)
 			if err != nil {
-				errs.Add(clues.Wrap(err, "fetching page").WithClues(ctx).With(graph.ErrData(err)...))
+				et.Add(clues.Wrap(err, "fetching page").WithClues(ctx).With(graph.ErrData(err)...))
 				return
 			}
@@ -75,7 +78,7 @@ func GetSitePages(
 	wg.Wait()
 
-	return col, errs.Err()
+	return col, et.Err()
 }
 
 // fetchPages utility function to return the tuple of item

View File

@@ -223,7 +223,10 @@ func (sc *Collection) retrieveLists(
 	progress chan<- struct{},
 	errs *fault.Errors,
 ) (numMetrics, error) {
-	var metrics numMetrics
+	var (
+		metrics numMetrics
+		et      = errs.Tracker()
+	)
 
 	lists, err := loadSiteLists(ctx, sc.service, sc.fullPath.ResourceOwner(), sc.jobs, errs)
 	if err != nil {
@@ -234,13 +237,13 @@ func (sc *Collection) retrieveLists(
 	// For each models.Listable, object is serialized and the metrics are collected.
 	// The progress is objected via the passed in channel.
 	for _, lst := range lists {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
 
 		byteArray, err := serializeContent(wtr, lst)
 		if err != nil {
-			errs.Add(clues.Wrap(err, "serializing list").WithClues(ctx))
+			et.Add(clues.Wrap(err, "serializing list").WithClues(ctx))
 			continue
 		}
@@ -266,7 +269,7 @@ func (sc *Collection) retrieveLists(
 		}
 	}
 
-	return metrics, errs.Err()
+	return metrics, et.Err()
 }
 
 func (sc *Collection) retrievePages(
@@ -275,7 +278,10 @@ func (sc *Collection) retrievePages(
 	progress chan<- struct{},
 	errs *fault.Errors,
 ) (numMetrics, error) {
-	var metrics numMetrics
+	var (
+		metrics numMetrics
+		et      = errs.Tracker()
+	)
 
 	betaService := sc.betaService
 	if betaService == nil {
@@ -292,13 +298,13 @@ func (sc *Collection) retrievePages(
 	// Pageable objects are not supported in v1.0 of msgraph at this time.
 	// TODO: Verify Parsable interface supported with modified-Pageable
 	for _, pg := range pages {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
 
 		byteArray, err := serializeContent(wtr, pg)
 		if err != nil {
-			errs.Add(clues.Wrap(err, "serializing page").WithClues(ctx))
+			et.Add(clues.Wrap(err, "serializing page").WithClues(ctx))
 			continue
 		}
@@ -318,7 +324,7 @@ func (sc *Collection) retrievePages(
 		}
 	}
 
-	return metrics, errs.Err()
+	return metrics, et.Err()
 }
 
 func serializeContent(writer *kw.JsonSerializationWriter, obj absser.Parsable) ([]byte, error) {

View File

@@ -44,12 +44,13 @@ func DataCollections(
 	}
 
 	var (
+		et          = errs.Tracker()
 		site        = b.DiscreteOwner
 		collections = []data.BackupCollection{}
 	)
 
 	for _, scope := range b.Scopes() {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -73,7 +74,7 @@ func DataCollections(
 				ctrlOpts,
 				errs)
 			if err != nil {
-				errs.Add(err)
+				et.Add(err)
 				continue
 			}
@@ -88,7 +89,7 @@ func DataCollections(
 				su,
 				ctrlOpts)
 			if err != nil {
-				errs.Add(err)
+				et.Add(err)
 				continue
 			}
@@ -102,7 +103,7 @@ func DataCollections(
 				ctrlOpts,
 				errs)
 			if err != nil {
-				errs.Add(err)
+				et.Add(err)
 				continue
 			}
 		}
@@ -111,7 +112,7 @@ func DataCollections(
 		foldersComplete <- struct{}{}
 	}
 
-	return collections, nil, errs.Err()
+	return collections, nil, et.Err()
 }
 
 func collectLists(
@@ -124,7 +125,10 @@ func collectLists(
 ) ([]data.BackupCollection, error) {
 	logger.Ctx(ctx).With("site", siteID).Debug("Creating SharePoint List Collections")
 
-	spcs := make([]data.BackupCollection, 0)
+	var (
+		et   = errs.Tracker()
+		spcs = make([]data.BackupCollection, 0)
+	)
 
 	lists, err := preFetchLists(ctx, serv, siteID)
 	if err != nil {
@@ -132,7 +136,7 @@ func collectLists(
 	}
 
 	for _, tuple := range lists {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -143,7 +147,7 @@ func collectLists(
 			path.ListsCategory,
 			false)
 		if err != nil {
-			errs.Add(clues.Wrap(err, "creating list collection path").WithClues(ctx))
+			et.Add(clues.Wrap(err, "creating list collection path").WithClues(ctx))
 		}
 
 		collection := NewCollection(dir, serv, List, updater.UpdateStatus, ctrlOpts)
@@ -152,7 +156,7 @@ func collectLists(
 		spcs = append(spcs, collection)
 	}
 
-	return spcs, errs.Err()
+	return spcs, et.Err()
 }
 
 // collectLibraries constructs a onedrive Collections struct and Get()s
@@ -204,7 +208,10 @@ func collectPages(
 ) ([]data.BackupCollection, error) {
 	logger.Ctx(ctx).Debug("creating SharePoint Pages collections")
 
-	spcs := make([]data.BackupCollection, 0)
+	var (
+		et   = errs.Tracker()
+		spcs = make([]data.BackupCollection, 0)
+	)
 
 	// make the betaClient
 	// Need to receive From DataCollection Call
@@ -221,7 +228,7 @@ func collectPages(
 	}
 
 	for _, tuple := range tuples {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -232,7 +239,7 @@ func collectPages(
 			path.PagesCategory,
 			false)
 		if err != nil {
-			errs.Add(clues.Wrap(err, "creating page collection path").WithClues(ctx))
+			et.Add(clues.Wrap(err, "creating page collection path").WithClues(ctx))
 		}
 
 		collection := NewCollection(dir, serv, Pages, updater.UpdateStatus, ctrlOpts)
@@ -242,7 +249,7 @@ func collectPages(
 		spcs = append(spcs, collection)
 	}
 
-	return spcs, errs.Err()
+	return spcs, et.Err()
 }
 
 type folderMatcher struct {

View File

@@ -97,6 +97,7 @@ func loadSiteLists(
 	var (
 		results     = make([]models.Listable, 0)
 		semaphoreCh = make(chan struct{}, fetchChannelSize)
+		et          = errs.Tracker()
 		wg          sync.WaitGroup
 		m           sync.Mutex
 	)
@@ -111,8 +112,8 @@ func loadSiteLists(
 	}
 
 	for _, listID := range listIDs {
-		if errs.Err() != nil {
-			return nil, errs.Err()
+		if et.Err() != nil {
+			break
 		}
 
 		semaphoreCh <- struct{}{}
@@ -130,13 +131,13 @@ func loadSiteLists(
 			entry, err = gs.Client().SitesById(siteID).ListsById(id).Get(ctx, nil)
 			if err != nil {
-				errs.Add(clues.Wrap(err, "getting site list").WithClues(ctx).With(graph.ErrData(err)...))
+				et.Add(clues.Wrap(err, "getting site list").WithClues(ctx).With(graph.ErrData(err)...))
 				return
 			}
 
 			cols, cTypes, lItems, err := fetchListContents(ctx, gs, siteID, id, errs)
 			if err != nil {
-				errs.Add(clues.Wrap(err, "getting list contents"))
+				et.Add(clues.Wrap(err, "getting list contents"))
 				return
 			}
@@ -149,7 +150,7 @@ func loadSiteLists(
 	wg.Wait()
 
-	return results, errs.Err()
+	return results, et.Err()
 }
 
 // fetchListContents utility function to retrieve associated M365 relationships
@@ -198,6 +199,7 @@ func fetchListItems(
 		prefix  = gs.Client().SitesById(siteID).ListsById(listID)
 		builder = prefix.Items()
 		itms    = make([]models.ListItemable, 0)
+		et      = errs.Tracker()
 	)
 
 	for {
@@ -211,7 +213,7 @@ func fetchListItems(
 		}
 
 		for _, itm := range resp.GetValue() {
-			if errs.Err() != nil {
+			if et.Err() != nil {
 				break
 			}
@@ -219,7 +221,7 @@ func fetchListItems(
 			fields, err := newPrefix.Fields().Get(ctx, nil)
 			if err != nil {
-				errs.Add(clues.Wrap(err, "getting list fields").WithClues(ctx).With(graph.ErrData(err)...))
+				et.Add(clues.Wrap(err, "getting list fields").WithClues(ctx).With(graph.ErrData(err)...))
 				continue
 			}
@@ -235,7 +237,7 @@ func fetchListItems(
 		builder = mssite.NewItemListsItemItemsRequestBuilder(*resp.GetOdataNextLink(), gs.Adapter())
 	}
 
-	return itms, errs.Err()
+	return itms, et.Err()
 }
 
 // fetchColumns utility function to return columns from a site.
@@ -301,6 +303,7 @@ func fetchContentTypes(
 	errs *fault.Errors,
 ) ([]models.ContentTypeable, error) {
 	var (
+		et      = errs.Tracker()
 		cTypes  = make([]models.ContentTypeable, 0)
 		builder = gs.Client().SitesById(siteID).ListsById(listID).ContentTypes()
 	)
@@ -316,7 +319,7 @@ func fetchContentTypes(
 	}
 
 	for _, cont := range resp.GetValue() {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -324,7 +327,7 @@ func fetchContentTypes(
 		links, err := fetchColumnLinks(ctx, gs, siteID, listID, id)
 		if err != nil {
-			errs.Add(err)
+			et.Add(err)
 			continue
 		}
@@ -332,7 +335,7 @@ func fetchContentTypes(
 		cs, err := fetchColumns(ctx, gs, siteID, listID, id)
 		if err != nil {
-			errs.Add(err)
+			et.Add(err)
 			continue
 		}
@@ -348,7 +351,7 @@ func fetchContentTypes(
 		builder = mssite.NewItemListsItemContentTypesRequestBuilder(*resp.GetOdataNextLink(), gs.Adapter())
 	}
 
-	return cTypes, errs.Err()
+	return cTypes, et.Err()
 }
 
 func fetchColumnLinks(

View File

@@ -226,13 +226,14 @@ func RestoreListCollection(
 		directory = dc.FullPath()
 		siteID    = directory.ResourceOwner()
 		items     = dc.Items(ctx, errs)
+		et        = errs.Tracker()
 	)
 
 	trace.Log(ctx, "gc:sharepoint:restoreListCollection", directory.String())
 
 	for {
-		if errs.Err() != nil {
-			return metrics, errs.Err()
+		if et.Err() != nil {
+			break
 		}
 
 		select {
@@ -252,7 +253,7 @@ func RestoreListCollection(
 				siteID,
 				restoreContainerName)
 			if err != nil {
-				errs.Add(err)
+				et.Add(err)
 				continue
 			}
@@ -260,7 +261,7 @@ func RestoreListCollection(
 			itemPath, err := dc.FullPath().Append(itemData.UUID(), true)
 			if err != nil {
-				errs.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx))
+				et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx))
 				continue
 			}
@@ -275,6 +276,8 @@ func RestoreListCollection(
 			metrics.Successes++
 		}
 	}
+
+	return metrics, et.Err()
 }
 
 // RestorePageCollection handles restoration of an individual site page collection.
@@ -305,14 +308,15 @@ func RestorePageCollection(
 		return metrics, clues.Wrap(err, "constructing graph client")
 	}
 
-	service := discover.NewBetaService(adpt)
-
-	// Restore items from collection
-	items := dc.Items(ctx, errs)
+	var (
+		et      = errs.Tracker()
+		service = discover.NewBetaService(adpt)
+		items   = dc.Items(ctx, errs)
+	)
 
 	for {
-		if errs.Err() != nil {
-			return metrics, errs.Err()
+		if et.Err() != nil {
+			break
 		}
 
 		select {
@@ -332,7 +336,7 @@ func RestorePageCollection(
 				siteID,
 				restoreContainerName)
 			if err != nil {
-				errs.Add(err)
+				et.Add(err)
 				continue
 			}
@@ -340,7 +344,7 @@ func RestorePageCollection(
 			itemPath, err := dc.FullPath().Append(itemData.UUID(), true)
 			if err != nil {
-				errs.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx))
+				et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx))
 				continue
 			}
@@ -355,4 +359,6 @@ func RestorePageCollection(
 			metrics.Successes++
 		}
 	}
+
+	return metrics, et.Err()
 }

View File

@@ -397,23 +397,26 @@ func (w Wrapper) RestoreMultipleItems(
 		return nil, err
 	}
 
-	// Maps short ID of parent path to data collection for that folder.
-	cols := map[string]*kopiaDataCollection{}
+	var (
+		// Maps short ID of parent path to data collection for that folder.
+		cols = map[string]*kopiaDataCollection{}
+		et   = errs.Tracker()
+	)
 
 	for _, itemPath := range paths {
-		if errs.Err() != nil {
-			return nil, errs.Err()
+		if et.Err() != nil {
+			return nil, et.Err()
 		}
 
 		ds, err := getItemStream(ctx, itemPath, snapshotRoot, bcounter)
 		if err != nil {
-			errs.Add(err)
+			et.Add(err)
 			continue
 		}
 
 		parentPath, err := itemPath.Dir()
 		if err != nil {
-			errs.Add(clues.Wrap(err, "making directory collection").WithClues(ctx))
+			et.Add(clues.Wrap(err, "making directory collection").WithClues(ctx))
 			continue
 		}
@@ -437,7 +440,7 @@ func (w Wrapper) RestoreMultipleItems(
 		res = append(res, c)
 	}
 
-	return res, errs.Err()
+	return res, et.Err()
 }
 
 // DeleteSnapshot removes the provided manifest from kopia.

View File

@@ -139,10 +139,11 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs
 	var (
 		failed  bool
 		reasons = map[string]manifest.ID{}
+		et      = errs.Tracker()
 	)
 
 	for _, man := range mans {
-		if errs.Err() != nil {
+		if et.Err() != nil {
 			break
 		}
@@ -162,7 +163,7 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs
 			if b, ok := reasons[reasonKey]; ok {
 				failed = true
 
-				errs.Add(clues.New("manifests have overlapping reasons").
+				et.Add(clues.New("manifests have overlapping reasons").
 					WithClues(ctx).
 					With("other_manifest_id", b))
@@ -177,7 +178,7 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry, errs
 		return clues.New("multiple base snapshots qualify").WithClues(ctx)
 	}
 
-	return errs.Err()
+	return et.Err()
 }
 
 // collectMetadata retrieves all metadata files associated with the manifest.

View File

@@ -351,16 +351,17 @@ func formatDetailsForRestoration(
 		fdsPaths  = fds.Paths()
 		paths     = make([]path.Path, len(fdsPaths))
 		shortRefs = make([]string, len(fdsPaths))
+		et        = errs.Tracker()
 	)
 
 	for i := range fdsPaths {
-		if errs.Err() != nil {
-			return nil, errs.Err()
+		if et.Err() != nil {
+			break
 		}
 
 		p, err := path.FromDataLayerPath(fdsPaths[i], true)
 		if err != nil {
-			errs.Add(clues.
+			et.Add(clues.
 				Wrap(err, "parsing details path after reduction").
 				WithMap(clues.In(ctx)).
 				With("path", fdsPaths[i]))
@@ -385,5 +386,5 @@ func formatDetailsForRestoration(
 	logger.Ctx(ctx).With("short_refs", shortRefs).Infof("found %d details entries to restore", len(shortRefs))
 
-	return paths, errs.Err()
+	return paths, et.Err()
 }

View File

@@ -224,8 +224,41 @@ func ExampleErrors_Errs() {
 	// Err() is nil
 }
 
-// ExampleErrors_e2e showcases a more complex integration.
-func ExampleErrors_e2e() {
+func ExampleErrors_Tracker() {
+	// It is common for Corso to run operations in parallel,
+	// and for iterations to be nested within iterations.  To
+	// avoid mistakenly returning an error that was sourced
+	// from some other async iteration, recoverable instances
+	// are aggregated into a Tracker.
+	errs := fault.New(false)
+	trkr := errs.Tracker()
+
+	err := func() error {
+		for i := range items {
+			if trkr.Err() != nil {
+				break
+			}
+
+			if err := getIthItem(i); err != nil {
+				// instead of calling errs.Add(err), we call the
+				// tracker's Add method.  The error will still get
+				// added to the errs.Errs() set.  But if this err
+				// causes the run to fail, only this tracker treats
+				// it as the causal failure.
+				trkr.Add(err)
+			}
+		}
+
+		return trkr.Err()
+	}()
+	if err != nil {
+		// handle the Err() that appeared in the tracker
+		fmt.Println("err occurred", errs.Err())
+	}
+}
+
+// ExampleErrorsE2e showcases a more complex integration.
+func Example_errors_e2e() {
 	oper := newOperation()
 
 	// imagine that we're a user, calling into corso SDK.

View File

@@ -124,3 +124,48 @@ func (e *Errors) addErr(err error) *Errors {
 
 	return e
 }
+
+// ---------------------------------------------------------------------------
+// Iteration Tracker
+// ---------------------------------------------------------------------------
+
+// Tracker constructs a new errors tracker for aggregating errors
+// in a single iteration loop.  Trackers shouldn't be passed down
+// to other funcs, and the function that spawned the tracker should
+// always return `tracker.Err()` to ensure that hard failures are
+// propagated upstream.
+func (e *Errors) Tracker() *tracker {
+	return &tracker{
+		mu:   &sync.Mutex{},
+		errs: e,
+	}
+}
+
+type tracker struct {
+	mu      *sync.Mutex
+	errs    *Errors
+	current error
+}
+
+func (e *tracker) Add(err error) {
+	if err == nil {
+		return
+	}
+
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if e.current == nil && e.errs.failFast {
+		e.current = err
+	}
+
+	e.errs.Add(err)
+}
+
+// Err returns the primary error in the tracker.  Will be nil if the
+// original Errors is set to bestEffort handling.  Does not return the
+// underlying Errors.Err().  Should be called as the return value of
+// any func which created a new tracker.
+func (e *tracker) Err() error {
+	return e.current
+}

View File

@@ -251,3 +251,29 @@ func (suite *FaultErrorsUnitSuite) TestUnmarshalLegacy() {
 	err = json.Unmarshal(jsonStr, &um)
 	require.NoError(t, err)
 }
+
+func (suite *FaultErrorsUnitSuite) TestTracker() {
+	t := suite.T()
+
+	be := fault.New(false)
+
+	ba := be.Tracker()
+	assert.NoError(t, ba.Err())
+	assert.Empty(t, be.Errs())
+
+	ba.Add(assert.AnError)
+	assert.NoError(t, ba.Err())
+	assert.NoError(t, be.Err())
+	assert.NotEmpty(t, be.Errs())
+
+	fe := fault.New(true)
+
+	fa := fe.Tracker()
+	assert.NoError(t, fa.Err())
+	assert.Empty(t, fe.Errs())
+
+	fa.Add(assert.AnError)
+	assert.Error(t, fa.Err())
+	assert.Error(t, fe.Err())
+	assert.NotEmpty(t, fe.Errs())
+}
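
One note on semantics, following from tracker.Add above: a tracked error only becomes the tracker's causal failure when the parent fault.Errors was constructed with fail-fast enabled, which is what the unit test exercises. A self-contained sketch of the two modes (the import path and `doWork` are assumptions for illustration):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/alcionai/corso/src/pkg/fault"
)

// doWork stands in for any fallible call inside an iteration.
func doWork() error { return errors.New("recoverable failure") }

func main() {
	// best-effort: Add aggregates the error into Errs(), but the
	// tracker's Err() stays nil, so guarded loops run to completion.
	bt := fault.New(false).Tracker()
	bt.Add(doWork())
	fmt.Println(bt.Err()) // <nil>

	// fail-fast: the first Add becomes the tracker's causal failure,
	// so loops guarding on Err() break early and return it upstream.
	ft := fault.New(true).Tracker()
	ft.Add(doWork())
	fmt.Println(ft.Err()) // recoverable failure
}
```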