Retry events item enumeration with smaller page size for some cases (#5194)

Update the events item enumerator to switch to a smaller page size for
the delta pager if we fail enumeration with a 503 error and no content.
We've found that that situation is indicative of the Graph server
being slow and a smaller page size allows us to make progress still

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test (in another PR)
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2024-02-07 23:43:20 -08:00 committed by GitHub
parent f00dd0f88a
commit 2ab3c890b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 259 additions and 18 deletions

View File

@ -10,6 +10,7 @@ import (
const (
maxNonDeltaPageSize = int32(999)
maxDeltaPageSize = int32(500)
minEventsDeltaPageSize = int32(10)
)
// selectable values, case insensitive

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"errors"
"fmt"
"github.com/alcionai/clues"
@ -9,6 +10,7 @@ import (
"github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
@ -200,12 +202,29 @@ func (c Events) NewEventsDeltaPager(
ctx context.Context,
userID, containerID, prevDeltaLink string,
selectProps ...string,
) pagers.DeltaHandler[models.Eventable] {
return c.newEventsDeltaPagerWithPageSize(
ctx,
userID,
containerID,
prevDeltaLink,
c.options.DeltaPageSize,
selectProps...)
}
func (c Events) newEventsDeltaPagerWithPageSize(
ctx context.Context,
userID string,
containerID string,
prevDeltaLink string,
pageSize int32,
selectProps ...string,
) pagers.DeltaHandler[models.Eventable] {
options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{
// do NOT set Top. It limits the total items received.
QueryParameters: &users.ItemCalendarsItemEventsDeltaRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(
preferPageSize(c.options.DeltaPageSize),
preferPageSize(pageSize),
preferImmutableIDs(c.options.ToggleFeatures.ExchangeImmutableIDs)),
}
@ -259,11 +278,45 @@ func (c Events) GetAddedAndRemovedItemIDs(
containerID,
prevDeltaLink,
idAnd()...)
// Experiments showed that non-delta endpoint didn't have the same performance
// degradation that delta endpoint did so a larger page size should be ok.
pager := c.NewEventsPager(
userID,
containerID,
idAnd(lastModifiedDateTime)...)
// Try running the query with the given limits. If we fail to do a delta
// enumeration with a 5xx error try rerunning the query with a smaller page
// size. We've seen some resources where this consistently happens but a small
// page size allows us to make progress.
addedRemoved, err := pagers.GetAddedAndRemovedItemIDs[models.Eventable](
ctx,
pager,
deltaPager,
prevDeltaLink,
config.CanMakeDeltaQueries,
config.LimitResults,
pagers.AddedAndRemovedByAddtlData[models.Eventable])
if err == nil || !errors.Is(err, graph.ErrServiceUnavailableEmptyResp) {
return addedRemoved, clues.Stack(err).OrNil()
}
effectivePageSize := minEventsDeltaPageSize
logger.Ctx(ctx).Infow(
"retrying event item query with reduced page size",
"delta_pager_effective_page_size", effectivePageSize,
"delta_pager_default_page_size", c.options.DeltaPageSize)
deltaPager = c.newEventsDeltaPagerWithPageSize(
ctx,
userID,
containerID,
prevDeltaLink,
effectivePageSize,
idAnd()...)
return pagers.GetAddedAndRemovedItemIDs[models.Eventable](
ctx,
pager,

View File

@ -1,9 +1,14 @@
package api
import (
"fmt"
"net/http"
stdpath "path"
"strings"
"testing"
"github.com/alcionai/clues"
"github.com/h2non/gock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@ -12,8 +17,140 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/count"
)
// ---------------------------------------------------------------------------
// Unit tests
// ---------------------------------------------------------------------------
type EventsPagerUnitSuite struct {
tester.Suite
}
func TestEventsPagerUnitSuite(t *testing.T) {
suite.Run(t, &EventsPagerUnitSuite{
Suite: tester.NewUnitSuite(t),
})
}
func (suite *EventsPagerUnitSuite) TestGetAddedAndRemovedItemIDs_SendsCorrectDeltaPageSize() {
const (
validEmptyResponse = `{
"@odata.context": "https://graph.microsoft.com/beta/$metadata#Collection(event)",
"value": [],
"@odata.deltaLink": "link"
}`
// deltaPath helps make gock matching a little easier since it splits out
// the hostname from the remainder of the URL. Graph SDK uses the URL
// directly though.
deltaPath = "/prev-delta"
prevDelta = graphAPIHostURL + deltaPath
userID = "user-id"
containerID = "container-id"
)
deltaTests := []struct {
name string
reqPath string
inputDelta string
}{
{
name: "NoPrevDelta",
reqPath: stdpath.Join(
"/beta",
"users",
userID,
"calendars",
containerID,
"events",
"delta"),
},
{
name: "HasPrevDelta",
reqPath: deltaPath,
inputDelta: prevDelta,
},
}
for _, deltaTest := range deltaTests {
suite.Run(deltaTest.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
t.Cleanup(flush)
a := tconfig.NewFakeM365Account(t)
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
client, err := gockClient(creds, count.New())
require.NoError(t, err, clues.ToCore(err))
t.Cleanup(gock.Off)
// Number of retries and delay between retries is handled by a kiota
// middleware. We can change the default config parameters when setting up
// the mock in a later PR.
gock.New(graphAPIHostURL).
Get(deltaTest.reqPath).
Times(4).
Reply(http.StatusServiceUnavailable).
BodyString("").
Type("text/plain")
gock.New(graphAPIHostURL).
Get(deltaTest.reqPath).
SetMatcher(gock.NewMatcher()).
// Need a custom Matcher since the prefer header is also used for
// immutable ID behavior.
AddMatcher(func(got *http.Request, want *gock.Request) (bool, error) {
var (
found bool
preferHeaders = got.Header.Values("Prefer")
expected = fmt.Sprintf(
"odata.maxpagesize=%d",
minEventsDeltaPageSize)
)
for _, header := range preferHeaders {
if strings.Contains(header, expected) {
found = true
break
}
}
assert.Truef(
t,
found,
"header %s not found in set %v",
expected,
preferHeaders)
return true, nil
}).
Reply(http.StatusOK).
JSON(validEmptyResponse)
_, err = client.Events().GetAddedAndRemovedItemIDs(
ctx,
userID,
containerID,
deltaTest.inputDelta,
CallConfig{
CanMakeDeltaQueries: true,
})
require.NoError(t, err, clues.ToCore(err))
})
}
}
// ---------------------------------------------------------------------------
// Integration tests
// ---------------------------------------------------------------------------
type EventsPagerIntgSuite struct {
tester.Suite
its intgTesterSetup

View File

@ -111,6 +111,11 @@ const (
LabelsSkippable = "skippable_errors"
)
// ErrServiceUnavailableEmptyResp indicates the remote service returned a 503
// with an empty response body. This can sometimes happen if a request times out
// during processing.
var ErrServiceUnavailableEmptyResp = clues.New("service unavailable and no returned content")
// ---------------------------------------------------------------------------
// error categorization
// ---------------------------------------------------------------------------
@ -395,6 +400,21 @@ func stackReq(
resp *http.Response,
e error,
) *clues.Err {
// For service unavailable stack an error we can check against. We can't check
// this at higher layers because we won't have the content length or server
// response code thanks to graph SDK. We do need to check this before checking
// if there was no error though because the status code itself doesn't count
// as an error.
//
// Returning an error here is non-standard, but if we don't return something
// then all we get from graph SDK is an error saying "content is empty" which
// isn't particularly useful.
if resp != nil &&
resp.ContentLength == 0 &&
resp.StatusCode == http.StatusServiceUnavailable {
e = clues.Stack(ErrServiceUnavailableEmptyResp, e)
}
if e == nil {
return nil
}

View File

@ -164,7 +164,15 @@ func (mw RetryMiddleware) Intercept(
mw.isRetriableRespCode(ctx, resp)
if !retriable {
return resp, stackReq(ctx, req, resp, err).OrNil()
// Returning a response and an error causes output from either some part of
// the middleware/graph/golang or the mocking library used for testing.
// Return one or the other to avoid this.
err := stackReq(ctx, req, resp, err).OrNil()
if err != nil {
return nil, err
}
return resp, nil
}
exponentialBackOff := backoff.NewExponentialBackOff()

View File

@ -43,6 +43,7 @@ func newMWReturns(code int, body []byte, err error) mwReturns {
}
resp := &http.Response{
ContentLength: int64(len(body)),
StatusCode: code,
Body: brc,
}

View File

@ -6,6 +6,7 @@ import (
"github.com/alcionai/clues"
"github.com/h2non/gock"
abstractions "github.com/microsoft/kiota-abstractions-go"
khttp "github.com/microsoft/kiota-http-go"
msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/alcionai/corso/src/pkg/account"
@ -34,6 +35,9 @@ func NewGockService(
counter *count.Bus,
opts ...Option,
) (*Service, error) {
// Need to initialize the concurrency limiter else we'll get an error.
InitializeConcurrencyLimiter(context.Background(), true, 4)
a, err := CreateGockAdapter(
creds.AzureTenantID,
creds.AzureClientID,
@ -54,11 +58,27 @@ func CreateGockAdapter(
counter *count.Bus,
opts ...Option,
) (abstractions.RequestAdapter, error) {
httpClient, cc := KiotaHTTPClient(counter, opts...)
cc := populateConfig(opts...)
// We need to manufacture our own graph client since gock.InterceptClient
// replaces the transport which replaces all the middleware we add. Start with
// a client without any middleware and then replace the transport completely
// with the mocked one since there's no real in between that we could use.
clientOptions := msgraphsdkgo.GetDefaultClientOptions()
middlewares := kiotaMiddlewares(&clientOptions, cc, counter)
// This makes sure that we are able to intercept any requests via
// gock. Only necessary for testing.
gock.InterceptClient(httpClient)
//nolint:lll
// This is lifted from
// https://github.com/microsoft/kiota-http-go/blob/e32eb086c8d28002dcba922f3271d56327ba8b03/pipeline.go#L75
// which was found by following
// https://github.com/microsoftgraph/msgraph-sdk-go-core/blob/93a2c8acb7dfff7f3e2791670f51ccb001d7b127/graph_client_factory.go#L26
httpClient := khttp.GetDefaultClient()
httpClient.Transport = khttp.NewCustomTransportWithParentTransport(
// gock's default transport isn't quite the same as the default graph uses
// but since we're mocking things it's probably ok.
gock.NewTransport(),
middlewares...)
cc.apply(httpClient)
ng, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient(
// Use our own mock auth instance. This allows us to completely avoid having

View File

@ -38,6 +38,7 @@ func gockClient(creds account.M365Config, counter *count.Bus) (Client, error) {
Credentials: creds,
Stable: s,
LargeItem: li,
options: control.DefaultOptions(),
}, nil
}

View File

@ -313,9 +313,12 @@ func (suite *MailAPIIntgSuite) TestMail_attachmentListDownload() {
Reply(200).
JSON(graphTD.ParseableToMap(suite.T(), atts))
interceptV1Path("users", "user", "messages", mid, "attachments", aid).
Reply(200).
JSON(graphTD.ParseableToMap(suite.T(), attch))
// TODO(pandeyabs): This does not test getAttachmentsIterated code path.
// We need a separate test for that.
// For that we'd need to fail /attachments call(above) with an error.
// This would trigger a /attachments?$select=id,size call, followed by
// an $expand=microsoft.graph.itemAttachment/item call for each attachment.
// These paths need to be added to the gock intercepts.
},
attachmentCount: 1,
size: 200,
@ -349,11 +352,8 @@ func (suite *MailAPIIntgSuite) TestMail_attachmentListDownload() {
Reply(200).
JSON(graphTD.ParseableToMap(suite.T(), atts))
for i := 0; i < 5; i++ {
interceptV1Path("users", "user", "messages", mid, "attachments", aid).
Reply(200).
JSON(graphTD.ParseableToMap(suite.T(), attch))
}
// TODO(pandeyabs): This does not test getAttachmentsIterated code path.
// We need a separate test for that.
},
attachmentCount: 5,
size: 1000,