From f3b2e9a632c55804916f69fbcda157143e34b8f2 Mon Sep 17 00:00:00 2001 From: Abin Simon Date: Wed, 1 Feb 2023 03:27:38 +0530 Subject: [PATCH] Retry handling for delta queries in Exchange (#2328) ## Description Added retry handling for delta queries in Exchange. Also, bumping the timeout for Graph API calls from 90s to 3m as we were seeing client timeouts on those calls. ~Haven't added retry for every request in exchange as I'm hoping https://github.com/alcionai/corso/issues/2287 will be a better way to handle this.~ ## Does this PR need a docs update or release note? - [x] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [ ] :no_entry: No ## Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup ## Issue(s) * # ## Test Plan - [x] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- CHANGELOG.md | 1 + src/internal/connector/exchange/api/api.go | 24 ++ .../connector/exchange/api/contacts.go | 51 +++- src/internal/connector/exchange/api/events.go | 52 +++- src/internal/connector/exchange/api/mail.go | 41 ++- src/internal/connector/graph/errors.go | 24 +- src/internal/connector/graph/errors_test.go | 248 ++++++++++++++++++ src/internal/connector/graph/service.go | 4 +- src/internal/connector/graph/service_test.go | 2 +- src/internal/connector/onedrive/collection.go | 10 +- src/internal/connector/onedrive/item.go | 4 + 11 files changed, 424 insertions(+), 37 deletions(-) create mode 100644 src/internal/connector/graph/errors_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b18a4d04..b96aca9e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Document Corso's fault-tolerance and restartability features +- Add retries on timeouts and status code 500 for Exchange ## [v0.2.0] (alpha) - 2023-1-29 
diff --git a/src/internal/connector/exchange/api/api.go b/src/internal/connector/exchange/api/api.go index c4858c5c1..c47cc9b5b 100644 --- a/src/internal/connector/exchange/api/api.go +++ b/src/internal/connector/exchange/api/api.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/pkg/account" ) @@ -153,3 +154,26 @@ func HasAttachments(body models.ItemBodyable) bool { return strings.Contains(content, "src=\"cid:") } + +// Run a function with retries +func runWithRetry(run func() error) error { + var err error + + for i := 0; i < numberOfRetries; i++ { + err = run() + if err == nil { + return nil + } + + // only retry on timeouts and 500-internal-errors. + if !(graph.IsErrTimeout(err) || graph.IsInternalServerError(err)) { + break + } + + if i < numberOfRetries { + time.Sleep(time.Duration(3*(i+2)) * time.Second) + } + } + + return support.ConnectorStackErrorTraceWrap(err, "maximum retries or unretryable") +} diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index 0db1e964c..33f05d2a3 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -61,7 +61,16 @@ func (c Contacts) GetItem( ctx context.Context, user, itemID string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - cont, err := c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil) + var ( + cont models.Contactable + err error + ) + + err = runWithRetry(func() error { + cont, err = c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil) + return err + }) + if err != nil { return nil, nil, err } @@ -81,7 +90,14 @@ func (c Contacts) GetAllContactFolderNamesForUser( return nil, err } - return c.stable.Client().UsersById(user).ContactFolders().Get(ctx, options) + var resp 
models.ContactFolderCollectionResponseable + + err = runWithRetry(func() error { + resp, err = c.stable.Client().UsersById(user).ContactFolders().Get(ctx, options) + return err + }) + + return resp, err } func (c Contacts) GetContainerByID( @@ -93,10 +109,14 @@ func (c Contacts) GetContainerByID( return nil, errors.Wrap(err, "options for contact folder") } - return c.stable.Client(). - UsersById(userID). - ContactFoldersById(dirID). - Get(ctx, ofcf) + var resp models.ContactFolderable + + err = runWithRetry(func() error { + resp, err = c.stable.Client().UsersById(userID).ContactFoldersById(dirID).Get(ctx, ofcf) + return err + }) + + return resp, err } // EnumerateContainers iterates through all of the users current @@ -117,6 +137,7 @@ func (c Contacts) EnumerateContainers( var ( errs *multierror.Error + resp models.ContactFolderCollectionResponseable fields = []string{"displayName", "parentFolderId"} ) @@ -131,7 +152,11 @@ func (c Contacts) EnumerateContainers( ChildFolders() for { - resp, err := builder.Get(ctx, ofcf) + err = runWithRetry(func() error { + resp, err = builder.Get(ctx, ofcf) + return err + }) + if err != nil { return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } @@ -174,7 +199,17 @@ type contactPager struct { } func (p *contactPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - return p.builder.Get(ctx, p.options) + var ( + resp api.DeltaPageLinker + err error + ) + + err = runWithRetry(func() error { + resp, err = p.builder.Get(ctx, p.options) + return err + }) + + return resp, err } func (p *contactPager) setNext(nextLink string) { diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index e643c1f89..63545143d 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -73,7 +73,13 @@ func (c Events) GetContainerByID( return nil, errors.Wrap(err, "options for event calendar") } - cal, err := 
service.Client().UsersById(userID).CalendarsById(containerID).Get(ctx, ofc) + var cal models.Calendarable + + err = runWithRetry(func() error { + cal, err = service.Client().UsersById(userID).CalendarsById(containerID).Get(ctx, ofc) + return err + }) + if err != nil { return nil, err } @@ -86,7 +92,16 @@ func (c Events) GetItem( ctx context.Context, user, itemID string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - event, err := c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil) + var ( + event models.Eventable + err error + ) + + err = runWithRetry(func() error { + event, err = c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil) + return err + }) + if err != nil { return nil, nil, err } @@ -128,7 +143,14 @@ func (c Client) GetAllCalendarNamesForUser( return nil, err } - return c.stable.Client().UsersById(user).Calendars().Get(ctx, options) + var resp models.CalendarCollectionResponseable + + err = runWithRetry(func() error { + resp, err = c.stable.Client().UsersById(user).Calendars().Get(ctx, options) + return err + }) + + return resp, err } // EnumerateContainers iterates through all of the users current @@ -147,7 +169,10 @@ func (c Events) EnumerateContainers( return err } - var errs *multierror.Error + var ( + resp models.CalendarCollectionResponseable + errs *multierror.Error + ) ofc, err := optionsForCalendars([]string{"name"}) if err != nil { @@ -157,7 +182,13 @@ func (c Events) EnumerateContainers( builder := service.Client().UsersById(userID).Calendars() for { - resp, err := builder.Get(ctx, ofc) + var err error + + err = runWithRetry(func() error { + resp, err = builder.Get(ctx, ofc) + return err + }) + if err != nil { return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } @@ -205,7 +236,16 @@ type eventPager struct { } func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - resp, err := p.builder.Get(ctx, p.options) + var ( + resp api.DeltaPageLinker + err error + ) + + 
err = runWithRetry(func() error { + resp, err = p.builder.Get(ctx, p.options) + return err + }) + return resp, err } diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index bbac48a66..597676874 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -95,7 +95,14 @@ func (c Mail) GetContainerByID( return nil, errors.Wrap(err, "options for mail folder") } - return service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf) + var resp graph.Container + + err = runWithRetry(func() error { + resp, err = service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf) + return err + }) + + return resp, err } // GetItem retrieves a Messageable item. If the item contains an attachment, that @@ -104,7 +111,16 @@ func (c Mail) GetItem( ctx context.Context, user, itemID string, ) (serialization.Parsable, *details.ExchangeInfo, error) { - mail, err := c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil) + var ( + mail models.Messageable + err error + ) + + err = runWithRetry(func() error { + mail, err = c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil) + return err + }) + if err != nil { return nil, nil, err } @@ -154,6 +170,7 @@ func (c Mail) EnumerateContainers( } var ( + resp users.ItemMailFoldersDeltaResponseable errs *multierror.Error builder = service.Client(). UsersById(userID). 
@@ -162,7 +179,13 @@ func (c Mail) EnumerateContainers( ) for { - resp, err := builder.Get(ctx, nil) + var err error + + err = runWithRetry(func() error { + resp, err = builder.Get(ctx, nil) + return err + }) + if err != nil { return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } @@ -200,7 +223,17 @@ type mailPager struct { } func (p *mailPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - return p.builder.Get(ctx, p.options) + var ( + page api.DeltaPageLinker + err error + ) + + err = runWithRetry(func() error { + page, err = p.builder.Get(ctx, p.options) + return err + }) + + return page, err } func (p *mailPager) setNext(nextLink string) { diff --git a/src/internal/connector/graph/errors.go b/src/internal/connector/graph/errors.go index c75e4a6cb..21116057d 100644 --- a/src/internal/connector/graph/errors.go +++ b/src/internal/connector/graph/errors.go @@ -17,6 +17,7 @@ import ( // --------------------------------------------------------------------------- const ( + errCodeActivityLimitReached = "activityLimitReached" errCodeItemNotFound = "ErrorItemNotFound" errCodeEmailFolderNotFound = "ErrorSyncFolderNotFound" errCodeResyncRequired = "ResyncRequired" @@ -31,8 +32,10 @@ var ( // normally the graph client will catch this for us, but in case we // run our own client Do(), we need to translate it to a timeout type // failure locally. 
- Err429TooManyRequests = errors.New("429 too many requests") - Err503ServiceUnavailable = errors.New("503 Service Unavailable") + Err429TooManyRequests = errors.New("429 too many requests") + Err503ServiceUnavailable = errors.New("503 Service Unavailable") + Err504GatewayTimeout = errors.New("504 Gateway Timeout") + Err500InternalServerError = errors.New("500 Internal Server Error") ) // The folder or item was deleted between the time we identified @@ -113,6 +116,10 @@ func IsErrThrottled(err error) bool { return true } + if hasErrorCode(err, errCodeActivityLimitReached) { + return true + } + e := ErrThrottled{} return errors.As(err, &e) @@ -135,21 +142,18 @@ func IsErrUnauthorized(err error) bool { return errors.As(err, &e) } -type ErrServiceUnavailable struct { +type ErrInternalServerError struct { common.Err } -func IsSericeUnavailable(err error) bool { - if errors.Is(err, Err503ServiceUnavailable) { +func IsInternalServerError(err error) bool { + if errors.Is(err, Err500InternalServerError) { return true } - e := ErrUnauthorized{} - if errors.As(err, &e) { - return true - } + e := ErrInternalServerError{} - return true + return errors.As(err, &e) } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/graph/errors_test.go b/src/internal/connector/graph/errors_test.go new file mode 100644 index 000000000..c7f889b83 --- /dev/null +++ b/src/internal/connector/graph/errors_test.go @@ -0,0 +1,248 @@ +package graph + +import ( + "context" + "testing" + + "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/common" +) + +type GraphErrorsUnitSuite struct { + suite.Suite +} + +func TestGraphErrorsUnitSuite(t *testing.T) { + suite.Run(t, new(GraphErrorsUnitSuite)) +} + +func odErr(code string) *odataerrors.ODataError { + odErr := &odataerrors.ODataError{} + merr := 
odataerrors.MainError{} + merr.SetCode(&code) + odErr.SetError(&merr) + + return odErr +} + +func (suite *GraphErrorsUnitSuite) TestIsErrDeletedInFlight() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrDeletedInFlight{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "non-matching oDataErr", + err: odErr("fnords"), + expect: assert.False, + }, + { + name: "not-found oDataErr", + err: odErr(errCodeItemNotFound), + expect: assert.True, + }, + { + name: "sync-not-found oDataErr", + err: odErr(errCodeSyncFolderNotFound), + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrDeletedInFlight(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrInvalidDelta() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrInvalidDelta{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "non-matching oDataErr", + err: odErr("fnords"), + expect: assert.False, + }, + { + name: "resync-required oDataErr", + err: odErr(errCodeResyncRequired), + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrInvalidDelta(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrTimeout() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: 
ErrTimeout{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "context deadline", + err: context.DeadlineExceeded, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrTimeout(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrThrottled() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrThrottled{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "is429", + err: Err429TooManyRequests, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrThrottled(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsErrUnauthorized() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrUnauthorized{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "is429", + err: Err401Unauthorized, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsErrUnauthorized(test.err)) + }) + } +} + +func (suite *GraphErrorsUnitSuite) TestIsInternalServerError() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrInternalServerError{Err: *common.EncapsulateError(assert.AnError)}, + expect: assert.True, + }, + { + name: "is429", 
+ err: Err500InternalServerError, + expect: assert.True, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + test.expect(t, IsInternalServerError(test.err)) + }) + } +} diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index 6c0e6dbc1..093995a42 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -149,7 +149,7 @@ func HTTPClient(opts ...option) *http.Client { middlewares := msgraphgocore.GetDefaultMiddlewaresWithOptions(&clientOptions) middlewares = append(middlewares, &LoggingMiddleware{}) httpClient := msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) - httpClient.Timeout = time.Second * 90 + httpClient.Timeout = time.Minute * 3 (&clientConfig{}). populate(opts...). @@ -250,7 +250,6 @@ func (handler *LoggingMiddleware) Intercept( respDump, _ := httputil.DumpResponse(resp, false) metadata := []any{ - "idx", middlewareIndex, "method", req.Method, "status", resp.Status, "statusCode", resp.StatusCode, @@ -273,7 +272,6 @@ func (handler *LoggingMiddleware) Intercept( respDump, _ := httputil.DumpResponse(resp, true) metadata := []any{ - "idx", middlewareIndex, "method", req.Method, "status", resp.Status, "statusCode", resp.StatusCode, diff --git a/src/internal/connector/graph/service_test.go b/src/internal/connector/graph/service_test.go index 14bdc9c36..c2ef2d699 100644 --- a/src/internal/connector/graph/service_test.go +++ b/src/internal/connector/graph/service_test.go @@ -53,7 +53,7 @@ func (suite *GraphUnitSuite) TestHTTPClient() { name: "no options", opts: []option{}, check: func(t *testing.T, c *http.Client) { - assert.Equal(t, 90*time.Second, c.Timeout, "default timeout") + assert.Equal(t, 3*time.Minute, c.Timeout, "default timeout") }, }, { diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index a786de0ab..c4e1825bd 100644 --- 
a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -271,11 +271,11 @@ func (oc *Collection) populateItems(ctx context.Context) { continue - } else if !graph.IsErrTimeout(err) && !graph.IsErrThrottled(err) && !graph.IsSericeUnavailable(err) { - // TODO: graphAPI will provides headers that state the duration to wait - // in order to succeed again. The one second sleep won't cut it here. - // - // for all non-timeout, non-unauth, non-throttling errors, do not retry + } else if !graph.IsErrTimeout(err) && + !graph.IsInternalServerError(err) { + // Don't retry for non-timeout, on-unauth, as + // we are already retrying it in the default + // retry middleware break } diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index c4fd1b380..b1027de9d 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -105,6 +105,10 @@ func downloadItem(hc *http.Client, item models.DriveItemable) (*http.Response, e return resp, graph.Err401Unauthorized } + if resp.StatusCode == http.StatusInternalServerError { + return resp, graph.Err500InternalServerError + } + if resp.StatusCode == http.StatusServiceUnavailable { return resp, graph.Err503ServiceUnavailable }