From 2ab3c890b4c20c65a53a9110419eafb73759f16f Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Wed, 7 Feb 2024 23:43:20 -0800 Subject: [PATCH] Retry events item enumeration with smaller page size for some cases (#5194) Update the events item enumerator to switch to a smaller page size for the delta pager if we fail enumeration with a 503 error and no content. We've found that that situation is indicative of the Graph server being slow and a smaller page size allows us to make progress still --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [x] :clock1: Yes, but in a later PR - [ ] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test (in another PR) - [ ] :green_heart: E2E --- src/pkg/services/m365/api/config.go | 5 +- src/pkg/services/m365/api/events_pager.go | 55 ++++++- .../services/m365/api/events_pager_test.go | 137 ++++++++++++++++++ src/pkg/services/m365/api/graph/errors.go | 20 +++ src/pkg/services/m365/api/graph/middleware.go | 10 +- .../m365/api/graph/middleware_test.go | 5 +- .../services/m365/api/graph/service_mock.go | 28 +++- src/pkg/services/m365/api/helper_test.go | 1 + src/pkg/services/m365/api/mail_test.go | 16 +- 9 files changed, 259 insertions(+), 18 deletions(-) diff --git a/src/pkg/services/m365/api/config.go b/src/pkg/services/m365/api/config.go index 50453c81e..91694c670 100644 --- a/src/pkg/services/m365/api/config.go +++ b/src/pkg/services/m365/api/config.go @@ -8,8 +8,9 @@ import ( ) const ( - maxNonDeltaPageSize = int32(999) - maxDeltaPageSize = int32(500) + maxNonDeltaPageSize = int32(999) + maxDeltaPageSize = int32(500) + minEventsDeltaPageSize = int32(10) ) // selectable values, case insensitive diff --git 
a/src/pkg/services/m365/api/events_pager.go b/src/pkg/services/m365/api/events_pager.go index 50a977d92..d3b1e3d2c 100644 --- a/src/pkg/services/m365/api/events_pager.go +++ b/src/pkg/services/m365/api/events_pager.go @@ -2,6 +2,7 @@ package api import ( "context" + "errors" "fmt" "github.com/alcionai/clues" @@ -9,6 +10,7 @@ import ( "github.com/microsoftgraph/msgraph-sdk-go/users" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api/graph" "github.com/alcionai/corso/src/pkg/services/m365/api/pagers" @@ -200,12 +202,29 @@ func (c Events) NewEventsDeltaPager( ctx context.Context, userID, containerID, prevDeltaLink string, selectProps ...string, +) pagers.DeltaHandler[models.Eventable] { + return c.newEventsDeltaPagerWithPageSize( + ctx, + userID, + containerID, + prevDeltaLink, + c.options.DeltaPageSize, + selectProps...) +} + +func (c Events) newEventsDeltaPagerWithPageSize( + ctx context.Context, + userID string, + containerID string, + prevDeltaLink string, + pageSize int32, + selectProps ...string, ) pagers.DeltaHandler[models.Eventable] { options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{ // do NOT set Top. It limits the total items received. QueryParameters: &users.ItemCalendarsItemEventsDeltaRequestBuilderGetQueryParameters{}, Headers: newPreferHeaders( - preferPageSize(c.options.DeltaPageSize), + preferPageSize(pageSize), preferImmutableIDs(c.options.ToggleFeatures.ExchangeImmutableIDs)), } @@ -259,11 +278,45 @@ func (c Events) GetAddedAndRemovedItemIDs( containerID, prevDeltaLink, idAnd()...) + + // Experiments showed that non-delta endpoint didn't have the same performance + // degradation that delta endpoint did so a larger page size should be ok. pager := c.NewEventsPager( userID, containerID, idAnd(lastModifiedDateTime)...) + // Try running the query with the given limits. 
If we fail to do a delta + // enumeration with a 503 error and an empty response body, try rerunning the + // query with a smaller page size. We've seen some resources where this + // consistently happens but a small page size allows us to make progress. + addedRemoved, err := pagers.GetAddedAndRemovedItemIDs[models.Eventable]( + ctx, + pager, + deltaPager, + prevDeltaLink, + config.CanMakeDeltaQueries, + config.LimitResults, + pagers.AddedAndRemovedByAddtlData[models.Eventable]) + if err == nil || !errors.Is(err, graph.ErrServiceUnavailableEmptyResp) { + return addedRemoved, clues.Stack(err).OrNil() + } + + effectivePageSize := minEventsDeltaPageSize + + logger.Ctx(ctx).Infow( + "retrying event item query with reduced page size", + "delta_pager_effective_page_size", effectivePageSize, + "delta_pager_default_page_size", c.options.DeltaPageSize) + + deltaPager = c.newEventsDeltaPagerWithPageSize( + ctx, + userID, + containerID, + prevDeltaLink, + effectivePageSize, + idAnd()...) + return pagers.GetAddedAndRemovedItemIDs[models.Eventable]( ctx, pager, diff --git a/src/pkg/services/m365/api/events_pager_test.go b/src/pkg/services/m365/api/events_pager_test.go index 26261cc8c..a50b73bb1 100644 --- a/src/pkg/services/m365/api/events_pager_test.go +++ b/src/pkg/services/m365/api/events_pager_test.go @@ -1,9 +1,14 @@ package api import ( + "fmt" + "net/http" + stdpath "path" + "strings" "testing" "github.com/alcionai/clues" + "github.com/h2non/gock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -12,8 +17,140 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" + "github.com/alcionai/corso/src/pkg/count" ) +// --------------------------------------------------------------------------- +// Unit tests +// --------------------------------------------------------------------------- + +type EventsPagerUnitSuite struct { + 
tester.Suite +} + +func TestEventsPagerUnitSuite(t *testing.T) { + suite.Run(t, &EventsPagerUnitSuite{ + Suite: tester.NewUnitSuite(t), + }) +} + +func (suite *EventsPagerUnitSuite) TestGetAddedAndRemovedItemIDs_SendsCorrectDeltaPageSize() { + const ( + validEmptyResponse = `{ + "@odata.context": "https://graph.microsoft.com/beta/$metadata#Collection(event)", + "value": [], + "@odata.deltaLink": "link" +}` + + // deltaPath helps make gock matching a little easier since it splits out + // the hostname from the remainder of the URL. Graph SDK uses the URL + // directly though. + deltaPath = "/prev-delta" + prevDelta = graphAPIHostURL + deltaPath + + userID = "user-id" + containerID = "container-id" + ) + + deltaTests := []struct { + name string + reqPath string + inputDelta string + }{ + { + name: "NoPrevDelta", + reqPath: stdpath.Join( + "/beta", + "users", + userID, + "calendars", + containerID, + "events", + "delta"), + }, + { + name: "HasPrevDelta", + reqPath: deltaPath, + inputDelta: prevDelta, + }, + } + + for _, deltaTest := range deltaTests { + suite.Run(deltaTest.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + t.Cleanup(flush) + + a := tconfig.NewFakeM365Account(t) + creds, err := a.M365Config() + require.NoError(t, err, clues.ToCore(err)) + + client, err := gockClient(creds, count.New()) + require.NoError(t, err, clues.ToCore(err)) + + t.Cleanup(gock.Off) + + // Number of retries and delay between retries is handled by a kiota + // middleware. We can change the default config parameters when setting up + // the mock in a later PR. + gock.New(graphAPIHostURL). + Get(deltaTest.reqPath). + Times(4). + Reply(http.StatusServiceUnavailable). + BodyString(""). + Type("text/plain") + + gock.New(graphAPIHostURL). + Get(deltaTest.reqPath). + SetMatcher(gock.NewMatcher()). + // Need a custom Matcher since the prefer header is also used for + // immutable ID behavior. 
+ AddMatcher(func(got *http.Request, want *gock.Request) (bool, error) { + var ( + found bool + preferHeaders = got.Header.Values("Prefer") + expected = fmt.Sprintf( + "odata.maxpagesize=%d", + minEventsDeltaPageSize) + ) + + for _, header := range preferHeaders { + if strings.Contains(header, expected) { + found = true + break + } + } + + assert.Truef( + t, + found, + "header %s not found in set %v", + expected, + preferHeaders) + + return true, nil + }). + Reply(http.StatusOK). + JSON(validEmptyResponse) + + _, err = client.Events().GetAddedAndRemovedItemIDs( + ctx, + userID, + containerID, + deltaTest.inputDelta, + CallConfig{ + CanMakeDeltaQueries: true, + }) + require.NoError(t, err, clues.ToCore(err)) + }) + } +} + +// --------------------------------------------------------------------------- +// Integration tests +// --------------------------------------------------------------------------- + type EventsPagerIntgSuite struct { tester.Suite its intgTesterSetup diff --git a/src/pkg/services/m365/api/graph/errors.go b/src/pkg/services/m365/api/graph/errors.go index 6c99d3f21..90ba550de 100644 --- a/src/pkg/services/m365/api/graph/errors.go +++ b/src/pkg/services/m365/api/graph/errors.go @@ -111,6 +111,11 @@ const ( LabelsSkippable = "skippable_errors" ) +// ErrServiceUnavailableEmptyResp indicates the remote service returned a 503 +// with an empty response body. This can sometimes happen if a request times out +// during processing. +var ErrServiceUnavailableEmptyResp = clues.New("service unavailable and no returned content") + // --------------------------------------------------------------------------- // error categorization // --------------------------------------------------------------------------- @@ -395,6 +400,21 @@ func stackReq( resp *http.Response, e error, ) *clues.Err { + // For service unavailable stack an error we can check against. 
We can't check + // this at higher layers because we won't have the content length or server + // response code thanks to graph SDK. We do need to check this before checking + // if there was no error though because the status code itself doesn't count + // as an error. + // + // Returning an error here is non-standard, but if we don't return something + // then all we get from graph SDK is an error saying "content is empty" which + // isn't particularly useful. + if resp != nil && + resp.ContentLength == 0 && + resp.StatusCode == http.StatusServiceUnavailable { + e = clues.Stack(ErrServiceUnavailableEmptyResp, e) + } + if e == nil { return nil } diff --git a/src/pkg/services/m365/api/graph/middleware.go b/src/pkg/services/m365/api/graph/middleware.go index 3405c030c..0c5156ed0 100644 --- a/src/pkg/services/m365/api/graph/middleware.go +++ b/src/pkg/services/m365/api/graph/middleware.go @@ -164,7 +164,15 @@ func (mw RetryMiddleware) Intercept( mw.isRetriableRespCode(ctx, resp) if !retriable { - return resp, stackReq(ctx, req, resp, err).OrNil() + // Returning a response and an error causes output from either some part of + // the middleware/graph/golang or the mocking library used for testing. + // Return one or the other to avoid this. 
+ err := stackReq(ctx, req, resp, err).OrNil() + if err != nil { + return nil, err + } + + return resp, nil } exponentialBackOff := backoff.NewExponentialBackOff() diff --git a/src/pkg/services/m365/api/graph/middleware_test.go b/src/pkg/services/m365/api/graph/middleware_test.go index 8c7bd0dc8..1a1f93501 100644 --- a/src/pkg/services/m365/api/graph/middleware_test.go +++ b/src/pkg/services/m365/api/graph/middleware_test.go @@ -43,8 +43,9 @@ func newMWReturns(code int, body []byte, err error) mwReturns { } resp := &http.Response{ - StatusCode: code, - Body: brc, + ContentLength: int64(len(body)), + StatusCode: code, + Body: brc, } if code == 0 { diff --git a/src/pkg/services/m365/api/graph/service_mock.go b/src/pkg/services/m365/api/graph/service_mock.go index b90c312f7..119504554 100644 --- a/src/pkg/services/m365/api/graph/service_mock.go +++ b/src/pkg/services/m365/api/graph/service_mock.go @@ -6,6 +6,7 @@ import ( "github.com/alcionai/clues" "github.com/h2non/gock" abstractions "github.com/microsoft/kiota-abstractions-go" + khttp "github.com/microsoft/kiota-http-go" msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go" "github.com/alcionai/corso/src/pkg/account" @@ -34,6 +35,9 @@ func NewGockService( counter *count.Bus, opts ...Option, ) (*Service, error) { + // Need to initialize the concurrency limiter else we'll get an error. + InitializeConcurrencyLimiter(context.Background(), true, 4) + a, err := CreateGockAdapter( creds.AzureTenantID, creds.AzureClientID, @@ -54,11 +58,27 @@ func CreateGockAdapter( counter *count.Bus, opts ...Option, ) (abstractions.RequestAdapter, error) { - httpClient, cc := KiotaHTTPClient(counter, opts...) + cc := populateConfig(opts...) + // We need to manufacture our own graph client since gock.InterceptClient + // replaces the transport which replaces all the middleware we add. 
Start with + // a client without any middleware and then replace the transport completely + // with the mocked one since there's no real in between that we could use. + clientOptions := msgraphsdkgo.GetDefaultClientOptions() + middlewares := kiotaMiddlewares(&clientOptions, cc, counter) - // This makes sure that we are able to intercept any requests via - // gock. Only necessary for testing. - gock.InterceptClient(httpClient) + //nolint:lll + // This is lifted from + // https://github.com/microsoft/kiota-http-go/blob/e32eb086c8d28002dcba922f3271d56327ba8b03/pipeline.go#L75 + // which was found by following + // https://github.com/microsoftgraph/msgraph-sdk-go-core/blob/93a2c8acb7dfff7f3e2791670f51ccb001d7b127/graph_client_factory.go#L26 + httpClient := khttp.GetDefaultClient() + httpClient.Transport = khttp.NewCustomTransportWithParentTransport( + // gock's default transport isn't quite the same as the default graph uses + // but since we're mocking things it's probably ok. + gock.NewTransport(), + middlewares...) + + cc.apply(httpClient) ng, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( // Use our own mock auth instance. This allows us to completely avoid having diff --git a/src/pkg/services/m365/api/helper_test.go b/src/pkg/services/m365/api/helper_test.go index 2d993996e..cda1182f4 100644 --- a/src/pkg/services/m365/api/helper_test.go +++ b/src/pkg/services/m365/api/helper_test.go @@ -38,6 +38,7 @@ func gockClient(creds account.M365Config, counter *count.Bus) (Client, error) { Credentials: creds, Stable: s, LargeItem: li, + options: control.DefaultOptions(), }, nil } diff --git a/src/pkg/services/m365/api/mail_test.go b/src/pkg/services/m365/api/mail_test.go index 313fd55ba..69c5765b9 100644 --- a/src/pkg/services/m365/api/mail_test.go +++ b/src/pkg/services/m365/api/mail_test.go @@ -313,9 +313,12 @@ func (suite *MailAPIIntgSuite) TestMail_attachmentListDownload() { Reply(200). 
JSON(graphTD.ParseableToMap(suite.T(), atts)) - interceptV1Path("users", "user", "messages", mid, "attachments", aid). - Reply(200). - JSON(graphTD.ParseableToMap(suite.T(), attch)) + // TODO(pandeyabs): This does not test the getAttachmentsIterated code path. + // We need a separate test for that. + // For that we'd need to fail the /attachments call (above) with an error. + // This would trigger a /attachments?$select=id,size call, followed by + // an $expand=microsoft.graph.itemAttachment/item call for each attachment. + // These paths need to be added to the gock intercepts. }, attachmentCount: 1, size: 200, @@ -349,11 +352,8 @@ func (suite *MailAPIIntgSuite) TestMail_attachmentListDownload() { Reply(200). JSON(graphTD.ParseableToMap(suite.T(), atts)) - for i := 0; i < 5; i++ { - interceptV1Path("users", "user", "messages", mid, "attachments", aid). - Reply(200). - JSON(graphTD.ParseableToMap(suite.T(), attch)) - } + // TODO(pandeyabs): This does not test the getAttachmentsIterated code path. + // We need a separate test for that. }, attachmentCount: 5, size: 1000,