diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b18a4d04..b96aca9e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Document Corso's fault-tolerance and restartability features +- Add retries on timeouts and status code 500 for Exchange ## [v0.2.0] (alpha) - 2023-1-29 diff --git a/src/internal/connector/exchange/api/api.go b/src/internal/connector/exchange/api/api.go index c4858c5c1..c47cc9b5b 100644 --- a/src/internal/connector/exchange/api/api.go +++ b/src/internal/connector/exchange/api/api.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/pkg/account" ) @@ -153,3 +154,26 @@ func HasAttachments(body models.ItemBodyable) bool { return strings.Contains(content, "src=\"cid:") } + +// Run a function with retries +func runWithRetry(run func() error) error { + var err error + + for i := 0; i < numberOfRetries; i++ { + err = run() + if err == nil { + return nil + } + + // only retry on timeouts and 500-internal-errors. + if !(graph.IsErrTimeout(err) || graph.IsInternalServerError(err)) { + break + } + + if i < numberOfRetries { + time.Sleep(time.Duration(3*(i+2)) * time.Second) + } + } + + return support.ConnectorStackErrorTraceWrap(err, "maximum retries or unretryable") +} diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index 0db1e964c..33f05d2a3 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -61,7 +61,16 @@ func (c Contacts) GetItem( ctx context.Context, user, itemID string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - cont, err := c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil) + var ( + cont models.Contactable + err error + ) + + err = runWithRetry(func() error { + cont, err = c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil) + return err + }) + if err != nil { return nil, nil, err } @@ -81,7 +90,14 @@ func (c Contacts) GetAllContactFolderNamesForUser( return nil, err } - return c.stable.Client().UsersById(user).ContactFolders().Get(ctx, options) + var resp models.ContactFolderCollectionResponseable + + err = runWithRetry(func() error { + resp, err = c.stable.Client().UsersById(user).ContactFolders().Get(ctx, options) + return err + }) + + return resp, err } func (c Contacts) GetContainerByID( @@ -93,10 +109,14 @@ func (c Contacts) GetContainerByID( return nil, errors.Wrap(err, "options for contact folder") } - return c.stable.Client(). - UsersById(userID). - ContactFoldersById(dirID). - Get(ctx, ofcf) + var resp models.ContactFolderable + + err = runWithRetry(func() error { + resp, err = c.stable.Client().UsersById(userID).ContactFoldersById(dirID).Get(ctx, ofcf) + return err + }) + + return resp, err } // EnumerateContainers iterates through all of the users current @@ -117,6 +137,7 @@ func (c Contacts) EnumerateContainers( var ( errs *multierror.Error + resp models.ContactFolderCollectionResponseable fields = []string{"displayName", "parentFolderId"} ) @@ -131,7 +152,11 @@ func (c Contacts) EnumerateContainers( ChildFolders() for { - resp, err := builder.Get(ctx, ofcf) + err = runWithRetry(func() error { + resp, err = builder.Get(ctx, ofcf) + return err + }) + if err != nil { return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } @@ -174,7 +199,17 @@ type contactPager struct { } func (p *contactPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - return p.builder.Get(ctx, p.options) + var ( + resp api.DeltaPageLinker + err error + ) + + err = runWithRetry(func() error { + resp, err = p.builder.Get(ctx, p.options) + return err + }) + + return resp, err } func (p *contactPager) setNext(nextLink string) { diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index e643c1f89..63545143d 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -73,7 +73,13 @@ func (c Events) GetContainerByID( return nil, errors.Wrap(err, "options for event calendar") } - cal, err := service.Client().UsersById(userID).CalendarsById(containerID).Get(ctx, ofc) + var cal models.Calendarable + + err = runWithRetry(func() error { + cal, err = service.Client().UsersById(userID).CalendarsById(containerID).Get(ctx, ofc) + return err + }) + if err != nil { return nil, err } @@ -86,7 +92,16 @@ func (c Events) GetItem( ctx context.Context, user, itemID string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - event, err := c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil) + var ( + event models.Eventable + err error + ) + + err = runWithRetry(func() error { + event, err = c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil) + return err + }) + if err != nil { return nil, nil, err } @@ -128,7 +143,14 @@ func (c Client) GetAllCalendarNamesForUser( return nil, err } - return c.stable.Client().UsersById(user).Calendars().Get(ctx, options) + var resp models.CalendarCollectionResponseable + + err = runWithRetry(func() error { + resp, err = c.stable.Client().UsersById(user).Calendars().Get(ctx, options) + return err + }) + + return resp, err } // EnumerateContainers iterates through all of the users current @@ -147,7 +169,10 @@ func (c Events) EnumerateContainers( return err } - var errs *multierror.Error + var ( + resp models.CalendarCollectionResponseable + errs *multierror.Error + ) ofc, err := optionsForCalendars([]string{"name"}) if err != nil { @@ -157,7 +182,13 @@ func (c Events) EnumerateContainers( builder := service.Client().UsersById(userID).Calendars() for { - resp, err := builder.Get(ctx, ofc) + var err error + + err = runWithRetry(func() error { + resp, err = builder.Get(ctx, ofc) + return err + }) + if err != nil { return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } @@ -205,7 +236,16 @@ type eventPager struct { } func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - resp, err := p.builder.Get(ctx, p.options) + var ( + resp api.DeltaPageLinker + err error + ) + + err = runWithRetry(func() error { + resp, err = p.builder.Get(ctx, p.options) + return err + }) + return resp, err } diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index bbac48a66..597676874 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -95,7 +95,14 @@ func (c Mail) GetContainerByID( return nil, errors.Wrap(err, "options for mail folder") } - return service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf) + var resp graph.Container + + err = runWithRetry(func() error { + resp, err = service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf) + return err + }) + + return resp, err } // GetItem retrieves a Messageable item. If the item contains an attachment, that @@ -104,7 +111,16 @@ func (c Mail) GetItem( ctx context.Context, user, itemID string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - mail, err := c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil) + var ( + mail models.Messageable + err error + ) + + err = runWithRetry(func() error { + mail, err = c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil) + return err + }) + if err != nil { return nil, nil, err } @@ -154,6 +170,7 @@ func (c Mail) EnumerateContainers( } var ( + resp users.ItemMailFoldersDeltaResponseable errs *multierror.Error builder = service.Client(). UsersById(userID). @@ -162,7 +179,13 @@ func (c Mail) EnumerateContainers( ) for { - resp, err := builder.Get(ctx, nil) + var err error + + err = runWithRetry(func() error { + resp, err = builder.Get(ctx, nil) + return err + }) + if err != nil { return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } @@ -200,7 +223,17 @@ type mailPager struct { } func (p *mailPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - return p.builder.Get(ctx, p.options) + var ( + page api.DeltaPageLinker + err error + ) + + err = runWithRetry(func() error { + page, err = p.builder.Get(ctx, p.options) + return err + }) + + return page, err } func (p *mailPager) setNext(nextLink string) { diff --git a/src/internal/connector/graph/errors.go b/src/internal/connector/graph/errors.go index c75e4a6cb..21116057d 100644 --- a/src/internal/connector/graph/errors.go +++ b/src/internal/connector/graph/errors.go @@ -17,6 +17,7 @@ import ( // --------------------------------------------------------------------------- const ( + errCodeActivityLimitReached = "activityLimitReached" errCodeItemNotFound = "ErrorItemNotFound" errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound" errCodeResyncRequired = "ResyncRequired" @@ -31,8 +32,10 @@ var ( // normally the graph client will catch this for us, but in case we // run our own client Do(), we need to translate it to a timeout type // failure locally. - Err429TooManyRequests = errors.New("429 too many requests") - Err503ServiceUnavailable = errors.New("503 Service Unavailable") + Err429TooManyRequests = errors.New("429 too many requests") + Err503ServiceUnavailable = errors.New("503 Service Unavailable") + Err504GatewayTimeout = errors.New("504 Gateway Timeout") + Err500InternalServerError = errors.New("500 Internal Server Error") ) // The folder or item was deleted between the time we identified @@ -113,6 +116,10 @@ func IsErrThrottled(err error) bool { return true } + if hasErrorCode(err, errCodeActivityLimitReached) { + return true + } + e := ErrThrottled{} return errors.As(err, &e) @@ -135,21 +142,18 @@ func IsErrUnauthorized(err error) bool { return errors.As(err, &e) } -type ErrServiceUnavailable struct { +type ErrInternalServerError struct { common.Err } -func IsSericeUnavailable(err error) bool { - if errors.Is(err, Err503ServiceUnavailable) { +func IsInternalServerError(err error) bool { + if errors.Is(err, Err500InternalServerError) { return true } - e := ErrUnauthorized{} - if errors.As(err, &e) { - return true - } + e := ErrInternalServerError{} - return true + return errors.As(err, &e) } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/graph/errors_test.go b/src/internal/connector/graph/errors_test.go new file mode 100644 index 000000000..c7f889b83 --- /dev/null +++ b/src/internal/connector/graph/errors_test.go @@ -0,0 +1,248 @@ +package graph + +import ( + "context" + "testing" + + "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/common" +) + +type GraphErrorsUnitSuite struct { + suite.Suite +} + +func TestGraphErrorsUnitSuite(t *testing.T) { + suite.Run(t, new(GraphErrorsUnitSuite)) +} + +func odErr(code string) *odataerrors.ODataError { + odErr := &odataerrors.ODataError{} + merr := odataerrors.MainError{} + merr.SetCode(&code) + odErr.SetError(&merr) + + return odErr +} + +func (suite *GraphErrorsUnitSuite) TestIsErrDeletedInFlight() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrDeletedInFlight{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "non-matching oDataErr", + err: odErr("fnords"), + expect: assert.False, + }, + { + name: "not-found oDataErr", + err: odErr(errCodeItemNotFound), + expect: assert.True, + }, + { + name: "sync-not-found oDataErr", + err: odErr(errCodeSyncFolderNotFound), + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrDeletedInFlight(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrInvalidDelta() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrInvalidDelta{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "non-matching oDataErr", + err: odErr("fnords"), + expect: assert.False, + }, + { + name: "resync-required oDataErr", + err: odErr(errCodeResyncRequired), + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrInvalidDelta(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrTimeout() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrTimeout{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "context deadline", + err: context.DeadlineExceeded, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrTimeout(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrThrottled() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrThrottled{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "is429", + err: Err429TooManyRequests, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrThrottled(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrUnauthorized() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrUnauthorized{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "is429", + err: Err401Unauthorized, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrUnauthorized(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsInternalServerError() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrInternalServerError{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "is429", + err: Err500InternalServerError, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsInternalServerError(test.err)) + }) + } +} diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index 6c0e6dbc1..093995a42 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -149,7 +149,7 @@ func HTTPClient(opts ...option) *http.Client { middlewares := msgraphgocore.GetDefaultMiddlewaresWithOptions(&clientOptions) middlewares = append(middlewares, &LoggingMiddleware{}) httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) - httpClient.Timeout = time.Second * 90 + httpClient.Timeout = time.Minute * 3 (&clientConfig{}). populate(opts...). @@ -250,7 +250,6 @@ func (handler *LoggingMiddleware) Intercept( respDump, _ := httputil.DumpResponse(resp, false) metadata := []any{ - "idx", middlewareIndex, "method", req.Method, "status", resp.Status, "statusCode", resp.StatusCode, @@ -273,7 +272,6 @@ func (handler *LoggingMiddleware) Intercept( respDump, _ := httputil.DumpResponse(resp, true) metadata := []any{ - "idx", middlewareIndex, "method", req.Method, "status", resp.Status, "statusCode", resp.StatusCode, diff --git a/src/internal/connector/graph/service_test.go b/src/internal/connector/graph/service_test.go index 14bdc9c36..c2ef2d699 100644 --- a/src/internal/connector/graph/service_test.go +++ b/src/internal/connector/graph/service_test.go @@ -53,7 +53,7 @@ func (suite *GraphUnitSuite) TestHTTPClient() { name: "no options", opts: []option{}, check: func(t *testing.T, c *http.Client) { - assert.Equal(t, 90*time.Second, c.Timeout, "default timeout") + assert.Equal(t, 3*time.Minute, c.Timeout, "default timeout") }, }, { diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index a786de0ab..c4e1825bd 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -271,11 +271,11 @@ func (oc *Collection) populateItems(ctx context.Context) { continue - } else if !graph.IsErrTimeout(err) && !graph.IsErrThrottled(err) && !graph.IsSericeUnavailable(err) { - // TODO: graphAPI will provides headers that state the duration to wait - // in order to succeed again. The one second sleep won't cut it here. - // - // for all non-timeout, non-unauth, non-throttling errors, do not retry + } else if !graph.IsErrTimeout(err) && + !graph.IsInternalServerError(err) { + // Don't retry for non-timeout, on-unauth, as + // we are already retrying it in the default + // retry middleware break } diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index c4fd1b380..b1027de9d 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -105,6 +105,10 @@ func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, e return resp, graph.Err401Unauthorized } + if resp.StatusCode == http.StatusInternalServerError { + return resp, graph.Err500InternalServerError + } + if resp.StatusCode == http.StatusServiceUnavailable { return resp, graph.Err503ServiceUnavailable }