From 7905adb8e1a7c546188499c06cdb2e67bed1bf23 Mon Sep 17 00:00:00 2001 From: Keepers Date: Wed, 9 Aug 2023 13:17:38 -0600 Subject: [PATCH] test http2 stream errors, update client options (#3994) #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :robot: Supportability/Tests #### Test Plan - [x] :zap: Unit test --- src/go.sum | 6 ++ src/internal/m365/graph/http_wrapper.go | 18 ++--- src/internal/m365/graph/http_wrapper_test.go | 69 ++++++++++++++++++++ src/internal/m365/graph/mock/service.go | 2 +- src/internal/m365/graph/service.go | 58 +++++++++++----- src/internal/m365/graph/service_test.go | 69 +++++++++++++++++--- 6 files changed, 190 insertions(+), 32 deletions(-) diff --git a/src/go.sum b/src/go.sum index 01774fef2..0df8046bb 100644 --- a/src/go.sum +++ b/src/go.sum @@ -122,6 +122,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -225,6 +226,7 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -232,6 +234,7 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= @@ -303,6 +306,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/natefinch/atomic v1.0.1 h1:ZPYKxkqQOx3KZ+RsbnP/YsgvxWQPGxjC0oBt2AhwV0A= github.com/natefinch/atomic v1.0.1/go.mod h1:N/D/ELrljoqDyT3rZrsUmtsuzvHkeB/wWjHV22AZRbM= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= @@ -404,6 +408,7 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79Tc= github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vbauerster/mpb/v8 v8.1.6 h1:EswHDkAsy4OQ7QBAmU1MUPz4vHzl6KlINjlh7vJoxvY= github.com/vbauerster/mpb/v8 v8.1.6/go.mod h1:O9/Wl8X9dUbR63tZ41MLIAxrtNfwlpwUhGkeYugUPW8= github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g= @@ -782,6 +787,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index 23e5566e1..b810c7aa3 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -4,11 +4,12 @@ import ( "context" "io" "net/http" - "regexp" "time" "github.com/alcionai/clues" khttp "github.com/microsoft/kiota-http-go" + "github.com/pkg/errors" + "golang.org/x/net/http2" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/version" @@ -56,7 +57,7 @@ func NewHTTPWrapper(opts ...Option) *httpWrapper { cc.apply(hc) - return &httpWrapper{hc} + return &httpWrapper{hc, cc} } // NewNoTimeoutHTTPWrapper constructs a http wrapper with no context timeout. @@ -74,8 +75,6 @@ func NewNoTimeoutHTTPWrapper(opts ...Option) *httpWrapper { // requests // --------------------------------------------------------------------------- -var streamErrRE = regexp.MustCompile(`stream error: stream ID \d+; .+; received from peer`) - // Request does the provided request. func (hw httpWrapper) Request( ctx context.Context, @@ -105,18 +104,20 @@ func (hw httpWrapper) Request( // retry wrapper is unsophisticated, but should only // retry in the event of a `stream error`, which is not // a common expectation. - for i := 0; i < 3; i++ { + for i := 0; i < hw.config.maxConnectionRetries+1; i++ { ictx := clues.Add(ctx, "request_retry_iter", i) resp, err = hw.client.Do(req) - if err != nil && !streamErrRE.MatchString(err.Error()) { - return nil, Stack(ictx, err) - } if err == nil { break } + var http2StreamErr http2.StreamError + if !errors.As(err, &http2StreamErr) { + return nil, Stack(ictx, err) + } + logger.Ctx(ictx).Debug("http2 stream error") events.Inc(events.APICall, "streamerror") @@ -137,6 +138,7 @@ func (hw httpWrapper) Request( type ( httpWrapper struct { client *http.Client + config *clientConfig } customTransport struct { diff --git a/src/internal/m365/graph/http_wrapper_test.go b/src/internal/m365/graph/http_wrapper_test.go index 31e5faf52..fb01ef5b4 100644 --- a/src/internal/m365/graph/http_wrapper_test.go +++ b/src/internal/m365/graph/http_wrapper_test.go @@ -7,8 +7,10 @@ import ( "github.com/alcionai/clues" khttp "github.com/microsoft/kiota-http-go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "golang.org/x/net/http2" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" @@ -116,3 +118,70 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_redirectMiddleware() { // require.Equal(t, 1, calledCorrectly, "test server was called with expected path") require.Equal(t, http.StatusOK, resp.StatusCode) } + +func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_http2StreamErrorRetries() { + var ( + url = "https://graph.microsoft.com/fnords/beaux/regard" + streamErr = http2.StreamError{ + StreamID: 1, + Code: http2.ErrCodeEnhanceYourCalm, + Cause: assert.AnError, + } + ) + + table := []struct { + name string + retries int + expectRetries int + }{ + { + name: "zero retries", + retries: 0, + expectRetries: 0, + }, + { + name: "negative max", + retries: -1, + expectRetries: 0, + }, + { + name: "upper limit", + retries: 9001, + expectRetries: 5, + }, + { + name: "four", + retries: 4, + expectRetries: 4, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + // -1 to account for the first try, + // which isn't a retry. + tries := -1 + + mwResp := mwForceResp{ + err: streamErr, + alternate: func(*http.Request) (bool, *http.Response, error) { + tries++ + return false, nil, nil + }, + } + + hw := NewHTTPWrapper( + appendMiddleware(&mwResp), + MaxConnectionRetries(test.retries)) + + _, err := hw.Request(ctx, http.MethodGet, url, nil, nil) + require.ErrorAs(t, err, &http2.StreamError{}, clues.ToCore(err)) + + require.Equal(t, test.expectRetries, tries, "count of retries") + }) + } +} diff --git a/src/internal/m365/graph/mock/service.go b/src/internal/m365/graph/mock/service.go index 813488626..64f6c3d6f 100644 --- a/src/internal/m365/graph/mock/service.go +++ b/src/internal/m365/graph/mock/service.go @@ -33,7 +33,7 @@ func CreateAdapter( return nil, err } - httpClient := graph.KiotaHTTPClient(opts...) + httpClient, _ := graph.KiotaHTTPClient(opts...) // This makes sure that we are able to intercept any requests via // gock. Only necessary for testing. diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index 2f5ae72ee..212314e49 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -120,7 +120,7 @@ func CreateAdapter( return nil, err } - httpClient := KiotaHTTPClient(opts...) + httpClient, cc := KiotaHTTPClient(opts...) adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( auth, @@ -130,7 +130,7 @@ func CreateAdapter( return nil, clues.Stack(err) } - return wrapAdapter(adpt), nil + return wrapAdapter(adpt, cc), nil } func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityAuthenticationProvider, error) { @@ -158,7 +158,7 @@ func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityA // and consume relatively unbound socket connections. It is important // to centralize this client to be passed downstream where api calls // can utilize it on a per-download basis. -func KiotaHTTPClient(opts ...Option) *http.Client { +func KiotaHTTPClient(opts ...Option) (*http.Client, *clientConfig) { var ( clientOptions = msgraphsdkgo.GetDefaultClientOptions() cc = populateConfig(opts...) @@ -170,7 +170,7 @@ func KiotaHTTPClient(opts ...Option) *http.Client { cc.apply(httpClient) - return httpClient + return httpClient, cc } // --------------------------------------------------------------------------- @@ -179,11 +179,14 @@ func KiotaHTTPClient(opts ...Option) *http.Client { type clientConfig struct { noTimeout bool - // MaxRetries before failure + // MaxConnectionRetries is the number of connection-level retries that + // attempt to re-run the request due to a broken or closed connection. + maxConnectionRetries int + // MaxRetries is the number of middleware retires attempted + // before returning with failure maxRetries int // The minimum delay in seconds between retries - minDelay time.Duration - overrideRetryCount bool + minDelay time.Duration appendMiddleware []khttp.Middleware } @@ -193,8 +196,9 @@ type Option func(*clientConfig) // populate constructs a clientConfig according to the provided options. func populateConfig(opts ...Option) *clientConfig { cc := clientConfig{ - maxRetries: defaultMaxRetries, - minDelay: defaultDelay, + maxConnectionRetries: defaultMaxRetries, + maxRetries: defaultMaxRetries, + minDelay: defaultDelay, } for _, opt := range opts { @@ -227,14 +231,25 @@ func NoTimeout() Option { func MaxRetries(max int) Option { return func(c *clientConfig) { - c.overrideRetryCount = true + if max < 0 { + max = 0 + } else if max > 5 { + max = 5 + } + c.maxRetries = max } } -func MinimumBackoff(dur time.Duration) Option { +func MinimumBackoff(min time.Duration) Option { return func(c *clientConfig) { - c.minDelay = dur + if min < 100*time.Millisecond { + min = 100 * time.Millisecond + } else if min > 5*time.Second { + min = 5 * time.Second + } + + c.minDelay = min } } @@ -246,6 +261,18 @@ func appendMiddleware(mw ...khttp.Middleware) Option { } } +func MaxConnectionRetries(max int) Option { + return func(c *clientConfig) { + if max < 0 { + max = 0 + } else if max > 5 { + max = 5 + } + + c.maxConnectionRetries = max + } +} + // --------------------------------------------------------------------------- // Middleware Control // --------------------------------------------------------------------------- @@ -302,10 +329,11 @@ var _ abstractions.RequestAdapter = &adapterWrap{} // 3. Error and debug conditions are logged. type adapterWrap struct { abstractions.RequestAdapter + config *clientConfig } -func wrapAdapter(gra *msgraphsdkgo.GraphRequestAdapter) *adapterWrap { - return &adapterWrap{gra} +func wrapAdapter(gra *msgraphsdkgo.GraphRequestAdapter, cc *clientConfig) *adapterWrap { + return &adapterWrap{gra, cc} } var connectionEnded = filters.Contains([]string{ @@ -331,7 +359,7 @@ func (aw *adapterWrap) Send( // retry wrapper is unsophisticated, but should only // retry in the event of a `stream error`, which is not // a common expectation. - for i := 0; i < 3; i++ { + for i := 0; i < aw.config.maxConnectionRetries+1; i++ { ictx := clues.Add(ctx, "request_retry_iter", i) sp, err = aw.RequestAdapter.Send(ctx, requestInfo, constructor, errorMappings) diff --git a/src/internal/m365/graph/service_test.go b/src/internal/m365/graph/service_test.go index 08d8c6169..fb8e31b7b 100644 --- a/src/internal/m365/graph/service_test.go +++ b/src/internal/m365/graph/service_test.go @@ -67,9 +67,10 @@ func (suite *GraphIntgSuite) TestCreateAdapter() { func (suite *GraphIntgSuite) TestHTTPClient() { table := []struct { - name string - opts []Option - check func(*testing.T, *http.Client) + name string + opts []Option + check func(*testing.T, *http.Client) + checkConfig func(*testing.T, *clientConfig) }{ { name: "no options", @@ -77,23 +78,75 @@ func (suite *GraphIntgSuite) TestHTTPClient() { check: func(t *testing.T, c *http.Client) { assert.Equal(t, defaultHTTPClientTimeout, c.Timeout, "default timeout") }, + checkConfig: func(t *testing.T, c *clientConfig) { + assert.Equal(t, defaultDelay, c.minDelay, "default delay") + assert.Equal(t, defaultMaxRetries, c.maxRetries, "max retries") + assert.Equal(t, defaultMaxRetries, c.maxConnectionRetries, "max connection retries") + }, }, { - name: "no timeout", - opts: []Option{NoTimeout()}, + name: "configured options", + opts: []Option{ + NoTimeout(), + MaxRetries(4), + MaxConnectionRetries(2), + MinimumBackoff(999 * time.Millisecond), + }, check: func(t *testing.T, c *http.Client) { // FIXME: Change to 0 one upstream issue is fixed assert.Equal(t, time.Duration(48*time.Hour), c.Timeout, "unlimited timeout") }, + checkConfig: func(t *testing.T, c *clientConfig) { + assert.Equal(t, 999*time.Millisecond, c.minDelay, "minimum delay") + assert.Equal(t, 4, c.maxRetries, "max retries") + assert.Equal(t, 2, c.maxConnectionRetries, "max connection retries") + }, + }, + { + name: "below minimums", + opts: []Option{ + NoTimeout(), + MaxRetries(-1), + MaxConnectionRetries(-1), + MinimumBackoff(0), + }, + check: func(t *testing.T, c *http.Client) { + // FIXME: Change to 0 one upstream issue is fixed + assert.Equal(t, time.Duration(48*time.Hour), c.Timeout, "unlimited timeout") + }, + checkConfig: func(t *testing.T, c *clientConfig) { + assert.Equal(t, 100*time.Millisecond, c.minDelay, "minimum delay") + assert.Equal(t, 0, c.maxRetries, "max retries") + assert.Equal(t, 0, c.maxConnectionRetries, "max connection retries") + }, + }, + { + name: "above maximums", + opts: []Option{ + NoTimeout(), + MaxRetries(9001), + MaxConnectionRetries(9001), + MinimumBackoff(999 * time.Second), + }, + check: func(t *testing.T, c *http.Client) { + // FIXME: Change to 0 one upstream issue is fixed + assert.Equal(t, time.Duration(48*time.Hour), c.Timeout, "unlimited timeout") + }, + checkConfig: func(t *testing.T, c *clientConfig) { + assert.Equal(t, 5*time.Second, c.minDelay, "minimum delay") + assert.Equal(t, 5, c.maxRetries, "max retries") + assert.Equal(t, 5, c.maxConnectionRetries, "max connection retries") + }, }, } for _, test := range table { suite.Run(test.name, func() { t := suite.T() - cli := KiotaHTTPClient(test.opts...) + cli, cc := KiotaHTTPClient(test.opts...) assert.NotNil(t, cli) test.check(t, cli) + test.checkConfig(t, cc) }) } } @@ -178,12 +231,12 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() { // the query doesn't matter _, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil) require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) - require.Equal(t, 12, count, "number of retries") + require.Equal(t, 16, count, "number of retries") count = 0 // the query doesn't matter _, err = NewService(adpt).Client().Users().Get(ctx, nil) require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) - require.Equal(t, 12, count, "number of retries") + require.Equal(t, 16, count, "number of retries") }