From 368a246596fc63d5101a416a1cb5e670a7bf361f Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Tue, 13 Feb 2024 16:39:03 -0700 Subject: [PATCH] allow skipping error cases in exchange backup utilize the previously introduced canSkipItemFailure interface to create skip records for items during backup. --- .../m365/collection/exchange/backup.go | 1 + .../m365/collection/exchange/collection.go | 70 ++++- .../collection/exchange/collection_test.go | 272 ++++++++++++++++++ .../m365/collection/exchange/events_backup.go | 1 + .../collection/exchange/events_backup_test.go | 10 +- .../m365/collection/exchange/mock/item.go | 30 ++ 6 files changed, 367 insertions(+), 17 deletions(-) diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go index d191428dc..445577a0a 100644 --- a/src/internal/m365/collection/exchange/backup.go +++ b/src/internal/m365/collection/exchange/backup.go @@ -296,6 +296,7 @@ func populateCollections( cl), qp.ProtectedResource.ID(), bh.itemHandler(), + bh, addAndRem.Added, addAndRem.Removed, // TODO: produce a feature flag that allows selective diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index efb6f7e94..3e701bf57 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/errs/core" "github.com/alcionai/corso/src/pkg/fault" @@ -68,21 +69,21 @@ func getItemAndInfo( ctx context.Context, getter itemGetterSerializer, userID string, - id string, + itemID string, useImmutableIDs bool, parentPath string, ) ([]byte, *details.ExchangeInfo, error) { item, info, err := getter.GetItem( ctx, userID, - id, + itemID, fault.New(true)) // temporary way to force a failFast error if err != nil { return nil, nil, clues.WrapWC(ctx, err, "fetching item"). Label(fault.LabelForceNoBackupCreation) } - itemData, err := getter.Serialize(ctx, item, userID, id) + itemData, err := getter.Serialize(ctx, item, userID, itemID) if err != nil { return nil, nil, clues.WrapWC(ctx, err, "serializing item") } @@ -108,6 +109,7 @@ func NewCollection( bc data.BaseCollection, user string, items itemGetterSerializer, + canSkipFailChecker canSkipItemFailurer, origAdded map[string]time.Time, origRemoved []string, validModTimes bool, @@ -140,6 +142,7 @@ func NewCollection( added: added, removed: removed, getter: items, + skipChecker: canSkipFailChecker, statusUpdater: statusUpdater, } } @@ -150,6 +153,7 @@ func NewCollection( added: added, removed: removed, getter: items, + skipChecker: canSkipFailChecker, statusUpdater: statusUpdater, counter: counter, } @@ -167,7 +171,8 @@ type prefetchCollection struct { // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} - getter itemGetterSerializer + getter itemGetterSerializer + skipChecker canSkipItemFailurer statusUpdater support.StatusUpdater } @@ -194,11 +199,12 @@ func (col *prefetchCollection) streamItems( wg sync.WaitGroup progressMessage chan<- struct{} user = col.user + dataCategory = col.Category().String() ) ctx = clues.Add( ctx, - "category", col.Category().String()) + "category", dataCategory) defer func() { close(stream) @@ -227,7 +233,7 @@ func (col *prefetchCollection) streamItems( defer close(semaphoreCh) // delete all removed items - for id := range col.removed { + for itemID := range col.removed { semaphoreCh <- struct{}{} wg.Add(1) @@ -247,7 +253,7 @@ func (col *prefetchCollection) streamItems( if progressMessage != nil { progressMessage <- struct{}{} } - }(id) + }(itemID) } var ( @@ -256,7 +262,7 @@ func (col *prefetchCollection) streamItems( ) // add any new items - for id := range col.added { + for itemID := range col.added { if el.Failure() != nil { break } @@ -277,8 +283,24 @@ func (col *prefetchCollection) streamItems( col.Opts().ToggleFeatures.ExchangeImmutableIDs, parentPath) if err != nil { + // pulled outside the switch due to multiple return values. + cause, canSkip := col.skipChecker.CanSkipItemFailure( + err, + user, + id, + col.BaseCollection.Opts()) + // Handle known error cases switch { + case canSkip: + // this is a special case handler that allows the item to be skipped + // instead of producing an error. + errs.AddSkip(ctx, fault.FileSkip( + cause, + dataCategory, + id, + id, + nil)) case errors.Is(err, core.ErrNotFound): // Don't report errors for deleted items as there's no way for us to // back up data that is gone. Record it as a "success", since there's @@ -349,7 +371,7 @@ func (col *prefetchCollection) streamItems( if progressMessage != nil { progressMessage <- struct{}{} } - }(id) + }(itemID) } wg.Wait() @@ -377,7 +399,8 @@ type lazyFetchCollection struct { // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} - getter itemGetterSerializer + getter itemGetterSerializer + skipChecker canSkipItemFailurer statusUpdater support.StatusUpdater @@ -404,8 +427,7 @@ func (col *lazyFetchCollection) streamItems( var ( success int64 progressMessage chan<- struct{} - - user = col.user + user = col.user ) defer func() { @@ -459,10 +481,13 @@ func (col *lazyFetchCollection) streamItems( &lazyItemGetter{ userID: user, itemID: id, + category: col.FullPath().Category(), getter: col.getter, modTime: modTime, immutableIDs: col.Opts().ToggleFeatures.ExchangeImmutableIDs, parentPath: parentPath, + skipChecker: col.skipChecker, + opts: col.BaseCollection.Opts(), }, id, modTime, @@ -481,9 +506,12 @@ type lazyItemGetter struct { getter itemGetterSerializer userID string itemID string + category path.CategoryType parentPath string modTime time.Time immutableIDs bool + skipChecker canSkipItemFailurer + opts control.Options } func (lig *lazyItemGetter) GetData( @@ -498,6 +526,24 @@ func (lig *lazyItemGetter) GetData( lig.immutableIDs, lig.parentPath) if err != nil { + cause, canSkip := lig.skipChecker.CanSkipItemFailure( + err, + lig.userID, + lig.itemID, + lig.opts) + if canSkip { + errs.AddSkip(ctx, fault.FileSkip( + cause, + lig.category.String(), + lig.itemID, + lig.itemID, + nil)) + + return nil, nil, false, clues. + NewWC(ctx, "error marked as skippable by handler"). + Label(graph.LabelsSkippable) + } + // If an item was deleted then return an empty file so we don't fail // the backup and return a sentinel error when asked for ItemInfo so // we don't display the item in the backup. diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go index 1ec9e81f3..a3fbe4c79 100644 --- a/src/internal/m365/collection/exchange/collection_test.go +++ b/src/internal/m365/collection/exchange/collection_test.go @@ -153,6 +153,7 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() { count.New()), "u", mock.DefaultItemGetSerialize(), + mock.NeverCanSkipFailChecker(), nil, nil, colType.validModTimes, @@ -298,6 +299,7 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_Items() { count.New()), "", &mock.ItemGetSerialize{}, + mock.NeverCanSkipFailChecker(), test.added, maps.Keys(test.removed), false, @@ -333,6 +335,126 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_Items() { } } +func (suite *CollectionUnitSuite) TestPrefetchCollection_Items_skipFailure() { + var ( + t = suite.T() + start = time.Now().Add(-time.Second) + statusUpdater = func(*support.ControllerOperationStatus) {} + ) + + fullPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf") + require.NoError(t, err, clues.ToCore(err)) + + locPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf") + require.NoError(t, err, clues.ToCore(err)) + + table := []struct { + name string + added map[string]time.Time + removed map[string]struct{} + expectItemCount int + expectSkippedCount int + }{ + { + name: "no items", + }, + { + name: "only added items", + added: map[string]time.Time{ + "fisher": {}, + "flannigan": {}, + "fitzbog": {}, + }, + expectItemCount: 0, + expectSkippedCount: 3, + }, + { + name: "only removed items", + removed: map[string]struct{}{ + "princess": {}, + "poppy": {}, + "petunia": {}, + }, + expectItemCount: 3, + expectSkippedCount: 0, + }, + { + name: "added and removed items", + added: map[string]time.Time{ + "general": {}, + }, + removed: map[string]struct{}{ + "general": {}, + "goose": {}, + "grumbles": {}, + }, + expectItemCount: 3, + // not 1, because general is removed from the added + // map due to being in the removed map + expectSkippedCount: 0, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + var ( + t = suite.T() + errs = fault.New(true) + itemCount int + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + col := NewCollection( + data.NewBaseCollection( + fullPath, + nil, + locPath.ToBuilder(), + control.DefaultOptions(), + false, + count.New()), + "", + &mock.ItemGetSerialize{ + SerializeErr: assert.AnError, + }, + mock.AlwaysCanSkipFailChecker(), + test.added, + maps.Keys(test.removed), + false, + statusUpdater, + count.New()) + + for item := range col.Items(ctx, errs) { + itemCount++ + + _, rok := test.removed[item.ID()] + if rok { + assert.True(t, item.Deleted(), "removals should be marked as deleted") + dimt, ok := item.(data.ItemModTime) + require.True(t, ok, "item implements data.ItemModTime") + assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()") + } + + _, aok := test.added[item.ID()] + if !rok && aok { + assert.False(t, item.Deleted(), "additions should not be marked as deleted") + } + + assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID()) + } + + assert.NoError(t, errs.Failure()) + assert.Equal( + t, + test.expectItemCount, + itemCount, + "should see all expected items") + assert.Len(t, errs.Skipped(), test.expectSkippedCount) + }) + } +} + // This test verifies skipped error cases are handled correctly by collection enumeration func (suite *CollectionUnitSuite) TestCollection_SkippedErrors() { var ( @@ -398,6 +520,7 @@ func (suite *CollectionUnitSuite) TestCollection_SkippedErrors() { count.New()), "", test.itemGetter, + mock.NeverCanSkipFailChecker(), test.added, nil, false, @@ -530,6 +653,7 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { count.New()), "", mlg, + mock.NeverCanSkipFailChecker(), test.added, maps.Keys(test.removed), true, @@ -589,6 +713,154 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { } } +func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_skipFailure() { + var ( + t = suite.T() + start = time.Now().Add(-time.Second) + statusUpdater = func(*support.ControllerOperationStatus) {} + ) + + fullPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf") + require.NoError(t, err, clues.ToCore(err)) + + locPath, err := path.Build("t", "pr", path.ExchangeService, path.EmailCategory, false, "fnords", "smarf") + require.NoError(t, err, clues.ToCore(err)) + + table := []struct { + name string + added map[string]time.Time + removed map[string]struct{} + expectItemCount int + expectSkippedCount int + expectReads []string + }{ + { + name: "no items", + }, + { + name: "only added items", + added: map[string]time.Time{ + "fisher": start.Add(time.Minute), + "flannigan": start.Add(2 * time.Minute), + "fitzbog": start.Add(3 * time.Minute), + }, + expectItemCount: 3, + expectSkippedCount: 2, + expectReads: []string{ + "fisher", + "fitzbog", + }, + }, + { + name: "only removed items", + removed: map[string]struct{}{ + "princess": {}, + "poppy": {}, + "petunia": {}, + }, + expectItemCount: 3, + expectSkippedCount: 0, + }, + { + name: "added and removed items", + added: map[string]time.Time{ + "general": {}, + }, + removed: map[string]struct{}{ + "general": {}, + "goose": {}, + "grumbles": {}, + }, + expectItemCount: 3, + // not 1, because general is removed from the added + // map due to being in the removed map + expectSkippedCount: 0, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + var ( + t = suite.T() + errs = fault.New(true) + itemCount int + ) + + ctx, flush := tester.NewContext(t) + defer flush() + + mlg := &mockLazyItemGetterSerializer{ + ItemGetSerialize: &mock.ItemGetSerialize{ + SerializeErr: assert.AnError, + }, + } + defer mlg.check(t, test.expectReads) + + col := NewCollection( + data.NewBaseCollection( + fullPath, + nil, + locPath.ToBuilder(), + control.DefaultOptions(), + false, + count.New()), + "", + mlg, + mock.AlwaysCanSkipFailChecker(), + test.added, + maps.Keys(test.removed), + true, + statusUpdater, + count.New()) + + for item := range col.Items(ctx, errs) { + itemCount++ + + _, rok := test.removed[item.ID()] + if rok { + assert.True(t, item.Deleted(), "removals should be marked as deleted") + dimt, ok := item.(data.ItemModTime) + require.True(t, ok, "item implements data.ItemModTime") + assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()") + } + + modTime, aok := test.added[item.ID()] + if !rok && aok { + // Item's mod time should be what's passed into the collection + // initializer. + assert.Implements(t, (*data.ItemModTime)(nil), item) + assert.Equal(t, modTime, item.(data.ItemModTime).ModTime(), "item mod time") + + assert.False(t, item.Deleted(), "additions should not be marked as deleted") + + // Check if the test want's us to read the item's data so the lazy + // data fetch is executed. + if slices.Contains(test.expectReads, item.ID()) { + r := item.ToReader() + + _, err := io.ReadAll(r) + assert.Error(t, err, clues.ToCore(err)) + assert.ErrorContains(t, err, "marked as skippable", clues.ToCore(err)) + assert.True(t, clues.HasLabel(err, graph.LabelsSkippable), clues.ToCore(err)) + + r.Close() + } + } + + assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID()) + } + + assert.NoError(t, errs.Failure()) + assert.Equal( + t, + test.expectItemCount, + itemCount, + "should see all expected items") + assert.Len(t, errs.Skipped(), test.expectSkippedCount) + }) + } +} + func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() { t := suite.T() diff --git a/src/internal/m365/collection/exchange/events_backup.go b/src/internal/m365/collection/exchange/events_backup.go index 8dafd2826..617fb474a 100644 --- a/src/internal/m365/collection/exchange/events_backup.go +++ b/src/internal/m365/collection/exchange/events_backup.go @@ -5,6 +5,7 @@ import ( "slices" "github.com/alcionai/clues" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/services/m365/api" diff --git a/src/internal/m365/collection/exchange/events_backup_test.go b/src/internal/m365/collection/exchange/events_backup_test.go index bc8fb7377..853d6bda9 100644 --- a/src/internal/m365/collection/exchange/events_backup_test.go +++ b/src/internal/m365/collection/exchange/events_backup_test.go @@ -3,11 +3,11 @@ package exchange import ( "testing" + "github.com/alcionai/clues" "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" @@ -54,7 +54,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() { err: nil, opts: control.Options{ SkipTheseEventsOnInstance503: map[string][]string{ - "foo": []string{"bar", "baz"}, + "foo": {"bar", "baz"}, }, }, expect: assert.False, @@ -64,7 +64,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() { err: clues.New("fix me I'm wrong"), opts: control.Options{ SkipTheseEventsOnInstance503: map[string][]string{ - "foo": []string{"bar", "baz"}, + "foo": {"bar", "baz"}, }, }, expect: assert.False, @@ -74,7 +74,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() { err: clues.New("fix me I'm wrong"), opts: control.Options{ SkipTheseEventsOnInstance503: map[string][]string{ - resourceID: []string{"bar", "baz"}, + resourceID: {"bar", "baz"}, }, }, expect: assert.False, @@ -84,7 +84,7 @@ func (suite *EventsBackupHandlerUnitSuite) TestHandler_CanSkipItemFailure() { err: clues.New("fix me I'm wrong"), opts: control.Options{ SkipTheseEventsOnInstance503: map[string][]string{ - resourceID: []string{"bar", itemID}, + resourceID: {"bar", itemID}, }, }, expect: assert.True, diff --git a/src/internal/m365/collection/exchange/mock/item.go b/src/internal/m365/collection/exchange/mock/item.go index 712388af3..46f18a4cb 100644 --- a/src/internal/m365/collection/exchange/mock/item.go +++ b/src/internal/m365/collection/exchange/mock/item.go @@ -6,10 +6,15 @@ import ( "github.com/microsoft/kiota-abstractions-go/serialization" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/services/m365/api" ) +// --------------------------------------------------------------------------- +// get and serialize item mock +// --------------------------------------------------------------------------- + type ItemGetSerialize struct { GetData serialization.Parsable GetCount int @@ -44,3 +49,28 @@ func (m *ItemGetSerialize) Serialize( func DefaultItemGetSerialize() *ItemGetSerialize { return &ItemGetSerialize{} } + +// --------------------------------------------------------------------------- +// can skip item failure mock +// --------------------------------------------------------------------------- + +type canSkipFailChecker struct { + canSkip bool +} + +func (m canSkipFailChecker) CanSkipItemFailure( + error, + string, + string, + control.Options, +) (fault.SkipCause, bool) { + return fault.SkipCause("testing"), m.canSkip +} + +func NeverCanSkipFailChecker() *canSkipFailChecker { + return &canSkipFailChecker{} +} + +func AlwaysCanSkipFailChecker() *canSkipFailChecker { + return &canSkipFailChecker{true} +}