From 871f0b406dfbd5bad7baf99abf61ddeae25078b3 Mon Sep 17 00:00:00 2001 From: Keepers Date: Fri, 10 Mar 2023 10:23:19 -0700 Subject: [PATCH] introduce fault items (#2710) Adds the item struct to the fault package for tracking serializable and deduplicatable error sources. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature #### Issue(s) * #2708 #### Test Plan - [x] :zap: Unit test --- CHANGELOG.md | 2 +- src/internal/connector/graph/errors.go | 31 ++++ src/internal/connector/graph/errors_test.go | 40 ++++ src/internal/connector/onedrive/collection.go | 7 +- .../connector/onedrive/collections.go | 31 ++-- .../connector/onedrive/collections_test.go | 26 ++- src/internal/operations/backup.go | 11 +- src/internal/stats/stats.go | 5 + src/pkg/backup/backup.go | 106 ++++++++--- src/pkg/backup/backup_test.go | 108 ++++++++--- src/pkg/fault/example_fault_test.go | 21 +++ src/pkg/fault/fault.go | 169 +++++++++++++---- src/pkg/fault/fault_test.go | 114 ++++++++++++ src/pkg/fault/item.go | 172 ++++++++++++++++++ src/pkg/fault/item_test.go | 145 +++++++++++++++ 15 files changed, 869 insertions(+), 119 deletions(-) create mode 100644 src/pkg/fault/item.go create mode 100644 src/pkg/fault/item_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e5e5af12..7945a6128 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Show owner information when doing backup list in json format -- Onedrive files that are flagged as malware get skipped during backup. - Permissions for groups can now be backed up and restored +- Onedrive files that are flagged as malware get skipped during backup. Skipped files are listed in the backup results as part of the status, including a reference to their categorization, eg: "Completed (0 errors, 1 skipped: 1 malware)". 
### Fixed - Corso-generated .meta files and permissions no longer appear in the backup details. diff --git a/src/internal/connector/graph/errors.go b/src/internal/connector/graph/errors.go index 6561a58dd..ee37f6a0a 100644 --- a/src/internal/connector/graph/errors.go +++ b/src/internal/connector/graph/errors.go @@ -9,12 +9,14 @@ import ( "os" "strings" + "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" "github.com/pkg/errors" "golang.org/x/exp/slices" "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" ) @@ -252,3 +254,32 @@ func appendIf(a []any, k string, v *string) []any { return append(a, k, *v) } + +// MalwareInfo gathers potentially useful information about a malware infected +// drive item, and aggregates that data into a map. +func MalwareInfo(item models.DriveItemable) map[string]any { + m := map[string]any{} + + creator := item.GetCreatedByUser() + if creator != nil { + m[fault.AddtlCreatedBy] = ptr.Val(creator.GetId()) + } + + lastmodder := item.GetLastModifiedByUser() + if lastmodder != nil { + m[fault.AddtlLastModBy] = ptr.Val(lastmodder.GetId()) + } + + parent := item.GetParentReference() + if parent != nil { + m[fault.AddtlContainerID] = ptr.Val(parent.GetId()) + m[fault.AddtlContainerName] = ptr.Val(parent.GetName()) + } + + malware := item.GetMalware() + if malware != nil { + m[fault.AddtlMalwareDesc] = ptr.Val(malware.GetDescription()) + } + + return m +} diff --git a/src/internal/connector/graph/errors_test.go b/src/internal/connector/graph/errors_test.go index cbc0bbb87..c34d0b4de 100644 --- a/src/internal/connector/graph/errors_test.go +++ b/src/internal/connector/graph/errors_test.go @@ -5,12 +5,14 @@ import ( "net/http" "testing" + "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/fault" ) type GraphErrorsUnitSuite struct { @@ -221,3 +223,41 @@ func (suite *GraphErrorsUnitSuite) TestIsErrUnauthorized() { }) } } + +func (suite *GraphErrorsUnitSuite) TestMalwareInfo() { + var ( + i = models.DriveItem{} + cb = models.User{} + cbID = "created-by" + lm = models.User{} + lmID = "last-mod-by" + ref = models.ItemReference{} + refCID = "container-id" + refCN = "container-name" + mal = models.Malware{} + malDesc = "malware-description" + ) + + cb.SetId(&cbID) + i.SetCreatedByUser(&cb) + + lm.SetId(&lmID) + i.SetLastModifiedByUser(&lm) + + ref.SetId(&refCID) + ref.SetName(&refCN) + i.SetParentReference(&ref) + + mal.SetDescription(&malDesc) + i.SetMalware(&mal) + + expect := map[string]any{ + fault.AddtlCreatedBy: cbID, + fault.AddtlLastModBy: lmID, + fault.AddtlContainerID: refCID, + fault.AddtlContainerName: refCN, + fault.AddtlMalwareDesc: malDesc, + } + + assert.Equal(suite.T(), expect, MalwareInfo(&i)) +} diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 3e928a099..6736e76cf 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -427,13 +427,16 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { // check for errors following retries if err != nil { - if clues.HasLabel(err, graph.LabelsMalware) { - logger.Ctx(ctx).Infow("malware item", clues.InErr(err).Slice()...) 
+ if item.GetMalware() != nil || clues.HasLabel(err, graph.LabelsMalware) { + logger.Ctx(ctx).With("error", err.Error(), "malware", true).Error("downloading item") + el.AddSkip(fault.FileSkip(fault.SkipMalware, itemID, itemName, graph.MalwareInfo(item))) } else { logger.Ctx(ctx).With("error", err.Error()).Error("downloading item") el.AddRecoverable(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) } + // return err, not el.Err(), because the lazy reader needs to communicate to + // the data consumer that this item is unreadable, regardless of the fault state. return nil, err } diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index 594d6b448..7cae3a88c 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -608,27 +608,26 @@ func (c *Collections) UpdateCollections( break } - if item.GetMalware() != nil { - // TODO: track the item as skipped; logging alone might - // slice out the data from tracking. - // https://learn.microsoft.com/en-us/graph/api/resources/malware?view=graph-rest-1.0 - logger.Ctx(ctx).Infow("malware detected", "malware_description", ptr.Val(item.GetMalware().GetDescription())) - continue - } - var ( itemID = ptr.Val(item.GetId()) - ictx = clues.Add(ctx, "update_item_id", itemID) + itemName = ptr.Val(item.GetName()) + ictx = clues.Add(ctx, "item_id", itemID, "item_name", itemName) isFolder = item.GetFolder() != nil || item.GetPackage() != nil ) - // TODO(meain): Use `@microsoft.graph.sharedChanged` in - // item.GetAdditionalData() to optimize fetching - // permissions. When permissions change, this flags is - // present, if only data changes, it is not present. 
Have to - // add `oneDrive.sharedChanged` in `$select` in delta - // https://learn.microsoft.com/en-us/onedrive/developer/rest-api - // /concepts/scan-guidance#scanning-permissions-hierarchies + if item.GetMalware() != nil { + addtl := graph.MalwareInfo(item) + skip := fault.FileSkip(fault.SkipMalware, itemID, itemName, addtl) + + if isFolder { + skip = fault.ContainerSkip(fault.SkipMalware, itemID, itemName, addtl) + } + + errs.AddSkip(skip) + logger.Ctx(ctx).Infow("malware detected", "item_details", addtl) + + continue + } // Deleted file or folder. if item.GetDeleted() != nil { diff --git a/src/internal/connector/onedrive/collections_test.go b/src/internal/connector/onedrive/collections_test.go index 1ca8614b8..9dee2bd6f 100644 --- a/src/internal/connector/onedrive/collections_test.go +++ b/src/internal/connector/onedrive/collections_test.go @@ -175,6 +175,7 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() { expectedItemCount int expectedContainerCount int expectedFileCount int + expectedSkippedCount int expectedMetadataPaths map[string]string expectedExcludes map[string]struct{} }{ @@ -745,6 +746,7 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() { expectedItemCount: 4, expectedFileCount: 2, expectedContainerCount: 3, + expectedSkippedCount: 1, expectedMetadataPaths: map[string]string{ "root": expectedPath(""), "folder": expectedPath("/folder"), @@ -775,6 +777,8 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() { control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}}) itemCollection := map[string]string{} + errs := fault.New(true) + err := c.UpdateCollections( ctx, "driveID1", @@ -785,12 +789,13 @@ func (suite *OneDriveCollectionsSuite) TestUpdateCollections() { excludes, itemCollection, false, - fault.New(true)) + errs) tt.expect(t, err) assert.Equal(t, len(tt.expectedCollectionIDs), len(c.CollectionMap), "total collections") assert.Equal(t, tt.expectedItemCount, c.NumItems, "item 
count") assert.Equal(t, tt.expectedFileCount, c.NumFiles, "file count") assert.Equal(t, tt.expectedContainerCount, c.NumContainers, "container count") + assert.Equal(t, tt.expectedSkippedCount, len(errs.Skipped()), "skipped items") for id, sp := range tt.expectedCollectionIDs { if !assert.Containsf(t, c.CollectionMap, id, "missing collection with id %s", id) { @@ -1261,11 +1266,12 @@ func (suite *OneDriveCollectionsSuite) TestGet() { prevFolderPaths map[string]map[string]string // Collection name -> set of item IDs. We can't check item data because // that's not mocked out. Metadata is checked separately. - expectedCollections map[string]map[data.CollectionState][]string - expectedDeltaURLs map[string]string - expectedFolderPaths map[string]map[string]string - expectedDelList map[string]struct{} - doNotMergeItems bool + expectedCollections map[string]map[data.CollectionState][]string + expectedDeltaURLs map[string]string + expectedFolderPaths map[string]map[string]string + expectedDelList map[string]struct{} + expectedSkippedCount int + doNotMergeItems bool }{ { name: "OneDrive_OneItemPage_DelFileOnly_NoFolders_NoErrors", @@ -1813,7 +1819,8 @@ func (suite *OneDriveCollectionsSuite) TestGet() { "folder": folderPath1, }, }, - expectedDelList: getDelList("file", "file2"), + expectedDelList: getDelList("file", "file2"), + expectedSkippedCount: 2, }, } for _, test := range table { @@ -1884,8 +1891,11 @@ func (suite *OneDriveCollectionsSuite) TestGet() { assert.NoError(t, err, "creating metadata collection") prevMetadata := []data.RestoreCollection{data.NotFoundRestoreCollection{Collection: mc}} - cols, delList, err := c.Get(ctx, prevMetadata, fault.New(true)) + errs := fault.New(true) + + cols, delList, err := c.Get(ctx, prevMetadata, errs) test.errCheck(t, err) + assert.Equal(t, test.expectedSkippedCount, len(errs.Skipped())) if err != nil { return diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index a184748eb..5925c521a 100644 --- 
a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -170,6 +170,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { Errorf("doing backup: recoverable error %d of %d", i+1, recoverableCount) } + skippedCount := len(op.Errors.Skipped()) + for i, skip := range op.Errors.Skipped() { + logger.Ctx(ctx).With("skip", skip).Infof("doing backup: skipped item %d of %d", i+1, skippedCount) + } + // ----- // Persistence // ----- @@ -673,8 +678,7 @@ func (op *BackupOperation) createBackupModels( op.Selectors, op.Results.ReadWrites, op.Results.StartAndEndTime, - op.Errors, - ) + op.Errors) if err = op.store.Put(ctx, model.BackupSchema, b); err != nil { return clues.Wrap(err, "creating backup model").WithClues(ctx) @@ -694,8 +698,7 @@ func (op *BackupOperation) createBackupModels( events.Service: op.Selectors.PathService().String(), events.StartTime: common.FormatTime(op.Results.StartedAt), events.Status: op.Status.String(), - }, - ) + }) return nil } diff --git a/src/internal/stats/stats.go b/src/internal/stats/stats.go index b44c4ae73..825ff7629 100644 --- a/src/internal/stats/stats.go +++ b/src/internal/stats/stats.go @@ -28,3 +28,8 @@ type ByteCounter struct { func (bc *ByteCounter) Count(i int64) { atomic.AddInt64(&bc.NumBytes, i) } + +type SkippedCounts struct { + TotalSkippedItems int `json:"totalSkippedItems"` + SkippedMalware int `json:"skippedMalware"` +} diff --git a/src/pkg/backup/backup.go b/src/pkg/backup/backup.go index c8ade4966..7e3682c6f 100644 --- a/src/pkg/backup/backup.go +++ b/src/pkg/backup/backup.go @@ -35,13 +35,18 @@ type Backup struct { // Version represents the version of the backup format Version int `json:"version"` - // Errors contains all errors aggregated during a backup operation. - Errors fault.Errors `json:"errors"` - ErrorCount int `json:"errorCount"` + FailFast bool `json:"failFast"` + + // the quantity of errors, both hard failure and recoverable. 
+ ErrorCount int `json:"errorCount"` + + // the non-recoverable failure message, only populated if one occurred. + Failure string `json:"failure"` // stats are embedded so that the values appear as top-level properties stats.ReadWrites stats.StartAndEndTime + stats.SkippedCounts } // interface compliance checks @@ -55,13 +60,28 @@ func New( se stats.StartAndEndTime, errs *fault.Bus, ) *Backup { - errData := errs.Errors() + var ( + ee = errs.Errors() + // TODO: count errData.Items(), not all recovered errors. + errCount = len(ee.Recovered) + failMsg string + malware int + otherSkips int + ) - errCount := len(errs.Errors().Recovered) - if errData.Failure != nil { + if ee.Failure != nil { + failMsg = ee.Failure.Error() errCount++ } + for _, s := range ee.Skipped { + if s.HasCause(fault.SkipMalware) { + malware++ + } else { + otherSkips++ + } + } + return &Backup{ BaseModel: model.BaseModel{ ID: id, @@ -69,16 +89,26 @@ func New( model.ServiceTag: selector.PathService().String(), }, }, - CreationTime: time.Now(), - SnapshotID: snapshotID, - DetailsID: detailsID, - Status: status, - Selector: selector, - Errors: errData, - ErrorCount: errCount, + + Version: version.Backup, + SnapshotID: snapshotID, + DetailsID: detailsID, + + CreationTime: time.Now(), + Status: status, + + Selector: selector, + FailFast: errs.FailFast(), + + ErrorCount: errCount, + Failure: failMsg, + ReadWrites: rw, StartAndEndTime: se, - Version: version.Backup, + SkippedCounts: stats.SkippedCounts{ + TotalSkippedItems: len(ee.Skipped), + SkippedMalware: malware, + }, } } @@ -121,7 +151,7 @@ type Printable struct { func (b Backup) MinimumPrintable() any { return Printable{ ID: b.ID, - ErrorCount: b.countErrors(), + ErrorCount: b.ErrorCount, StartedAt: b.StartedAt, Status: b.Status, Version: "0", @@ -145,7 +175,38 @@ func (b Backup) Headers() []string { // Values returns the values matching the Headers list for printing // out to a terminal in a columnar display. 
func (b Backup) Values() []string { - status := fmt.Sprintf("%s (%d errors)", b.Status, b.countErrors()) + var ( + status = b.Status + errCount = b.ErrorCount + ) + + if errCount+b.TotalSkippedItems > 0 { + status += (" (") + } + + if errCount > 0 { + status += fmt.Sprintf("%d errors", errCount) + } + + if errCount > 0 && b.TotalSkippedItems > 0 { + status += ", " + } + + if b.TotalSkippedItems > 0 { + status += fmt.Sprintf("%d skipped", b.TotalSkippedItems) + } + + if b.TotalSkippedItems > 0 && b.SkippedMalware > 0 { + status += ": " + } + + if b.SkippedMalware > 0 { + status += fmt.Sprintf("%d malware", b.SkippedMalware) + } + + if errCount+b.TotalSkippedItems > 0 { + status += (")") + } return []string{ common.FormatTabularDisplayTime(b.StartedAt), @@ -154,16 +215,3 @@ func (b Backup) Values() []string { b.Selector.DiscreteOwner, } } - -func (b Backup) countErrors() int { - if b.ErrorCount > 0 { - return b.ErrorCount - } - - errCount := len(b.Errors.Recovered) - if b.Errors.Failure != nil { - errCount++ - } - - return errCount -} diff --git a/src/pkg/backup/backup_test.go b/src/pkg/backup/backup_test.go index 57c5a5e46..df40ae243 100644 --- a/src/pkg/backup/backup_test.go +++ b/src/pkg/backup/backup_test.go @@ -1,7 +1,6 @@ package backup_test import ( - "errors" "testing" "time" @@ -14,16 +13,15 @@ import ( "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup" - "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/selectors" ) -type BackupSuite struct { +type BackupUnitSuite struct { tester.Suite } -func TestBackupSuite(t *testing.T) { - suite.Run(t, &BackupSuite{Suite: tester.NewUnitSuite(t)}) +func TestBackupUnitSuite(t *testing.T) { + suite.Run(t, &BackupUnitSuite{Suite: tester.NewUnitSuite(t)}) } func stubBackup(t time.Time) backup.Backup { @@ -42,9 +40,8 @@ func stubBackup(t time.Time) backup.Backup { DetailsID: "details", Status: "status", Selector: 
sel.Selector, - Errors: fault.Errors{ - Recovered: []error{errors.New("read"), errors.New("write")}, - }, + ErrorCount: 2, + Failure: "read, write", ReadWrites: stats.ReadWrites{ BytesRead: 301, BytesUploaded: 301, @@ -55,36 +52,93 @@ func stubBackup(t time.Time) backup.Backup { StartedAt: t, CompletedAt: t, }, + SkippedCounts: stats.SkippedCounts{ + TotalSkippedItems: 1, + SkippedMalware: 1, + }, } } -func (suite *BackupSuite) TestBackup_HeadersValues() { - t := suite.T() - now := time.Now() - b := stubBackup(now) +func (suite *BackupUnitSuite) TestBackup_HeadersValues() { + var ( + t = suite.T() + now = time.Now() + b = stubBackup(now) + expectHs = []string{ + "Started At", + "ID", + "Status", + "Resource Owner", + } + nowFmt = common.FormatTabularDisplayTime(now) + expectVs = []string{ + nowFmt, + "id", + "status (2 errors, 1 skipped: 1 malware)", + "test", + } + ) - expectHs := []string{ - "Started At", - "ID", - "Status", - "Resource Owner", - } + // single skipped malware hs := b.Headers() assert.Equal(t, expectHs, hs) - nowFmt := common.FormatTabularDisplayTime(now) - expectVs := []string{ - nowFmt, - "id", - "status (2 errors)", - "test", - } - vs := b.Values() assert.Equal(t, expectVs, vs) } -func (suite *BackupSuite) TestBackup_MinimumPrintable() { +func (suite *BackupUnitSuite) TestBackup_Values_statusVariations() { + table := []struct { + name string + bup backup.Backup + expect string + }{ + { + name: "no extras", + bup: backup.Backup{Status: "test"}, + expect: "test", + }, + { + name: "errors", + bup: backup.Backup{ + Status: "test", + ErrorCount: 42, + }, + expect: "test (42 errors)", + }, + { + name: "malware", + bup: backup.Backup{ + Status: "test", + SkippedCounts: stats.SkippedCounts{ + TotalSkippedItems: 2, + SkippedMalware: 1, + }, + }, + expect: "test (2 skipped: 1 malware)", + }, + { + name: "errors and malware", + bup: backup.Backup{ + Status: "test", + ErrorCount: 42, + SkippedCounts: stats.SkippedCounts{ + TotalSkippedItems: 1, + 
SkippedMalware: 1, + }, + }, + expect: "test (42 errors, 1 skipped: 1 malware)", + }, + } + for _, test := range table { + suite.Run(test.name, func() { + result := test.bup.Values() + assert.Equal(suite.T(), test.expect, result[2], "status value") + }) + } +} + +func (suite *BackupUnitSuite) TestBackup_MinimumPrintable() { t := suite.T() now := time.Now() b := stubBackup(now) diff --git a/src/pkg/fault/example_fault_test.go b/src/pkg/fault/example_fault_test.go index 67ddcc4b4..832a17df0 100644 --- a/src/pkg/fault/example_fault_test.go +++ b/src/pkg/fault/example_fault_test.go @@ -417,3 +417,24 @@ func ExampleErrors_Failure_return() { // Output: direct: caught one // recoverable: caught one } + +// ExampleBus_AddSkip showcases when to use AddSkip instead of an error. +func ExampleBus_AddSkip() { + errs := fault.New(false) + + // Some conditions cause well-known problems that we want Corso to skip + // over, instead of error out. An initial case is when Graph API identifies + // a file as containing malware. We can't download the file: it'll always + // error. Our only option is to skip it. + errs.AddSkip(fault.FileSkip( + fault.SkipMalware, + "file-id", + "file-name", + map[string]any{"foo": "bar"}, + )) + + // later on, after processing, end users can scrutinize the skipped items. + fmt.Println(errs.Skipped()[0].String()) + + // Output: skipped processing file: malware_detected +} diff --git a/src/pkg/fault/fault.go b/src/pkg/fault/fault.go index 52ad235b8..b88227846 100644 --- a/src/pkg/fault/fault.go +++ b/src/pkg/fault/fault.go @@ -1,8 +1,10 @@ package fault import ( + "errors" "sync" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) @@ -22,6 +24,11 @@ type Bus struct { // we'd expect to see 1 error added to this slice. recoverable []error + // skipped is the accumulation of skipped items. Skipped items + // are not errors themselves, but instead represent some permanent + // inability to process an item, due to a well-known cause. 
+ skipped []Skipped + // if failFast is true, the first errs addition will // get promoted to the err value. This signifies a // non-recoverable processing state, causing any running @@ -29,29 +36,6 @@ type Bus struct { failFast bool } -// Errors provides the errors data alone, without sync -// controls, allowing the data to be persisted. -type Errors struct { - // Failure identifies a non-recoverable error. This includes - // non-start cases (ex: cannot connect to client), hard- - // stop issues (ex: credentials expired) or conscious exit - // cases (ex: iteration error + failFast config). - Failure error `json:"failure"` - - // Recovered errors accumulate through a runtime under - // best-effort processing conditions. They imply that an - // error occurred, but the process was able to move on and - // complete afterwards. - // Eg: if a process is retrieving N items, and 1 of the - // items fails to be retrieved, but the rest of them succeed, - // we'd expect to see 1 error added to this slice. - Recovered []error `json:"-"` - - // If FailFast is true, then the first Recoverable error will - // promote to the Failure spot, causing processing to exit. - FailFast bool `json:"failFast"` -} - // New constructs a new error with default values in place. func New(failFast bool) *Bus { return &Bus{ @@ -61,6 +45,11 @@ func New(failFast bool) *Bus { } } +// FailFast returs the failFast flag in the bus. +func (e *Bus) FailFast() bool { + return e.failFast +} + // Failure returns the primary error. If not nil, this // indicates the operation exited prior to completion. func (e *Bus) Failure() error { @@ -72,17 +61,13 @@ func (e *Bus) Failure() error { // iteration where a single failure (ex: retrieving an item), // doesn't require the entire process to end. func (e *Bus) Recovered() []error { - return e.recoverable + return slices.Clone(e.recoverable) } -// Errors returns the plain record of errors that were aggregated -// within a fult Bus. 
-func (e *Bus) Errors() Errors { - return Errors{ - Failure: e.failure, - Recovered: slices.Clone(e.recoverable), - FailFast: e.failFast, - } +// Skipped returns the slice of items that were permanently +// skipped during processing. +func (e *Bus) Skipped() []Skipped { + return slices.Clone(e.skipped) } // Fail sets the non-recoverable error (ie: bus.failure) @@ -150,6 +135,105 @@ func (e *Bus) addRecoverableErr(err error) *Bus { return e } +// AddSkip appends a record of a Skipped item to the fault bus. +// Importantly, skipped items are not the same as recoverable +// errors. An item should only be skipped under the following +// conditions. All other cases should be handled as errors. +// 1. The conditions for skipping the item are well-known and +// well-documented. End users need to be able to understand +// both the conditions and identifications of skips. +// 2. Skipping avoids a permanent and consistent failure. If +// the underlying reason is transient or otherwise recoverable, +// the item should not be skipped. +func (e *Bus) AddSkip(s *Skipped) *Bus { + if s == nil { + return e + } + + e.mu.Lock() + defer e.mu.Unlock() + + return e.addSkip(s) +} + +func (e *Bus) addSkip(s *Skipped) *Bus { + e.skipped = append(e.skipped, *s) + return e +} + +// Errors returns the plain record of errors that were aggregated +// within a fult Bus. +func (e *Bus) Errors() Errors { + return Errors{ + Failure: e.failure, + Recovered: slices.Clone(e.recoverable), + Items: itemsIn(e.failure, e.recoverable), + Skipped: slices.Clone(e.skipped), + FailFast: e.failFast, + } +} + +// --------------------------------------------------------------------------- +// Errors Data +// --------------------------------------------------------------------------- + +// Errors provides the errors data alone, without sync controls +// or adders/setters. Expected to get called at the end of processing, +// as a way to aggregate results. 
+type Errors struct { + // Failure identifies a non-recoverable error. This includes + // non-start cases (ex: cannot connect to client), hard- + // stop issues (ex: credentials expired) or conscious exit + // cases (ex: iteration error + failFast config). + Failure error `json:"failure"` + + // Recovered errors accumulate through a runtime under + // best-effort processing conditions. They imply that an + // error occurred, but the process was able to move on and + // complete afterwards. + // Eg: if a process is retrieving N items, and 1 of the + // items fails to be retrieved, but the rest of them succeed, + // we'd expect to see 1 error added to this slice. + Recovered []error `json:"-"` + + // Items are the reduction of all errors (both the failure and the + // recovered values) in the Errors struct into a slice of items, + // deduplicated by their ID. + Items []Item `json:"items"` + + // Skipped is the accumulation of skipped items. Skipped items + // are not errors themselves, but instead represent some permanent + // inability to process an item, due to a well-known cause. + Skipped []Skipped `json:"skipped"` + + // If FailFast is true, then the first Recoverable error will + // promote to the Failure spot, causing processing to exit. + FailFast bool `json:"failFast"` +} + +// itemsIn reduces all errors (both the failure and recovered values) +// in the Errors struct into a slice of items, deduplicated by their +// ID. 
+func itemsIn(failure error, recovered []error) []Item { + is := map[string]Item{} + + for _, err := range recovered { + var ie *Item + if !errors.As(err, &ie) { + continue + } + + is[ie.ID] = *ie + } + + var ie *Item + if errors.As(failure, &ie) { + is[ie.ID] = *ie + } + + return maps.Values(is) +} + // --------------------------------------------------------------------------- // Local aggregator // --------------------------------------------------------------------------- @@ -187,6 +271,27 @@ func (e *localBus) AddRecoverable(err error) { e.bus.AddRecoverable(err) } +// AddSkip appends a record of a Skipped item to the local bus. +// Importantly, skipped items are not the same as recoverable +// errors. An item should only be skipped under the following +// conditions. All other cases should be handled as errors. +// 1. The conditions for skipping the item are well-known and +// well-documented. End users need to be able to understand +// both the conditions and identifications of skips. +// 2. Skipping avoids a permanent and consistent failure. If +// the underlying reason is transient or otherwise recoverable, +// the item should not be skipped. +func (e *localBus) AddSkip(s *Skipped) { + if s == nil { + return + } + + e.mu.Lock() + defer e.mu.Unlock() + + e.bus.AddSkip(s) +} + // Failure returns the failure that happened within the local bus. // It does not return the underlying bus.Failure(), only the failure // that was recorded within the local bus instance. 
This error should diff --git a/src/pkg/fault/fault_test.go b/src/pkg/fault/fault_test.go index e0fcc4e20..e7298625d 100644 --- a/src/pkg/fault/fault_test.go +++ b/src/pkg/fault/fault_test.go @@ -182,6 +182,22 @@ func (suite *FaultErrorsUnitSuite) TestAdd() { assert.Len(t, n.Recovered(), 2) } +func (suite *FaultErrorsUnitSuite) TestAddSkip() { + t := suite.T() + + n := fault.New(true) + require.NotNil(t, n) + + n.Fail(assert.AnError) + assert.Len(t, n.Skipped(), 0) + + n.AddRecoverable(assert.AnError) + assert.Len(t, n.Skipped(), 0) + + n.AddSkip(fault.OwnerSkip(fault.SkipMalware, "id", "name", nil)) + assert.Len(t, n.Skipped(), 1) +} + func (suite *FaultErrorsUnitSuite) TestErrors() { t := suite.T() @@ -212,6 +228,104 @@ func (suite *FaultErrorsUnitSuite) TestErrors() { assert.True(t, d.FailFast) } +func (suite *FaultErrorsUnitSuite) TestErrors_Items() { + ae := assert.AnError + addtl := map[string]any{"foo": "bar", "baz": 1} + + table := []struct { + name string + errs func() fault.Errors + expect []fault.Item + }{ + { + name: "no errors", + errs: func() fault.Errors { + return fault.New(false).Errors() + }, + expect: []fault.Item{}, + }, + { + name: "no items", + errs: func() fault.Errors { + b := fault.New(false) + b.Fail(ae) + b.AddRecoverable(ae) + + return b.Errors() + }, + expect: []fault.Item{}, + }, + { + name: "failure item", + errs: func() fault.Errors { + b := fault.New(false) + b.Fail(fault.OwnerErr(ae, "id", "name", addtl)) + b.AddRecoverable(ae) + + return b.Errors() + }, + expect: []fault.Item{*fault.OwnerErr(ae, "id", "name", addtl)}, + }, + { + name: "recoverable item", + errs: func() fault.Errors { + b := fault.New(false) + b.Fail(ae) + b.AddRecoverable(fault.OwnerErr(ae, "id", "name", addtl)) + + return b.Errors() + }, + expect: []fault.Item{*fault.OwnerErr(ae, "id", "name", addtl)}, + }, + { + name: "two items", + errs: func() fault.Errors { + b := fault.New(false) + b.Fail(fault.OwnerErr(ae, "oid", "name", addtl)) + 
b.AddRecoverable(fault.FileErr(ae, "fid", "name", addtl)) + + return b.Errors() + }, + expect: []fault.Item{ + *fault.OwnerErr(ae, "oid", "name", addtl), + *fault.FileErr(ae, "fid", "name", addtl), + }, + }, + { + name: "duplicate items - failure priority", + errs: func() fault.Errors { + b := fault.New(false) + b.Fail(fault.OwnerErr(ae, "id", "name", addtl)) + b.AddRecoverable(fault.FileErr(ae, "id", "name", addtl)) + + return b.Errors() + }, + expect: []fault.Item{ + *fault.OwnerErr(ae, "id", "name", addtl), + }, + }, + { + name: "duplicate items - last recoverable priority", + errs: func() fault.Errors { + b := fault.New(false) + b.Fail(ae) + b.AddRecoverable(fault.FileErr(ae, "fid", "name", addtl)) + b.AddRecoverable(fault.FileErr(ae, "fid", "name2", addtl)) + + return b.Errors() + }, + expect: []fault.Item{ + *fault.FileErr(ae, "fid", "name2", addtl), + }, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + assert.ElementsMatch(suite.T(), test.expect, test.errs().Items) + }) + } +} + func (suite *FaultErrorsUnitSuite) TestMarshalUnmarshal() { t := suite.T() diff --git a/src/pkg/fault/item.go b/src/pkg/fault/item.go new file mode 100644 index 000000000..d5dab4857 --- /dev/null +++ b/src/pkg/fault/item.go @@ -0,0 +1,172 @@ +package fault + +const ( + AddtlCreatedBy = "created_by" + AddtlLastModBy = "last_modified_by" + AddtlContainerID = "container_id" + AddtlContainerName = "container_name" + AddtlMalwareDesc = "malware_description" +) + +type itemType string + +const ( + FileType itemType = "file" + ContainerType itemType = "container" + ResourceOwnerType itemType = "resource_owner" +) + +var _ error = &Item{} + +// Item contains a concrete reference to a thing that failed +// during processing. The categorization of the item is determined +// by its Type: file, container, or reourceOwner. 
+// +// Item is compliant with the error interface so that it can be +// aggregated with the fault bus, and deserialized using the +// errors.As() func. The idea is that fault.Items, during +// processing, will get packed into bus.AddRecoverable (or failure) +// as part of standard error handling, and later deserialized +// by the end user (cli or sdk) for surfacing human-readable and +// identifiable points of failure. +type Item struct { + // deduplication identifier; the ID of the observed item. + ID string `json:"id"` + + // a human-readable reference: file/container name, email, etc + Name string `json:"name"` + + // tracks the type of item represented by this entry. + Type itemType `json:"type"` + + // Error() of the causal error, or a sentinel if this is the + // source of the error. In case of ID collisions, the failure + // takes priority, followed by the last recovered item. + Cause string `json:"cause"` + + // Additional is a catch-all map for storing data that might + // be relevant to particular types or contexts of items without + // being globally relevant. Ex: parent container references, + // created-by ids, last modified, etc. Should be used sparingly, + // only for information that might be immediately relevant to the + // end user. + Additional map[string]any `json:"additional"` +} + +// Error complies with the error interface. +func (i *Item) Error() string { + if i == nil { + return "" + } + + if len(i.Type) == 0 { + return "processing item of unknown type" + } + + return string("processing " + i.Type) +} + +// ContainerErr produces a Container-type Item for tracking erroneous items. +func ContainerErr(cause error, id, name string, addtl map[string]any) *Item { + return itemErr(ContainerType, cause, id, name, addtl) +} + +// FileErr produces a File-type Item for tracking erroneous items. 
+func FileErr(cause error, id, name string, addtl map[string]any) *Item {
+	return itemErr(FileType, cause, id, name, addtl)
+}
+
+// OwnerErr produces a ResourceOwner-type Item for tracking erroneous items.
+func OwnerErr(cause error, id, name string, addtl map[string]any) *Item {
+	return itemErr(ResourceOwnerType, cause, id, name, addtl)
+}
+
+// itemErr produces an Item of the given type; cause must be non-nil since its Error() text is stored.
+func itemErr(t itemType, cause error, id, name string, addtl map[string]any) *Item {
+	return &Item{
+		ID:         id,
+		Name:       name,
+		Type:       t,
+		Cause:      cause.Error(),
+		Additional: addtl,
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Skipped Items
+// ---------------------------------------------------------------------------
+
+// skipCause identifies the well-known conditions to Skip an item.  It is
+// important that skip cause enumerations do not overlap with general error
+// handling.  Skips must be well known, well documented, and consistent.
+// Transient failures, undocumented or unknown conditions, and arbitrary
+// handling should never produce a skipped item.  Those cases should get
+// handled as normal errors.
+type skipCause string
+
+// SkipMalware identifies a malware detection case.  Files that graph api
+// identifies as malware cannot be downloaded or uploaded, and will permanently
+// fail any attempts to backup or restore.
+const SkipMalware skipCause = "malware_detected"
+
+// Skipped items are permanently unprocessable due to well-known conditions.
+// In order to skip an item, the following conditions should be met:
+//  1. The conditions for skipping the item are well-known and
+//     well-documented.  End users need to be able to understand
+//     both the conditions and identifications of skips.
+//  2. Skipping avoids a permanent and consistent failure.  If
+//     the underlying reason is transient or otherwise recoverable,
+//     the item should not be skipped.
+//
+// Skipped wraps Item primarily to minimize confusion when sharing the
+// fault interface.  Skipped items are not errors, and Item{} errors are
+// not the basis for a Skip.
+type Skipped struct {
+	item Item
+}
+
+// String complies with the stringer interface; a nil receiver yields "<nil>".
+func (s *Skipped) String() string {
+	if s == nil {
+		return "<nil>"
+	}
+
+	return "skipped " + s.item.Error() + ": " + s.item.Cause
+}
+
+// HasCause compares the underlying cause against the parameter.
+func (s *Skipped) HasCause(c skipCause) bool {
+	if s == nil {
+		return false
+	}
+
+	return s.item.Cause == string(c)
+}
+
+// ContainerSkip produces a Container-kind Item for tracking skipped items.
+func ContainerSkip(cause skipCause, id, name string, addtl map[string]any) *Skipped {
+	return itemSkip(ContainerType, cause, id, name, addtl)
+}
+
+// FileSkip produces a File-kind Item for tracking skipped items.
+func FileSkip(cause skipCause, id, name string, addtl map[string]any) *Skipped {
+	return itemSkip(FileType, cause, id, name, addtl)
+}
+
+// OwnerSkip produces a ResourceOwner-kind Item for tracking skipped items.
+func OwnerSkip(cause skipCause, id, name string, addtl map[string]any) *Skipped {
+	return itemSkip(ResourceOwnerType, cause, id, name, addtl)
+}
+
+// itemSkip produces an Item of the provided type for tracking skipped items.
+func itemSkip(t itemType, cause skipCause, id, name string, addtl map[string]any) *Skipped {
+	return &Skipped{
+		item: Item{
+			ID:         id,
+			Name:       name,
+			Type:       t,
+			Cause:      string(cause),
+			Additional: addtl,
+		},
+	}
+}
diff --git a/src/pkg/fault/item_test.go b/src/pkg/fault/item_test.go
new file mode 100644
index 000000000..f90136ff1
--- /dev/null
+++ b/src/pkg/fault/item_test.go
@@ -0,0 +1,145 @@
+package fault
+
+import (
+	"testing"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/alcionai/corso/src/internal/tester"
+)
+
+type ItemUnitSuite struct {
+	tester.Suite
+}
+
+func TestItemUnitSuite(t *testing.T) {
+	suite.Run(t, &ItemUnitSuite{Suite: tester.NewUnitSuite(t)})
+}
+
+func (suite *ItemUnitSuite) TestItem_Error() {
+	var (
+		t = suite.T()
+		i *Item
+	)
+
+	assert.Contains(t, i.Error(), "nil")
+
+	i = &Item{}
+	assert.Contains(t, i.Error(), "unknown type")
+
+	i = &Item{Type: FileType}
+	assert.Contains(t, i.Error(), FileType)
+}
+
+func (suite *ItemUnitSuite) TestContainerErr() {
+	t := suite.T()
+	addtl := map[string]any{"foo": "bar"}
+	i := ContainerErr(errors.New("foo"), "id", "name", addtl)
+
+	expect := Item{
+		ID:         "id",
+		Name:       "name",
+		Type:       ContainerType,
+		Cause:      "foo",
+		Additional: addtl,
+	}
+
+	assert.Equal(t, expect, *i)
+}
+
+func (suite *ItemUnitSuite) TestFileErr() {
+	t := suite.T()
+	addtl := map[string]any{"foo": "bar"}
+	i := FileErr(errors.New("foo"), "id", "name", addtl)
+
+	expect := Item{
+		ID:         "id",
+		Name:       "name",
+		Type:       FileType,
+		Cause:      "foo",
+		Additional: addtl,
+	}
+
+	assert.Equal(t, expect, *i)
+}
+
+func (suite *ItemUnitSuite) TestOwnerErr() {
+	t := suite.T()
+	addtl := map[string]any{"foo": "bar"}
+	i := OwnerErr(errors.New("foo"), "id", "name", addtl)
+
+	expect := Item{
+		ID:         "id",
+		Name:       "name",
+		Type:       ResourceOwnerType,
+		Cause:      "foo",
+		Additional: addtl,
+	}
+
+	assert.Equal(t, expect, *i)
+}
+
+func (suite *ItemUnitSuite) TestSkipped_String() {
+	var (
+		t = suite.T()
+		i *Skipped
+	)
+
+	assert.Contains(t, i.String(), "nil")
+
+	i = &Skipped{Item{}}
+	assert.Contains(t, i.String(), "unknown type")
+
+	i = &Skipped{Item{Type: FileType}}
+	assert.Contains(t, i.String(), FileType)
+}
+
+func (suite *ItemUnitSuite) TestContainerSkip() {
+	t := suite.T()
+	addtl := map[string]any{"foo": "bar"}
+	i := ContainerSkip(SkipMalware, "id", "name", addtl)
+
+	expect := Item{
+		ID:         "id",
+		Name:       "name",
+		Type:       ContainerType,
+		Cause:      string(SkipMalware),
+		Additional: addtl,
+	}
+
+	assert.Equal(t, Skipped{expect}, *i)
+}
+
+func (suite *ItemUnitSuite) TestFileSkip() {
+	t := suite.T()
+	addtl := map[string]any{"foo": "bar"}
+	i := FileSkip(SkipMalware, "id", "name", addtl)
+
+	expect := Item{
+		ID:         "id",
+		Name:       "name",
+		Type:       FileType,
+		Cause:      string(SkipMalware),
+		Additional: addtl,
+	}
+
+	assert.Equal(t, Skipped{expect}, *i)
+}
+
+func (suite *ItemUnitSuite) TestOwnerSkip() {
+	t := suite.T()
+	addtl := map[string]any{"foo": "bar"}
+	i := OwnerSkip(SkipMalware, "id", "name", addtl)
+
+	expect := Item{
+		ID:         "id",
+		Name:       "name",
+		Type:       ResourceOwnerType,
+		Cause:      string(SkipMalware),
+		Additional: addtl,
+	}
+
+	assert.Equal(t, Skipped{expect}, *i)
+}