From 99be32686d3af05ec0df88e1b78c6ed443f3384c Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 3 Aug 2023 19:19:16 -0600 Subject: [PATCH] centralize graph api panic and conn retry (#3884) Adds a middleware to the graph api adapter which watches all http request responses for both panics and connection reset issues. Panics are transformed into standard errors. Connection resets get retried up to three times. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature #### Test Plan - [x] :zap: Unit test --- src/internal/common/crash/crash.go | 3 +- .../m365/graph/betasdk/beta_client.go | 21 ++- src/internal/m365/graph/http_wrapper.go | 7 +- src/internal/m365/graph/http_wrapper_test.go | 15 +-- src/internal/m365/graph/service.go | 92 ++++++++++++- src/internal/m365/graph/service_test.go | 123 +++++++++++++++--- .../m365/sharepoint/api/beta_service.go | 4 +- 7 files changed, 214 insertions(+), 51 deletions(-) diff --git a/src/internal/common/crash/crash.go b/src/internal/common/crash/crash.go index bc1c53159..38afb1982 100644 --- a/src/internal/common/crash/crash.go +++ b/src/internal/common/crash/crash.go @@ -61,7 +61,8 @@ func Recovery(ctx context.Context, r any, namespace string) error { err = clues.Wrap(err, "panic recovery"+inFile). WithClues(ctx). - With("stacktrace", string(debug.Stack())) + With("stacktrace", string(debug.Stack())). + WithTrace(2) logger.CtxErr(ctx, err).Error(namespace + " panic") return err diff --git a/src/internal/m365/graph/betasdk/beta_client.go b/src/internal/m365/graph/betasdk/beta_client.go index 62224721b..cf5c5607b 100644 --- a/src/internal/m365/graph/betasdk/beta_client.go +++ b/src/internal/m365/graph/betasdk/beta_client.go @@ -1,12 +1,11 @@ package betasdk import ( - absser "github.com/microsoft/kiota-abstractions-go" + abstractions "github.com/microsoft/kiota-abstractions-go" kioser "github.com/microsoft/kiota-abstractions-go/serialization" kform "github.com/microsoft/kiota-serialization-form-go" kw "github.com/microsoft/kiota-serialization-json-go" ktext "github.com/microsoft/kiota-serialization-text-go" - msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" i1a3c1a5501c5e41b7fd169f2d4c768dce9b096ac28fb5431bf02afcc57295411 "github.com/alcionai/corso/src/internal/m365/graph/betasdk/sites" ) @@ -43,34 +42,34 @@ type BetaClient struct { // Path parameters for the request pathParameters map[string]string // The request adapter to use to execute the requests. - requestAdapter *msgraphsdk.GraphRequestAdapter + requestAdapter abstractions.RequestAdapter // Url template to use to build the URL for the current request builder urlTemplate string } // NewBetaClient instantiates a new BetaClient and sets the default values. // func NewBetaClient(requestAdapter i2ae4187f7daee263371cb1c977df639813ab50ffa529013b7437480d1ec0158f.RequestAdapter)(*BetaClient) { -func NewBetaClient(requestAdapter *msgraphsdk.GraphRequestAdapter) *BetaClient { +func NewBetaClient(requestAdapter abstractions.RequestAdapter) *BetaClient { m := &BetaClient{} m.pathParameters = make(map[string]string) m.urlTemplate = "{+baseurl}" m.requestAdapter = requestAdapter - absser.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { + abstractions.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { return kw.NewJsonSerializationWriterFactory() }) - absser.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { + abstractions.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { return ktext.NewTextSerializationWriterFactory() }) - absser.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { + abstractions.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { return kform.NewFormSerializationWriterFactory() }) - absser.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { + abstractions.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { return kw.NewJsonParseNodeFactory() }) - absser.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { + abstractions.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { return ktext.NewTextParseNodeFactory() }) - absser.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { + abstractions.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { return kform.NewFormParseNodeFactory() }) @@ -93,6 +92,6 @@ func (m *BetaClient) SitesById(id string) *i1a3c1a5501c5e41b7fd169f2d4c768dce9b0 } // Adapter() helper method to export Adapter for iterating -func (m *BetaClient) Adapter() *msgraphsdk.GraphRequestAdapter { +func (m *BetaClient) Adapter() abstractions.RequestAdapter { return m.requestAdapter } diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index 8a55d3ca6..23e5566e1 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -99,15 +99,13 @@ func (hw httpWrapper) Request( var resp *http.Response - i := 0 - // stream errors from http/2 will fail before we reach // client middleware handling, therefore we don't get to // make use of the retry middleware. This external // retry wrapper is unsophisticated, but should only // retry in the event of a `stream error`, which is not // a common expectation. - for i < 3 { + for i := 0; i < 3; i++ { ictx := clues.Add(ctx, "request_retry_iter", i) resp, err = hw.client.Do(req) @@ -119,11 +117,10 @@ func (hw httpWrapper) Request( break } - logger.Ctx(ictx).Debug("retrying after stream error") + logger.Ctx(ictx).Debug("http2 stream error") events.Inc(events.APICall, "streamerror") time.Sleep(3 * time.Second) - i++ } if err != nil { diff --git a/src/internal/m365/graph/http_wrapper_test.go b/src/internal/m365/graph/http_wrapper_test.go index 19711edc4..31e5faf52 100644 --- a/src/internal/m365/graph/http_wrapper_test.go +++ b/src/internal/m365/graph/http_wrapper_test.go @@ -49,8 +49,11 @@ func (suite *HTTPWrapperIntgSuite) TestNewHTTPWrapper() { } type mwForceResp struct { - err error - resp *http.Response + err error + resp *http.Response + // if alternate returns true, the middleware returns the + // response and error returned by the func instead of the + // resp and error saved in the struct. alternate func(*http.Request) (bool, *http.Response, error) } @@ -81,14 +84,8 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_redirectMiddleware() { ctx, flush := tester.NewContext(t) defer flush() - var ( - uri = "https://graph.microsoft.com" - path = "/fnords/beaux/regard" - url = uri + path - ) + url := "https://graph.microsoft.com/fnords/beaux/regard" - // can't use gock for this, or else it'll short-circuit the transport, - // and thus skip all the middleware hdr := http.Header{} hdr.Set("Location", "localhost:99999999/smarfs") diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index 04076f0e0..2f5ae72ee 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -1,18 +1,24 @@ package graph import ( + "context" "net/http" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/alcionai/clues" + abstractions "github.com/microsoft/kiota-abstractions-go" "github.com/microsoft/kiota-abstractions-go/serialization" kauth "github.com/microsoft/kiota-authentication-azure-go" khttp "github.com/microsoft/kiota-http-go" msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" + "github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/idname" + "github.com/alcionai/corso/src/internal/events" + "github.com/alcionai/corso/src/pkg/filters" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -53,7 +59,7 @@ type Servicer interface { Client() *msgraphsdkgo.GraphServiceClient // Adapter() returns GraphRequest adapter used to process large requests, create batches // and page iterators - Adapter() *msgraphsdkgo.GraphRequestAdapter + Adapter() abstractions.RequestAdapter } // --------------------------------------------------------------------------- @@ -63,18 +69,18 @@ type Servicer interface { var _ Servicer = &Service{} type Service struct { - adapter *msgraphsdkgo.GraphRequestAdapter + adapter abstractions.RequestAdapter client *msgraphsdkgo.GraphServiceClient } -func NewService(adapter *msgraphsdkgo.GraphRequestAdapter) *Service { +func NewService(adapter abstractions.RequestAdapter) *Service { return &Service{ adapter: adapter, client: msgraphsdkgo.NewGraphServiceClient(adapter), } } -func (s Service) Adapter() *msgraphsdkgo.GraphRequestAdapter { +func (s Service) Adapter() abstractions.RequestAdapter { return s.adapter } @@ -104,11 +110,11 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) { // CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library // with Azure identity package. An adapter object is a necessary to component -// to create *msgraphsdk.GraphServiceClient +// to create a graph api client connection. func CreateAdapter( tenant, client, secret string, opts ...Option, -) (*msgraphsdkgo.GraphRequestAdapter, error) { +) (abstractions.RequestAdapter, error) { auth, err := GetAuth(tenant, client, secret) if err != nil { return nil, err @@ -116,10 +122,15 @@ func CreateAdapter( httpClient := KiotaHTTPClient(opts...) - return msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( + adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( auth, nil, nil, httpClient) + if err != nil { + return nil, clues.Stack(err) + } + + return wrapAdapter(adpt), nil } func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityAuthenticationProvider, error) { @@ -275,3 +286,70 @@ func kiotaMiddlewares( return mw } + +// --------------------------------------------------------------------------- +// Graph Api Adapter Wrapper +// --------------------------------------------------------------------------- + +var _ abstractions.RequestAdapter = &adapterWrap{} + +// adapterWrap takes a GraphRequestAdapter and replaces the Send() function to +// act as a middleware for all http calls. Certain error conditions never reach +// the the client middleware layer, and therefore miss out on logging and retries. +// By hijacking the Send() call, we can ensure three basic needs: +// 1. Panics generated by the graph client are caught instead of crashing corso. +// 2. Http and Http2 connection closures are retried. +// 3. Error and debug conditions are logged. +type adapterWrap struct { + abstractions.RequestAdapter +} + +func wrapAdapter(gra *msgraphsdkgo.GraphRequestAdapter) *adapterWrap { + return &adapterWrap{gra} +} + +var connectionEnded = filters.Contains([]string{ + "connection reset by peer", + "client connection force closed", +}) + +func (aw *adapterWrap) Send( + ctx context.Context, + requestInfo *abstractions.RequestInformation, + constructor serialization.ParsableFactory, + errorMappings abstractions.ErrorMappings, +) (sp serialization.Parsable, err error) { + defer func() { + if crErr := crash.Recovery(ctx, recover(), "graph adapter request"); crErr != nil { + err = Stack(ctx, crErr) + } + }() + + // stream errors from http/2 will fail before we reach + // client middleware handling, therefore we don't get to + // make use of the retry middleware. This external + // retry wrapper is unsophisticated, but should only + // retry in the event of a `stream error`, which is not + // a common expectation. + for i := 0; i < 3; i++ { + ictx := clues.Add(ctx, "request_retry_iter", i) + + sp, err = aw.RequestAdapter.Send(ctx, requestInfo, constructor, errorMappings) + if err != nil && + !(IsErrConnectionReset(err) || + connectionEnded.Compare(err.Error())) { + return nil, Stack(ictx, err) + } + + if err == nil { + break + } + + logger.Ctx(ictx).Debug("http connection error") + events.Inc(events.APICall, "connectionerror") + + time.Sleep(3 * time.Second) + } + + return sp, err +} diff --git a/src/internal/m365/graph/service_test.go b/src/internal/m365/graph/service_test.go index 6a97f4913..08d8c6169 100644 --- a/src/internal/m365/graph/service_test.go +++ b/src/internal/m365/graph/service_test.go @@ -2,11 +2,13 @@ package graph import ( "net/http" + "syscall" "testing" "time" "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/microsoftgraph/msgraph-sdk-go/users" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -16,36 +18,54 @@ import ( "github.com/alcionai/corso/src/pkg/account" ) -type GraphUnitSuite struct { +type GraphIntgSuite struct { tester.Suite - credentials account.M365Config + fakeCredentials account.M365Config + credentials account.M365Config } -func TestGraphUnitSuite(t *testing.T) { - suite.Run(t, &GraphUnitSuite{Suite: tester.NewUnitSuite(t)}) +func TestGraphIntgSuite(t *testing.T) { + suite.Run(t, &GraphIntgSuite{ + Suite: tester.NewIntegrationSuite( + t, + [][]string{tconfig.M365AcctCredEnvs}), + }) } -func (suite *GraphUnitSuite) SetupSuite() { +func (suite *GraphIntgSuite) SetupSuite() { t := suite.T() - a := tconfig.NewFakeM365Account(t) - m365, err := a.M365Config() + + ctx, flush := tester.NewContext(t) + defer flush() + + fakeAcct := tconfig.NewFakeM365Account(t) + acct := tconfig.NewM365Account(t) + + m365, err := fakeAcct.M365Config() + require.NoError(t, err, clues.ToCore(err)) + + suite.fakeCredentials = m365 + + m365, err = acct.M365Config() require.NoError(t, err, clues.ToCore(err)) suite.credentials = m365 + + InitializeConcurrencyLimiter(ctx, false, 0) } -func (suite *GraphUnitSuite) TestCreateAdapter() { +func (suite *GraphIntgSuite) TestCreateAdapter() { t := suite.T() adpt, err := CreateAdapter( - suite.credentials.AzureTenantID, - suite.credentials.AzureClientID, - suite.credentials.AzureClientSecret) + suite.fakeCredentials.AzureTenantID, + suite.fakeCredentials.AzureClientID, + suite.fakeCredentials.AzureClientSecret) assert.NoError(t, err, clues.ToCore(err)) assert.NotNil(t, adpt) } -func (suite *GraphUnitSuite) TestHTTPClient() { +func (suite *GraphIntgSuite) TestHTTPClient() { table := []struct { name string opts []Option @@ -78,12 +98,12 @@ func (suite *GraphUnitSuite) TestHTTPClient() { } } -func (suite *GraphUnitSuite) TestSerializationEndPoint() { +func (suite *GraphIntgSuite) TestSerializationEndPoint() { t := suite.T() adpt, err := CreateAdapter( - suite.credentials.AzureTenantID, - suite.credentials.AzureClientID, - suite.credentials.AzureClientSecret) + suite.fakeCredentials.AzureTenantID, + suite.fakeCredentials.AzureClientID, + suite.fakeCredentials.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) serv := NewService(adpt) @@ -96,3 +116,74 @@ func (suite *GraphUnitSuite) TestSerializationEndPoint() { assert.NotNil(t, byteArray) t.Log(string(byteArray)) } + +func (suite *GraphIntgSuite) TestAdapterWrap_catchesPanic() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + url := "https://graph.microsoft.com/fnords/beaux/regard" + + // the panics should get caught and returned as errors + alwaysPanicMiddleware := mwForceResp{ + alternate: func(req *http.Request) (bool, *http.Response, error) { + panic(clues.New("intentional panic")) + }, + } + + adpt, err := CreateAdapter( + suite.credentials.AzureTenantID, + suite.credentials.AzureClientID, + suite.credentials.AzureClientSecret, + appendMiddleware(&alwaysPanicMiddleware)) + require.NoError(t, err, clues.ToCore(err)) + + // the query doesn't matter + _, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil) + require.Error(t, err, clues.ToCore(err)) + require.Contains(t, err.Error(), "panic", clues.ToCore(err)) + + // the query doesn't matter + _, err = NewService(adpt).Client().Users().Get(ctx, nil) + require.Error(t, err, clues.ToCore(err)) + require.Contains(t, err.Error(), "panic", clues.ToCore(err)) +} + +func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + url := "https://graph.microsoft.com/fnords/beaux/regard" + count := 0 + + // the panics should get caught and returned as errors + alwaysECONNRESET := mwForceResp{ + err: syscall.ECONNRESET, + alternate: func(req *http.Request) (bool, *http.Response, error) { + count++ + return false, nil, nil + }, + } + + adpt, err := CreateAdapter( + suite.credentials.AzureTenantID, + suite.credentials.AzureClientID, + suite.credentials.AzureClientSecret, + appendMiddleware(&alwaysECONNRESET)) + require.NoError(t, err, clues.ToCore(err)) + + // the query doesn't matter + _, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil) + require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) + require.Equal(t, 12, count, "number of retries") + + count = 0 + + // the query doesn't matter + _, err = NewService(adpt).Client().Users().Get(ctx, nil) + require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) + require.Equal(t, 12, count, "number of retries") +} diff --git a/src/internal/m365/sharepoint/api/beta_service.go b/src/internal/m365/sharepoint/api/beta_service.go index b184002a1..ce1ad1d90 100644 --- a/src/internal/m365/sharepoint/api/beta_service.go +++ b/src/internal/m365/sharepoint/api/beta_service.go @@ -2,8 +2,8 @@ package api import ( "github.com/alcionai/clues" + abstractions "github.com/microsoft/kiota-abstractions-go" "github.com/microsoft/kiota-abstractions-go/serialization" - msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go" "github.com/alcionai/corso/src/internal/m365/graph/betasdk" ) @@ -19,7 +19,7 @@ func (s BetaService) Client() *betasdk.BetaClient { return s.client } -func NewBetaService(adpt *msgraphsdkgo.GraphRequestAdapter) *BetaService { +func NewBetaService(adpt abstractions.RequestAdapter) *BetaService { return &BetaService{ client: betasdk.NewBetaClient(adpt), }