break gc iterator cb to own func, fix err scope (#282)
A misuse of variable declaration that overlapped with var shadowing on 'err' was causing the attachment retry error to get lost, meaning failures to retrieve attachments are occurring silently.
This commit is contained in:
parent
70e7a1e2d3
commit
e8ec528113
@ -233,12 +233,12 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
|
||||
folderList = append(folderList, *folderable.GetId())
|
||||
}
|
||||
// Time to create Exchange data Holder
|
||||
var byteArray []byte
|
||||
collections := make([]DataCollection, 0)
|
||||
var errs error
|
||||
var totalItems, success int
|
||||
for _, aFolder := range folderList {
|
||||
|
||||
// get all user's mail messages
|
||||
result, err := gc.client.UsersById(user).MailFoldersById(aFolder).Messages().Get()
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(user, err, errs)
|
||||
@ -248,53 +248,29 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
|
||||
continue
|
||||
}
|
||||
|
||||
// set up a page iterator for retrieving further message batches
|
||||
pageIterator, err := msgraphgocore.NewPageIterator(result, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(user, fmt.Errorf("iterator failed initialization: %v", err), errs)
|
||||
continue
|
||||
}
|
||||
|
||||
// prep writing mail attachments
|
||||
objectWriter := kw.NewJsonSerializationWriter()
|
||||
|
||||
// prep the items for handoff to the backup consumer
|
||||
edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder})
|
||||
|
||||
callbackFunc := func(messageItem interface{}) bool {
|
||||
totalItems++
|
||||
message, ok := messageItem.(models.Messageable)
|
||||
if !ok {
|
||||
errs = support.WrapAndAppend(user, fmt.Errorf("non-message return for user: %s", user), errs)
|
||||
return true
|
||||
}
|
||||
if *message.GetHasAttachments() {
|
||||
// Retry Loop
|
||||
for count := 0; count < numberOfRetries; count++ {
|
||||
attached, err := gc.client.UsersById(user).MessagesById(*message.GetId()).Attachments().Get()
|
||||
if err == nil && attached != nil {
|
||||
message.SetAttachments(attached.GetValue())
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
logger.Ctx(ctx).Debug("exceeded maximum retries")
|
||||
errs = support.WrapAndAppend(*message.GetId(), fmt.Errorf("attachment failed: %v ", err), errs)
|
||||
}
|
||||
}
|
||||
err = objectWriter.WriteObjectValue("", message)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(*message.GetId(), support.SetNonRecoverableError(err),
|
||||
errs)
|
||||
return true
|
||||
}
|
||||
byteArray, err = objectWriter.GetSerializedContent()
|
||||
objectWriter.Close()
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(*message.GetId(), err, errs)
|
||||
return true
|
||||
}
|
||||
if byteArray != nil {
|
||||
edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray})
|
||||
}
|
||||
return true
|
||||
// iterate through the remaining pages of mail
|
||||
stats := iteratorStats{
|
||||
count: totalItems,
|
||||
errs: errs,
|
||||
}
|
||||
err = pageIterator.Iterate(callbackFunc)
|
||||
cbf := gc.serializeMessageIteratorCallback(ctx, objectWriter, edc, user, &stats)
|
||||
|
||||
err = pageIterator.Iterate(cbf)
|
||||
totalItems = stats.count
|
||||
errs = stats.errs
|
||||
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppend(user, err, errs)
|
||||
@ -315,6 +291,81 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
|
||||
return collections, errs
|
||||
}
|
||||
|
||||
type iteratorStats struct {
|
||||
count int
|
||||
errs error
|
||||
}
|
||||
|
||||
func (gc *GraphConnector) serializeMessageIteratorCallback(
|
||||
ctx context.Context,
|
||||
objectWriter *kw.JsonSerializationWriter,
|
||||
edc ExchangeDataCollection,
|
||||
user string,
|
||||
stats *iteratorStats,
|
||||
) func(messageItem interface{}) bool {
|
||||
return func(messageItem interface{}) bool {
|
||||
stats.count++
|
||||
|
||||
message, ok := messageItem.(models.Messageable)
|
||||
if !ok {
|
||||
stats.errs = support.WrapAndAppend(
|
||||
user,
|
||||
errors.New("non-message return for user: "+user),
|
||||
stats.errs)
|
||||
return true
|
||||
}
|
||||
|
||||
if *message.GetHasAttachments() {
|
||||
// getting all the attachments might take a couple attempts due to filesize
|
||||
var retriesErr error
|
||||
for count := 0; count < numberOfRetries; count++ {
|
||||
attached, err := gc.client.
|
||||
UsersById(user).
|
||||
MessagesById(*message.GetId()).
|
||||
Attachments().
|
||||
Get()
|
||||
retriesErr = err
|
||||
if err == nil && attached != nil {
|
||||
message.SetAttachments(attached.GetValue())
|
||||
break
|
||||
}
|
||||
}
|
||||
if retriesErr != nil {
|
||||
logger.Ctx(ctx).Debug("exceeded maximum retries")
|
||||
stats.errs = support.WrapAndAppend(
|
||||
*message.GetId(),
|
||||
errors.Wrap(retriesErr, "attachment failed"),
|
||||
stats.errs)
|
||||
}
|
||||
}
|
||||
|
||||
err := objectWriter.WriteObjectValue("", message)
|
||||
if err != nil {
|
||||
stats.errs = support.WrapAndAppend(
|
||||
*message.GetId(),
|
||||
support.SetNonRecoverableError(err),
|
||||
stats.errs)
|
||||
return true
|
||||
}
|
||||
|
||||
byteArray, err := objectWriter.GetSerializedContent()
|
||||
objectWriter.Close()
|
||||
if err != nil {
|
||||
stats.errs = support.WrapAndAppend(
|
||||
*message.GetId(),
|
||||
errors.Wrap(err, "serializing mail content"),
|
||||
stats.errs)
|
||||
return true
|
||||
}
|
||||
|
||||
if byteArray != nil {
|
||||
edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray})
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// SetStatus helper function
|
||||
func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) {
|
||||
gc.status = &cos
|
||||
|
||||
@ -65,9 +65,9 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataColl
|
||||
suite.T().Skip(err)
|
||||
}
|
||||
collectionList, err := suite.connector.ExchangeDataCollection(context.Background(), "lidiah@8qzvrj.onmicrosoft.com")
|
||||
assert.NotNil(suite.T(), collectionList)
|
||||
assert.NotNil(suite.T(), collectionList, "collection list")
|
||||
assert.Error(suite.T(), err) // TODO Remove after https://github.com/alcionai/corso/issues/140
|
||||
assert.NotNil(suite.T(), suite.connector.status)
|
||||
assert.NotNil(suite.T(), suite.connector.status, "connector status")
|
||||
suite.NotContains(err.Error(), "attachment failed") // TODO Create Retry Exceeded Error
|
||||
exchangeData := collectionList[0]
|
||||
suite.Greater(len(exchangeData.FullPath()), 2)
|
||||
|
||||
@ -39,12 +39,12 @@ func SetNonRecoverableError(e error) error {
|
||||
|
||||
// WrapErrorAndAppend helper function used to attach identifying information to an error
|
||||
// and return it as a mulitierror
|
||||
func WrapAndAppend(identifier string, e error, previous error) error {
|
||||
func WrapAndAppend(identifier string, e, previous error) error {
|
||||
return multierror.Append(previous, errors.Wrap(e, identifier))
|
||||
}
|
||||
|
||||
// WrapErrorAndAppendf format version of WrapErrorAndAppend
|
||||
func WrapAndAppendf(identifier interface{}, e error, previous error) error {
|
||||
func WrapAndAppendf(identifier interface{}, e, previous error) error {
|
||||
return multierror.Append(previous, errors.Wrapf(e, "%v", identifier))
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user