From e8ec528113a28fae4d6451a52cfb2ec427648b42 Mon Sep 17 00:00:00 2001 From: Keepers <104464746+ryanfkeepers@users.noreply.github.com> Date: Wed, 6 Jul 2022 12:49:32 -0600 Subject: [PATCH] break gc iterator cb to own func, fix err scope (#282) A misuse of variable declaration that overlapped with var shadowing on 'err' was causing the attachment retry error to get lost, meaning failures to retrieve attachments are occurring silently. --- src/internal/connector/graph_connector.go | 129 ++++++++++++------ .../connector/graph_connector_test.go | 4 +- src/internal/connector/support/errors.go | 4 +- 3 files changed, 94 insertions(+), 43 deletions(-) diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 54866f25d..86be2bc7d 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -233,12 +233,12 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ folderList = append(folderList, *folderable.GetId()) } // Time to create Exchange data Holder - var byteArray []byte collections := make([]DataCollection, 0) var errs error var totalItems, success int for _, aFolder := range folderList { + // get all user's mail messages result, err := gc.client.UsersById(user).MailFoldersById(aFolder).Messages().Get() if err != nil { errs = support.WrapAndAppend(user, err, errs) @@ -248,53 +248,29 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ continue } + // set up a page iterator for retrieving further message batches pageIterator, err := msgraphgocore.NewPageIterator(result, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue) if err != nil { errs = support.WrapAndAppend(user, fmt.Errorf("iterator failed initialization: %v", err), errs) continue } + + // prep writing mail attachments objectWriter := kw.NewJsonSerializationWriter() + + // prep the items for handoff to the backup consumer edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder}) - callbackFunc := func(messageItem interface{}) bool { - totalItems++ - message, ok := messageItem.(models.Messageable) - if !ok { - errs = support.WrapAndAppend(user, fmt.Errorf("non-message return for user: %s", user), errs) - return true - } - if *message.GetHasAttachments() { - // Retry Loop - for count := 0; count < numberOfRetries; count++ { - attached, err := gc.client.UsersById(user).MessagesById(*message.GetId()).Attachments().Get() - if err == nil && attached != nil { - message.SetAttachments(attached.GetValue()) - break - } - } - if err != nil { - logger.Ctx(ctx).Debug("exceeded maximum retries") - errs = support.WrapAndAppend(*message.GetId(), fmt.Errorf("attachment failed: %v ", err), errs) - } - } - err = objectWriter.WriteObjectValue("", message) - if err != nil { - errs = support.WrapAndAppend(*message.GetId(), support.SetNonRecoverableError(err), - errs) - return true - } - byteArray, err = objectWriter.GetSerializedContent() - objectWriter.Close() - if err != nil { - errs = support.WrapAndAppend(*message.GetId(), err, errs) - return true - } - if byteArray != nil { - edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray}) - } - return true + // iterate through the remaining pages of mail + stats := iteratorStats{ + count: totalItems, + errs: errs, } - err = pageIterator.Iterate(callbackFunc) + cbf := gc.serializeMessageIteratorCallback(ctx, objectWriter, edc, user, &stats) + + err = pageIterator.Iterate(cbf) + totalItems = stats.count + errs = stats.errs if err != nil { errs = support.WrapAndAppend(user, err, errs) @@ -315,6 +291,81 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ return collections, errs } +type iteratorStats struct { + count int + errs error +} + +func (gc *GraphConnector) serializeMessageIteratorCallback( + ctx context.Context, + objectWriter *kw.JsonSerializationWriter, + edc ExchangeDataCollection, + user string, + stats *iteratorStats, +) func(messageItem interface{}) bool { + return func(messageItem interface{}) bool { + stats.count++ + + message, ok := messageItem.(models.Messageable) + if !ok { + stats.errs = support.WrapAndAppend( + user, + errors.New("non-message return for user: "+user), + stats.errs) + return true + } + + if *message.GetHasAttachments() { + // getting all the attachments might take a couple attempts due to filesize + var retriesErr error + for count := 0; count < numberOfRetries; count++ { + attached, err := gc.client. + UsersById(user). + MessagesById(*message.GetId()). + Attachments(). + Get() + retriesErr = err + if err == nil && attached != nil { + message.SetAttachments(attached.GetValue()) + break + } + } + if retriesErr != nil { + logger.Ctx(ctx).Debug("exceeded maximum retries") + stats.errs = support.WrapAndAppend( + *message.GetId(), + errors.Wrap(retriesErr, "attachment failed"), + stats.errs) + } + } + + err := objectWriter.WriteObjectValue("", message) + if err != nil { + stats.errs = support.WrapAndAppend( + *message.GetId(), + support.SetNonRecoverableError(err), + stats.errs) + return true + } + + byteArray, err := objectWriter.GetSerializedContent() + objectWriter.Close() + if err != nil { + stats.errs = support.WrapAndAppend( + *message.GetId(), + errors.Wrap(err, "serializing mail content"), + stats.errs) + return true + } + + if byteArray != nil { + edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray}) + } + + return true + } +} + // SetStatus helper function func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) { gc.status = &cos diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 780f7b04a..c4d8be2a1 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -65,9 +65,9 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataColl suite.T().Skip(err) } collectionList, err := suite.connector.ExchangeDataCollection(context.Background(), "lidiah@8qzvrj.onmicrosoft.com") - assert.NotNil(suite.T(), collectionList) + assert.NotNil(suite.T(), collectionList, "collection list") assert.Error(suite.T(), err) // TODO Remove after https://github.com/alcionai/corso/issues/140 - assert.NotNil(suite.T(), suite.connector.status) + assert.NotNil(suite.T(), suite.connector.status, "connector status") suite.NotContains(err.Error(), "attachment failed") // TODO Create Retry Exceeded Error exchangeData := collectionList[0] suite.Greater(len(exchangeData.FullPath()), 2) diff --git a/src/internal/connector/support/errors.go b/src/internal/connector/support/errors.go index 41aa69edd..03eb5f87c 100644 --- a/src/internal/connector/support/errors.go +++ b/src/internal/connector/support/errors.go @@ -39,12 +39,12 @@ func SetNonRecoverableError(e error) error { // WrapErrorAndAppend helper function used to attach identifying information to an error // and return it as a mulitierror -func WrapAndAppend(identifier string, e error, previous error) error { +func WrapAndAppend(identifier string, e, previous error) error { return multierror.Append(previous, errors.Wrap(e, identifier)) } // WrapErrorAndAppendf format version of WrapErrorAndAppend -func WrapAndAppendf(identifier interface{}, e error, previous error) error { +func WrapAndAppendf(identifier interface{}, e, previous error) error { return multierror.Append(previous, errors.Wrapf(e, "%v", identifier)) }