diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 44418e9bd..d53e3dbe9 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -173,6 +173,9 @@ func (col *Collection) streamItems(ctx context.Context) { colProgress chan<- struct{} user = col.user + log = logger.Ctx(ctx).With( + "service", path.ExchangeService.String(), + "category", col.category.String()) ) defer func() { @@ -251,50 +254,19 @@ func (col *Collection) streamItems(ctx context.Context) { err error ) - for i := 1; i <= numberOfRetries; i++ { - item, info, err = col.items.GetItem(ctx, user, id) - if err == nil { - break - } - - // If the data is no longer available just return here and chalk it up - // as a success. There's no reason to retry and no way we can backup up - // enough information to restore the item anyway. - if graph.IsErrDeletedInFlight(err) { - atomic.AddInt64(&success, 1) - logger.Ctx(ctx).Infow( - "Graph reported item not found", - "error", err, - "service", path.ExchangeService.String(), - "category", col.category.String) - - return - } - - if i < numberOfRetries { - time.Sleep(time.Duration(3*(i+1)) * time.Second) - } - } - + item, info, err = getItemWithRetries(ctx, user, id, col.items) if err != nil { // Don't report errors for deleted items as there's no way for us to - // back up data that is gone. Chalk them up as a "success" though since - // there's really nothing we can do and not reporting it will make the - // status code upset cause we won't have the same number of results as - // attempted items. + // back up data that is gone. Record it as a "success", since there's + // nothing else we can do, and not reporting it will make the status + // investigation upset. 
if graph.IsErrDeletedInFlight(err) { atomic.AddInt64(&success, 1) - logger.Ctx(ctx).Infow( - "Graph reported item not found", - "error", err, - "service", path.ExchangeService.String(), - "category", col.category.String) - - return + log.Infow("item not found", "err", err) + } else { + errUpdater(user, support.ConnectorStackErrorTraceWrap(err, "fetching item")) } - errUpdater(user, support.ConnectorStackErrorTraceWrap(err, "fetching item")) - return } @@ -325,6 +297,42 @@ func (col *Collection) streamItems(ctx context.Context) { wg.Wait() } +// get an item while handling retry and backoff. +func getItemWithRetries( + ctx context.Context, + userID, itemID string, + items itemer, +) (serialization.Parsable, *details.ExchangeInfo, error) { + var ( + item serialization.Parsable + info *details.ExchangeInfo + err error + ) + + for i := 1; i <= numberOfRetries; i++ { + item, info, err = items.GetItem(ctx, userID, itemID) + if err == nil { + break + } + + // If the data is no longer available just return here and chalk it up + // as a success. There's no reason to retry; it's gone. Let it go. + if graph.IsErrDeletedInFlight(err) { + return nil, nil, err + } + + if i < numberOfRetries { + time.Sleep(time.Duration(3*(i+1)) * time.Second) + } + } + + if err != nil { + return nil, nil, err + } + + return item, info, err +} + // terminatePopulateSequence is a utility function used to close a Collection's data channel // and to send the status update through the channel. 
func (col *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) { diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index a63a7caf8..e45f3d80c 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -10,23 +10,33 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/common" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" ) -type mockItemer struct{} +type mockItemer struct { + getCount int + serializeCount int + getErr error + serializeErr error +} -func (mi mockItemer) GetItem( +func (mi *mockItemer) GetItem( context.Context, string, string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - return nil, nil, nil + mi.getCount++ + return nil, nil, mi.getErr } -func (mi mockItemer) Serialize(context.Context, serialization.Parsable, string, string) ([]byte, error) { - return nil, nil +func (mi *mockItemer) Serialize(context.Context, serialization.Parsable, string, string) ([]byte, error) { + mi.serializeCount++ + return nil, mi.serializeErr } type ExchangeDataCollectionSuite struct { @@ -153,10 +163,58 @@ func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() { "u", test.curr, test.prev, 0, - mockItemer{}, nil, + &mockItemer{}, nil, control.Options{}, false) assert.Equal(t, test.expect, c.State()) }) } } + +func (suite *ExchangeDataCollectionSuite) TestGetItemWithRetries() { + table := []struct { + name string + items *mockItemer + expectErr func(*testing.T, error) + expectGetCalls int + }{ + { + name: 
"happy", + items: &mockItemer{}, + expectErr: func(t *testing.T, err error) { + assert.NoError(t, err) + }, + expectGetCalls: 1, + }, + { + name: "an error", + items: &mockItemer{getErr: assert.AnError}, + expectErr: func(t *testing.T, err error) { + assert.Error(t, err) + }, + expectGetCalls: 3, + }, + { + name: "deleted in flight", + items: &mockItemer{ + getErr: graph.ErrDeletedInFlight{ + Err: *common.EncapsulateError(assert.AnError), + }, + }, + expectErr: func(t *testing.T, err error) { + assert.True(t, graph.IsErrDeletedInFlight(err), "is ErrDeletedInFlight") + }, + expectGetCalls: 1, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + + // itemer is mocked, so only the errors are configured atm. + _, _, err := getItemWithRetries(ctx, "userID", "itemID", test.items) + test.expectErr(t, err) + }) + } +} diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index af4ae0824..1ed353be6 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -375,8 +375,7 @@ func runRestoreBackupTest( t.Logf( "Restoring collections to %s for resourceOwners(s) %v\n", dest.ContainerName, - resourceOwners, - ) + resourceOwners) start := time.Now() @@ -394,8 +393,10 @@ func runRestoreBackupTest( status := restoreGC.AwaitStatus() runTime := time.Since(start) - assert.Equal(t, totalItems, status.ObjectCount, "status.ObjectCount") - assert.Equal(t, totalItems, status.Successful, "status.Successful") + assert.NoError(t, status.Err, "restored status.Err") + assert.Zero(t, status.ErrorCount, "restored status.ErrorCount") + assert.Equal(t, totalItems, status.ObjectCount, "restored status.ObjectCount") + assert.Equal(t, totalItems, status.Successful, "restored status.Successful") assert.Len( t, deets.Entries, @@ -434,8 +435,13 @@ func runRestoreBackupTest( skipped := checkCollections(t, 
totalItems, expectedData, dcs) status = backupGC.AwaitStatus() - assert.Equal(t, totalItems+skipped, status.ObjectCount, "status.ObjectCount") - assert.Equal(t, totalItems+skipped, status.Successful, "status.Successful") + + assert.NoError(t, status.Err, "backup status.Err") + assert.Zero(t, status.ErrorCount, "backup status.ErrorCount") + assert.Equalf(t, totalItems+skipped, status.ObjectCount, + "backup status.ObjectCount; wanted %d items + %d skipped", totalItems, skipped) + assert.Equalf(t, totalItems+skipped, status.Successful, + "backup status.Successful; wanted %d items + %d skipped", totalItems, skipped) } func (suite *GraphConnectorIntegrationSuite) TestRestoreAndBackup() { @@ -907,3 +913,30 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames }) } } + +// TODO: this should only be run during smoke tests, not part of the standard CI. +// That's why it's set aside instead of being included in the other test set. +func (suite *GraphConnectorIntegrationSuite) TestRestoreAndBackup_largeMailAttachment() { + subjectText := "Test message for restore with large attachment" + + test := restoreBackupInfo{ + name: "EmailsWithLargeAttachments", + service: path.ExchangeService, + resource: Users, + collections: []colInfo{ + { + pathElements: []string{"Inbox"}, + category: path.EmailCategory, + items: []itemInfo{ + { + name: "35mbAttachment", + data: mockconnector.GetMockMessageWithSizedAttachment(subjectText, 35), + lookupKey: subjectText, + }, + }, + }, + }, + } + + runRestoreBackupTest(suite.T(), suite.acct, test, suite.connector.tenant, []string{suite.user}) +} diff --git a/src/internal/connector/mockconnector/mock_data_message.go b/src/internal/connector/mockconnector/mock_data_message.go index 4c63806c9..597447492 100644 --- a/src/internal/connector/mockconnector/mock_data_message.go +++ b/src/internal/connector/mockconnector/mock_data_message.go @@ -155,6 +155,41 @@ func GetMockMessageWith( return []byte(message) } +// 
GetMockMessageWithSizedAttachment returns a message with an attachment that contains n MB of data. +// Max limit on n is 35 (imposed by exchange). +// Serialized with: kiota-serialization-json-go v0.7.1 +func GetMockMessageWithSizedAttachment(subject string, n int) []byte { + // I know we said 35, but after base64 encoding, 24mb of base content + // bloats up to 34mb (35 balloons to 49). So we have to restrict n + // appropriately. + if n > 24 { + n = 24 + } + + //nolint:lll + messageFmt := "{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA=\"," + + "\"@odata.type\":\"#microsoft.graph.message\",\"@odata.etag\":\"W/\\\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\\\"\",\"@odata.context\":\"https://graph.microsoft.com/v1.0/$metadata#users('a4a472f8-ccb0-43ec-bf52-3697a91b926c')/messages/$entity\",\"categories\":[]," + + "\"changeKey\":\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\",\"createdDateTime\":\"2022-09-29T17:39:06Z\",\"lastModifiedDateTime\":\"2022-09-29T17:39:08Z\"," + + "\"attachments\":[{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAABEgAQANMmZLFhjWJJj4X9mj8piqg=\",\"@odata.type\":\"#microsoft.graph.fileAttachment\",\"@odata.mediaContentType\":\"application/octet-stream\"," + + "\"contentType\":\"application/octet-stream\",\"isInline\":false,\"lastModifiedDateTime\":\"2022-09-29T17:39:06Z\",\"name\":\"database.db\",\"size\":%d," + + "\"contentBytes\":\"%s\"}]," + + "\"bccRecipients\":[],\"body\":{\"content\":\"
\\r\\n