centralize graph api panic and conn retry (#3884)

Adds a middleware to the graph api adapter which
watches all http request responses for both panics and connection reset issues.  Panics are
transformed into standard errors.  Connection
resets get retried up to three times.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🌻 Feature

#### Test Plan

- [x]  Unit test
This commit is contained in:
Keepers 2023-08-03 19:19:16 -06:00 committed by GitHub
parent d46cb1e3b6
commit 99be32686d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 214 additions and 51 deletions

View File

@ -61,7 +61,8 @@ func Recovery(ctx context.Context, r any, namespace string) error {
err = clues.Wrap(err, "panic recovery"+inFile). err = clues.Wrap(err, "panic recovery"+inFile).
WithClues(ctx). WithClues(ctx).
With("stacktrace", string(debug.Stack())) With("stacktrace", string(debug.Stack())).
WithTrace(2)
logger.CtxErr(ctx, err).Error(namespace + " panic") logger.CtxErr(ctx, err).Error(namespace + " panic")
return err return err

View File

@ -1,12 +1,11 @@
package betasdk package betasdk
import ( import (
absser "github.com/microsoft/kiota-abstractions-go" abstractions "github.com/microsoft/kiota-abstractions-go"
kioser "github.com/microsoft/kiota-abstractions-go/serialization" kioser "github.com/microsoft/kiota-abstractions-go/serialization"
kform "github.com/microsoft/kiota-serialization-form-go" kform "github.com/microsoft/kiota-serialization-form-go"
kw "github.com/microsoft/kiota-serialization-json-go" kw "github.com/microsoft/kiota-serialization-json-go"
ktext "github.com/microsoft/kiota-serialization-text-go" ktext "github.com/microsoft/kiota-serialization-text-go"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
i1a3c1a5501c5e41b7fd169f2d4c768dce9b096ac28fb5431bf02afcc57295411 "github.com/alcionai/corso/src/internal/m365/graph/betasdk/sites" i1a3c1a5501c5e41b7fd169f2d4c768dce9b096ac28fb5431bf02afcc57295411 "github.com/alcionai/corso/src/internal/m365/graph/betasdk/sites"
) )
@ -43,34 +42,34 @@ type BetaClient struct {
// Path parameters for the request // Path parameters for the request
pathParameters map[string]string pathParameters map[string]string
// The request adapter to use to execute the requests. // The request adapter to use to execute the requests.
requestAdapter *msgraphsdk.GraphRequestAdapter requestAdapter abstractions.RequestAdapter
// Url template to use to build the URL for the current request builder // Url template to use to build the URL for the current request builder
urlTemplate string urlTemplate string
} }
// NewBetaClient instantiates a new BetaClient and sets the default values. // NewBetaClient instantiates a new BetaClient and sets the default values.
// func NewBetaClient(requestAdapter i2ae4187f7daee263371cb1c977df639813ab50ffa529013b7437480d1ec0158f.RequestAdapter)(*BetaClient) { // func NewBetaClient(requestAdapter i2ae4187f7daee263371cb1c977df639813ab50ffa529013b7437480d1ec0158f.RequestAdapter)(*BetaClient) {
func NewBetaClient(requestAdapter *msgraphsdk.GraphRequestAdapter) *BetaClient { func NewBetaClient(requestAdapter abstractions.RequestAdapter) *BetaClient {
m := &BetaClient{} m := &BetaClient{}
m.pathParameters = make(map[string]string) m.pathParameters = make(map[string]string)
m.urlTemplate = "{+baseurl}" m.urlTemplate = "{+baseurl}"
m.requestAdapter = requestAdapter m.requestAdapter = requestAdapter
absser.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { abstractions.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory {
return kw.NewJsonSerializationWriterFactory() return kw.NewJsonSerializationWriterFactory()
}) })
absser.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { abstractions.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory {
return ktext.NewTextSerializationWriterFactory() return ktext.NewTextSerializationWriterFactory()
}) })
absser.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory { abstractions.RegisterDefaultSerializer(func() kioser.SerializationWriterFactory {
return kform.NewFormSerializationWriterFactory() return kform.NewFormSerializationWriterFactory()
}) })
absser.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { abstractions.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory {
return kw.NewJsonParseNodeFactory() return kw.NewJsonParseNodeFactory()
}) })
absser.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { abstractions.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory {
return ktext.NewTextParseNodeFactory() return ktext.NewTextParseNodeFactory()
}) })
absser.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory { abstractions.RegisterDefaultDeserializer(func() kioser.ParseNodeFactory {
return kform.NewFormParseNodeFactory() return kform.NewFormParseNodeFactory()
}) })
@ -93,6 +92,6 @@ func (m *BetaClient) SitesById(id string) *i1a3c1a5501c5e41b7fd169f2d4c768dce9b0
} }
// Adapter() helper method to export Adapter for iterating // Adapter() helper method to export Adapter for iterating
func (m *BetaClient) Adapter() *msgraphsdk.GraphRequestAdapter { func (m *BetaClient) Adapter() abstractions.RequestAdapter {
return m.requestAdapter return m.requestAdapter
} }

