Retry more errors in graph adapter wrapper (#5203)

<!-- PR description-->

* We have started seeing `io.ErrUnexpectedEOF` and `read: connection timed out` in the last 2 days for exchange backups. Retry those.
* Also increase retry count from 3 to 6. This is more of a hail mary to retry `InvalidAuthenticationToken` errors. We have observed that retries do help. But for a small set of requests, we end up exhausting retries and eventually fail with InvalidAuthenticationToken error. Hoping that bumping this to 6 will get us some relief. This fix may be removed if we find the rootcause/pattern behind this.
* The event list test was taking > 150 secs. Thought I'd push this change as its a small fix. I first thought my PR https://github.com/alcionai/corso/pull/5202 broke the test. So I investigated this.

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [x] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2024-02-09 21:11:05 -08:00 committed by GitHub
parent 71a9087e4d
commit f92f811559
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 48 deletions

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
) )
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -107,13 +108,10 @@ func (suite *EventsPagerUnitSuite) TestEventsList() {
Reply(http.StatusOK). Reply(http.StatusOK).
JSON(validEventsListSingleNextLinkResponse) JSON(validEventsListSingleNextLinkResponse)
// Number of retries and delay between retries is handled by a kiota
// middleware. We can change the default config parameters when setting
// up the mock in a later PR.
gock.New(graphAPIHostURL). gock.New(graphAPIHostURL).
Get(nextLinkPath). Get(nextLinkPath).
AddMatcher(pageSizeMatcher(t, expectedPageSize)). AddMatcher(pageSizeMatcher(t, expectedPageSize)).
Times(4). Times(2). // retry count is configured to 1
Reply(http.StatusServiceUnavailable). Reply(http.StatusServiceUnavailable).
BodyString(""). BodyString("").
Type("text/plain") Type("text/plain")
@ -241,7 +239,9 @@ func (suite *EventsPagerUnitSuite) TestEventsList() {
creds, err := a.M365Config() creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
client, err := gockClient(creds, count.New()) // Run with a single retry since 503 retries are exponential and
// the test will take a long time to run.
client, err := gockClient(creds, count.New(), graph.MaxRetries(1))
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
t.Cleanup(gock.Off) t.Cleanup(gock.Off)

View File

@ -2,6 +2,7 @@ package graph
import ( import (
"context" "context"
"io"
"net/http" "net/http"
"time" "time"
@ -173,7 +174,15 @@ func KiotaHTTPClient(
const ( const (
defaultDelay = 3 * time.Second defaultDelay = 3 * time.Second
defaultHTTPClientTimeout = 1 * time.Hour defaultHTTPClientTimeout = 1 * time.Hour
defaultMaxRetries = 3
// Default retry count for retry middlewares
defaultMaxRetries = 3
// Retry count for graph adapter
//
// Bumping retries to 6 since we have noticed that auth token expiry errors
// may continue to persist even after 3 retries.
adapterMaxRetries = 6
// FIXME: This should ideally be 0, but if we set to 0, graph // FIXME: This should ideally be 0, but if we set to 0, graph
// client with automatically set the context timeout to 0 as // client with automatically set the context timeout to 0 as
// well which will make the client unusable. // well which will make the client unusable.
@ -200,7 +209,7 @@ type Option func(*clientConfig)
// populate constructs a clientConfig according to the provided options. // populate constructs a clientConfig according to the provided options.
func populateConfig(opts ...Option) *clientConfig { func populateConfig(opts ...Option) *clientConfig {
cc := clientConfig{ cc := clientConfig{
maxConnectionRetries: defaultMaxRetries, maxConnectionRetries: adapterMaxRetries,
maxRetries: defaultMaxRetries, maxRetries: defaultMaxRetries,
minDelay: defaultDelay, minDelay: defaultDelay,
timeout: defaultHTTPClientTimeout, timeout: defaultHTTPClientTimeout,
@ -373,9 +382,11 @@ func wrapAdapter(gra *msgraphsdkgo.GraphRequestAdapter, cc *clientConfig) *adapt
} }
} }
var connectionEnded = filters.Contains([]string{ // Graph may abruptly close connections, which we should retry.
var connectionEnded = filters.In([]string{
"connection reset by peer", "connection reset by peer",
"client connection force closed", "client connection force closed",
"read: connection timed out",
}) })
func (aw *adapterWrap) Send( func (aw *adapterWrap) Send(
@ -416,7 +427,9 @@ func (aw *adapterWrap) Send(
err = stackWithCoreErr(ictx, err, 1) err = stackWithCoreErr(ictx, err, 1)
e = err e = err
if IsErrConnectionReset(err) || connectionEnded.Compare(err.Error()) { if IsErrConnectionReset(err) ||
connectionEnded.Compare(err.Error()) ||
errors.Is(err, io.ErrUnexpectedEOF) {
logger.Ctx(ictx).Debug("http connection error") logger.Ctx(ictx).Debug("http connection error")
events.Inc(events.APICall, "connectionerror") events.Inc(events.APICall, "connectionerror")
} else if errors.Is(err, core.ErrAuthTokenExpired) { } else if errors.Is(err, core.ErrAuthTokenExpired) {

View File

@ -93,7 +93,7 @@ func (suite *GraphIntgSuite) TestHTTPClient() {
checkConfig: func(t *testing.T, c *clientConfig) { checkConfig: func(t *testing.T, c *clientConfig) {
assert.Equal(t, defaultDelay, c.minDelay, "default delay") assert.Equal(t, defaultDelay, c.minDelay, "default delay")
assert.Equal(t, defaultMaxRetries, c.maxRetries, "max retries") assert.Equal(t, defaultMaxRetries, c.maxRetries, "max retries")
assert.Equal(t, defaultMaxRetries, c.maxConnectionRetries, "max connection retries") assert.Equal(t, adapterMaxRetries, c.maxConnectionRetries, "max connection retries")
}, },
}, },
{ {
@ -221,43 +221,92 @@ func (suite *GraphIntgSuite) TestAdapterWrap_catchesPanic() {
require.Contains(t, err.Error(), "panic", clues.ToCore(err)) require.Contains(t, err.Error(), "panic", clues.ToCore(err))
} }
func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() { func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionInterruptions() {
t := suite.T() table := []struct {
name string
ctx, flush := tester.NewContext(t) providedErr error
defer flush() expectRetryCount int
expectErr assert.ErrorAssertionFunc
retryInc := 0 }{
{
// the panics should get caught and returned as errors name: "ECONNRESET",
alwaysECONNRESET := mwForceResp{ providedErr: syscall.ECONNRESET,
err: syscall.ECONNRESET, expectRetryCount: 7,
alternate: func(req *http.Request) (bool, *http.Response, error) { expectErr: assert.Error,
retryInc++ },
return false, nil, nil {
name: "connection reset by peer",
providedErr: clues.New("connection reset by peer what is love"),
expectRetryCount: 7,
expectErr: assert.Error,
},
{
name: "read: connection timed out",
providedErr: clues.New("read: connection timed out baby don't hurt me"),
expectRetryCount: 7,
expectErr: assert.Error,
},
{
name: "unexpected EOF",
providedErr: io.ErrUnexpectedEOF,
expectRetryCount: 7,
expectErr: assert.Error,
},
{
name: "nil error",
providedErr: nil,
expectRetryCount: 1,
expectErr: assert.NoError,
},
{
name: "non retriable error",
providedErr: clues.New("no more"),
expectRetryCount: 1,
expectErr: assert.Error,
}, },
} }
adpt, err := CreateAdapter( for _, test := range table {
suite.credentials.AzureTenantID, suite.Run(test.name, func() {
suite.credentials.AzureClientID, t := suite.T()
suite.credentials.AzureClientSecret,
count.New(),
appendMiddleware(&alwaysECONNRESET),
// Configure retry middlewares so that they don't retry on connection reset.
// Those middlewares have their own tests to verify retries.
MaxRetries(-1))
require.NoError(t, err, clues.ToCore(err))
// Retry delay doesn't really matter here since all requests will be intercepted ctx, flush := tester.NewContext(t)
// by the test middleware. Set it to 0 to reduce test runtime. defer flush()
aw := adpt.(*adapterWrap)
aw.retryDelay = 0
// the query doesn't matter retryInc := 0
_, err = NewService(adpt).Client().Users().Get(ctx, nil) forceErrMW := mwForceResp{
require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) err: test.providedErr,
require.Equal(t, 4, retryInc, "number of retries") // send some dummy response, because kiota retry handler panics
// if err is nil and resp is nil.
resp: &http.Response{},
alternate: func(req *http.Request) (bool, *http.Response, error) {
retryInc++
return false, nil, nil
},
}
adpt, err := CreateAdapter(
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret,
count.New(),
appendMiddleware(&forceErrMW),
// Configure retry middlewares so that they don't retry on connection reset.
// Those middlewares have their own tests to verify retries.
MaxRetries(-1))
require.NoError(t, err, clues.ToCore(err))
// Retry delay doesn't really matter here since all requests will be intercepted
// by the test middleware. Set it to 0 to reduce test runtime.
aw := adpt.(*adapterWrap)
aw.retryDelay = 0
// the query doesn't matter
_, err = NewService(adpt).Client().Users().Get(ctx, nil)
test.expectErr(t, err, clues.ToCore(err))
require.Equal(t, test.expectRetryCount, retryInc, "number of retries")
})
}
} }
func (suite *GraphIntgSuite) TestAdapterWrap_retriesBadJWTToken() { func (suite *GraphIntgSuite) TestAdapterWrap_retriesBadJWTToken() {
@ -315,14 +364,14 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesBadJWTToken() {
NewItemCalendarsItemEventsDeltaRequestBuilder("https://graph.microsoft.com/fnords/beaux/regard", adpt). NewItemCalendarsItemEventsDeltaRequestBuilder("https://graph.microsoft.com/fnords/beaux/regard", adpt).
Get(ctx, nil) Get(ctx, nil)
assert.ErrorIs(t, err, core.ErrAuthTokenExpired, clues.ToCore(err)) assert.ErrorIs(t, err, core.ErrAuthTokenExpired, clues.ToCore(err))
assert.Equal(t, 4, retryInc, "number of retries") assert.Equal(t, adapterMaxRetries+1, retryInc, "number of retries")
retryInc = 0 retryInc = 0
// the query doesn't matter // the query doesn't matter
_, err = NewService(adpt).Client().Users().Get(ctx, nil) _, err = NewService(adpt).Client().Users().Get(ctx, nil)
assert.ErrorIs(t, err, core.ErrAuthTokenExpired, clues.ToCore(err)) assert.ErrorIs(t, err, core.ErrAuthTokenExpired, clues.ToCore(err))
assert.Equal(t, 4, retryInc, "number of retries") assert.Equal(t, adapterMaxRetries+1, retryInc, "number of retries")
} }
// TestAdapterWrap_retriesInvalidRequest tests adapter retries for graph 400 // TestAdapterWrap_retriesInvalidRequest tests adapter retries for graph 400
@ -398,7 +447,7 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesInvalidRequest() {
_, err = NewService(adpt).Client().Users().Get(ctx, nil) _, err = NewService(adpt).Client().Users().Get(ctx, nil)
return err return err
}, },
expectedRetries: 4, expectedRetries: adapterMaxRetries + 1,
}, },
{ {
name: "POST request, no retry", name: "POST request, no retry",

View File

@ -23,13 +23,19 @@ import (
// GockClient produces a new exchange api client that can be // GockClient produces a new exchange api client that can be
// mocked using gock. // mocked using gock.
func gockClient(creds account.M365Config, counter *count.Bus) (Client, error) { func gockClient(
s, err := graph.NewGockService(creds, counter) creds account.M365Config,
counter *count.Bus,
opts ...graph.Option,
) (Client, error) {
s, err := graph.NewGockService(creds, counter, opts...)
if err != nil { if err != nil {
return Client{}, err return Client{}, err
} }
li, err := graph.NewGockService(creds, counter, graph.NoTimeout()) opts = append(opts, graph.NoTimeout())
li, err := graph.NewGockService(creds, counter, opts...)
if err != nil { if err != nil {
return Client{}, err return Client{}, err
} }