View File

@ -99,15 +99,13 @@ func (hw httpWrapper) Request(
var resp *http.Response var resp *http.Response
i := 0
// stream errors from http/2 will fail before we reach // stream errors from http/2 will fail before we reach
// client middleware handling, therefore we don't get to // client middleware handling, therefore we don't get to
// make use of the retry middleware. This external // make use of the retry middleware. This external
// retry wrapper is unsophisticated, but should only // retry wrapper is unsophisticated, but should only
// retry in the event of a `stream error`, which is not // retry in the event of a `stream error`, which is not
// a common expectation. // a common expectation.
for i < 3 { for i := 0; i < 3; i++ {
ictx := clues.Add(ctx, "request_retry_iter", i) ictx := clues.Add(ctx, "request_retry_iter", i)
resp, err = hw.client.Do(req) resp, err = hw.client.Do(req)
@ -119,11 +117,10 @@ func (hw httpWrapper) Request(
break break
} }
logger.Ctx(ictx).Debug("retrying after stream error") logger.Ctx(ictx).Debug("http2 stream error")
events.Inc(events.APICall, "streamerror") events.Inc(events.APICall, "streamerror")
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
i++
} }
if err != nil { if err != nil {

View File

@ -49,8 +49,11 @@ func (suite *HTTPWrapperIntgSuite) TestNewHTTPWrapper() {
} }
type mwForceResp struct { type mwForceResp struct {
err error err error
resp *http.Response resp *http.Response
// if alternate returns true, the middleware returns the
// response and error returned by the func instead of the
// resp and error saved in the struct.
alternate func(*http.Request) (bool, *http.Response, error) alternate func(*http.Request) (bool, *http.Response, error)
} }
@ -81,14 +84,8 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_redirectMiddleware() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
var ( url := "https://graph.microsoft.com/fnords/beaux/regard"
uri = "https://graph.microsoft.com"
path = "/fnords/beaux/regard"
url = uri + path
)
// can't use gock for this, or else it'll short-circuit the transport,
// and thus skip all the middleware
hdr := http.Header{} hdr := http.Header{}
hdr.Set("Location", "localhost:99999999/smarfs") hdr.Set("Location", "localhost:99999999/smarfs")

View File

@ -1,18 +1,24 @@
package graph package graph
import ( import (
"context"
"net/http" "net/http"
"time" "time"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/alcionai/clues" "github.com/alcionai/clues"
abstractions "github.com/microsoft/kiota-abstractions-go"
"github.com/microsoft/kiota-abstractions-go/serialization" "github.com/microsoft/kiota-abstractions-go/serialization"
kauth "github.com/microsoft/kiota-authentication-azure-go" kauth "github.com/microsoft/kiota-authentication-azure-go"
khttp "github.com/microsoft/kiota-http-go" khttp "github.com/microsoft/kiota-http-go"
msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go" msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
) )
@ -53,7 +59,7 @@ type Servicer interface {
Client() *msgraphsdkgo.GraphServiceClient Client() *msgraphsdkgo.GraphServiceClient
// Adapter() returns GraphRequest adapter used to process large requests, create batches // Adapter() returns GraphRequest adapter used to process large requests, create batches
// and page iterators // and page iterators
Adapter() *msgraphsdkgo.GraphRequestAdapter Adapter() abstractions.RequestAdapter
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -63,18 +69,18 @@ type Servicer interface {
var _ Servicer = &Service{} var _ Servicer = &Service{}
type Service struct { type Service struct {
adapter *msgraphsdkgo.GraphRequestAdapter adapter abstractions.RequestAdapter
client *msgraphsdkgo.GraphServiceClient client *msgraphsdkgo.GraphServiceClient
} }
func NewService(adapter *msgraphsdkgo.GraphRequestAdapter) *Service { func NewService(adapter abstractions.RequestAdapter) *Service {
return &Service{ return &Service{
adapter: adapter, adapter: adapter,
client: msgraphsdkgo.NewGraphServiceClient(adapter), client: msgraphsdkgo.NewGraphServiceClient(adapter),
} }
} }
func (s Service) Adapter() *msgraphsdkgo.GraphRequestAdapter { func (s Service) Adapter() abstractions.RequestAdapter {
return s.adapter return s.adapter
} }
@ -104,11 +110,11 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) {
// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library // CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library
// with Azure identity package. An adapter object is a necessary to component // with Azure identity package. An adapter object is a necessary to component
// to create *msgraphsdk.GraphServiceClient // to create a graph api client connection.
func CreateAdapter( func CreateAdapter(
tenant, client, secret string, tenant, client, secret string,
opts ...Option, opts ...Option,
) (*msgraphsdkgo.GraphRequestAdapter, error) { ) (abstractions.RequestAdapter, error) {
auth, err := GetAuth(tenant, client, secret) auth, err := GetAuth(tenant, client, secret)
if err != nil { if err != nil {
return nil, err return nil, err
@ -116,10 +122,15 @@ func CreateAdapter(
httpClient := KiotaHTTPClient(opts...) httpClient := KiotaHTTPClient(opts...)
return msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient(
auth, auth,
nil, nil, nil, nil,
httpClient) httpClient)
if err != nil {
return nil, clues.Stack(err)
}
return wrapAdapter(adpt), nil
} }
func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityAuthenticationProvider, error) { func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityAuthenticationProvider, error) {
@ -275,3 +286,70 @@ func kiotaMiddlewares(
return mw return mw
} }
// ---------------------------------------------------------------------------
// Graph Api Adapter Wrapper
// ---------------------------------------------------------------------------
var _ abstractions.RequestAdapter = &adapterWrap{}
// adapterWrap takes a GraphRequestAdapter and replaces the Send() function to
// act as a middleware for all http calls. Certain error conditions never reach
// the the client middleware layer, and therefore miss out on logging and retries.
// By hijacking the Send() call, we can ensure three basic needs:
// 1. Panics generated by the graph client are caught instead of crashing corso.
// 2. Http and Http2 connection closures are retried.
// 3. Error and debug conditions are logged.
type adapterWrap struct {
abstractions.RequestAdapter
}
func wrapAdapter(gra *msgraphsdkgo.GraphRequestAdapter) *adapterWrap {
return &adapterWrap{gra}
}
var connectionEnded = filters.Contains([]string{
"connection reset by peer",
"client connection force closed",
})
func (aw *adapterWrap) Send(
ctx context.Context,
requestInfo *abstractions.RequestInformation,
constructor serialization.ParsableFactory,
errorMappings abstractions.ErrorMappings,
) (sp serialization.Parsable, err error) {
defer func() {
if crErr := crash.Recovery(ctx, recover(), "graph adapter request"); crErr != nil {
err = Stack(ctx, crErr)
}
}()
// stream errors from http/2 will fail before we reach
// client middleware handling, therefore we don't get to
// make use of the retry middleware. This external
// retry wrapper is unsophisticated, but should only
// retry in the event of a `stream error`, which is not
// a common expectation.
for i := 0; i < 3; i++ {
ictx := clues.Add(ctx, "request_retry_iter", i)
sp, err = aw.RequestAdapter.Send(ctx, requestInfo, constructor, errorMappings)
if err != nil &&
!(IsErrConnectionReset(err) ||
connectionEnded.Compare(err.Error())) {
return nil, Stack(ictx, err)
}
if err == nil {
break
}
logger.Ctx(ictx).Debug("http connection error")
events.Inc(events.APICall, "connectionerror")
time.Sleep(3 * time.Second)
}
return sp, err
}

View File

@ -2,11 +2,13 @@ package graph
import ( import (
"net/http" "net/http"
"syscall"
"testing" "testing"
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -16,36 +18,54 @@ import (
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
) )
type GraphUnitSuite struct { type GraphIntgSuite struct {
tester.Suite tester.Suite
credentials account.M365Config fakeCredentials account.M365Config
credentials account.M365Config
} }
func TestGraphUnitSuite(t *testing.T) { func TestGraphIntgSuite(t *testing.T) {
suite.Run(t, &GraphUnitSuite{Suite: tester.NewUnitSuite(t)}) suite.Run(t, &GraphIntgSuite{
Suite: tester.NewIntegrationSuite(
t,
[][]string{tconfig.M365AcctCredEnvs}),
})
} }
func (suite *GraphUnitSuite) SetupSuite() { func (suite *GraphIntgSuite) SetupSuite() {
t := suite.T() t := suite.T()
a := tconfig.NewFakeM365Account(t)
m365, err := a.M365Config() ctx, flush := tester.NewContext(t)
defer flush()
fakeAcct := tconfig.NewFakeM365Account(t)
acct := tconfig.NewM365Account(t)
m365, err := fakeAcct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.fakeCredentials = m365
m365, err = acct.M365Config()
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
suite.credentials = m365 suite.credentials = m365
InitializeConcurrencyLimiter(ctx, false, 0)
} }
func (suite *GraphUnitSuite) TestCreateAdapter() { func (suite *GraphIntgSuite) TestCreateAdapter() {
t := suite.T() t := suite.T()
adpt, err := CreateAdapter( adpt, err := CreateAdapter(
suite.credentials.AzureTenantID, suite.fakeCredentials.AzureTenantID,
suite.credentials.AzureClientID, suite.fakeCredentials.AzureClientID,
suite.credentials.AzureClientSecret) suite.fakeCredentials.AzureClientSecret)
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, adpt) assert.NotNil(t, adpt)
} }
func (suite *GraphUnitSuite) TestHTTPClient() { func (suite *GraphIntgSuite) TestHTTPClient() {
table := []struct { table := []struct {
name string name string
opts []Option opts []Option
@ -78,12 +98,12 @@ func (suite *GraphUnitSuite) TestHTTPClient() {
} }
} }
func (suite *GraphUnitSuite) TestSerializationEndPoint() { func (suite *GraphIntgSuite) TestSerializationEndPoint() {
t := suite.T() t := suite.T()
adpt, err := CreateAdapter( adpt, err := CreateAdapter(
suite.credentials.AzureTenantID, suite.fakeCredentials.AzureTenantID,
suite.credentials.AzureClientID, suite.fakeCredentials.AzureClientID,
suite.credentials.AzureClientSecret) suite.fakeCredentials.AzureClientSecret)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
serv := NewService(adpt) serv := NewService(adpt)
@ -96,3 +116,74 @@ func (suite *GraphUnitSuite) TestSerializationEndPoint() {
assert.NotNil(t, byteArray) assert.NotNil(t, byteArray)
t.Log(string(byteArray)) t.Log(string(byteArray))
} }
func (suite *GraphIntgSuite) TestAdapterWrap_catchesPanic() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
url := "https://graph.microsoft.com/fnords/beaux/regard"
// the panics should get caught and returned as errors
alwaysPanicMiddleware := mwForceResp{
alternate: func(req *http.Request) (bool, *http.Response, error) {
panic(clues.New("intentional panic"))
},
}
adpt, err := CreateAdapter(
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret,
appendMiddleware(&alwaysPanicMiddleware))
require.NoError(t, err, clues.ToCore(err))
// the query doesn't matter
_, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil)
require.Error(t, err, clues.ToCore(err))
require.Contains(t, err.Error(), "panic", clues.ToCore(err))
// the query doesn't matter
_, err = NewService(adpt).Client().Users().Get(ctx, nil)
require.Error(t, err, clues.ToCore(err))
require.Contains(t, err.Error(), "panic", clues.ToCore(err))
}
func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
url := "https://graph.microsoft.com/fnords/beaux/regard"
count := 0
// the panics should get caught and returned as errors
alwaysECONNRESET := mwForceResp{
err: syscall.ECONNRESET,
alternate: func(req *http.Request) (bool, *http.Response, error) {
count++
return false, nil, nil
},
}
adpt, err := CreateAdapter(
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret,
appendMiddleware(&alwaysECONNRESET))
require.NoError(t, err, clues.ToCore(err))
// the query doesn't matter
_, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil)
require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err))
require.Equal(t, 12, count, "number of retries")
count = 0
// the query doesn't matter
_, err = NewService(adpt).Client().Users().Get(ctx, nil)
require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err))
require.Equal(t, 12, count, "number of retries")
}

View File

@ -2,8 +2,8 @@ package api
import ( import (
"github.com/alcionai/clues" "github.com/alcionai/clues"
abstractions "github.com/microsoft/kiota-abstractions-go"
"github.com/microsoft/kiota-abstractions-go/serialization" "github.com/microsoft/kiota-abstractions-go/serialization"
msgraphsdkgo "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/alcionai/corso/src/internal/m365/graph/betasdk" "github.com/alcionai/corso/src/internal/m365/graph/betasdk"
) )
@ -19,7 +19,7 @@ func (s BetaService) Client() *betasdk.BetaClient {
return s.client return s.client
} }
func NewBetaService(adpt *msgraphsdkgo.GraphRequestAdapter) *BetaService { func NewBetaService(adpt abstractions.RequestAdapter) *BetaService {
return &BetaService{ return &BetaService{
client: betasdk.NewBetaClient(adpt), client: betasdk.NewBetaClient(adpt),
} }