From 328d75945b00c51805434852156e92e411f4e4a0 Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 23 Mar 2023 18:40:08 -0600 Subject: [PATCH] PoC for centralized throttling control (#2887) PoC for using a centralized middleware to control 1k-per-minute get requests to graph api. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :robot: Supportability/Tests #### Test Plan - [x] :muscle: Manual --- src/cmd/sanity_test/sanity_tests.go | 3 +- src/go.mod | 1 + src/go.sum | 2 + src/internal/connector/discovery/api/api.go | 3 +- .../discovery/api/beta_service_test.go | 3 +- .../connector/exchange/restore_test.go | 5 +- .../connector/exchange/service_functions.go | 3 +- .../graph/betasdk/beta_client_test.go | 3 +- src/internal/connector/graph/service.go | 53 +++++++++++++++++-- src/internal/connector/graph_connector.go | 3 +- .../connector/onedrive/api/drive_test.go | 5 +- .../connector/onedrive/service_test.go | 3 +- .../connector/sharepoint/api/helper_test.go | 3 +- .../connector/sharepoint/data_collections.go | 5 +- .../connector/sharepoint/helper_test.go | 3 +- src/internal/connector/sharepoint/restore.go | 5 +- src/internal/operations/restore_test.go | 8 ++- 17 files changed, 85 insertions(+), 26 deletions(-) diff --git a/src/cmd/sanity_test/sanity_tests.go b/src/cmd/sanity_test/sanity_tests.go index d091b0b5f..65fbb9307 100644 --- a/src/cmd/sanity_test/sanity_tests.go +++ b/src/cmd/sanity_test/sanity_tests.go @@ -19,8 +19,7 @@ func main() { adapter, err := graph.CreateAdapter( os.Getenv("AZURE_TENANT_ID"), os.Getenv("AZURE_CLIENT_ID"), - os.Getenv("AZURE_CLIENT_SECRET"), - ) + os.Getenv("AZURE_CLIENT_SECRET")) if err != nil { fmt.Println("error while creating adapter: ", err) os.Exit(1) diff --git a/src/go.mod b/src/go.mod index a2fd5fa96..ba47354a1 100644 --- a/src/go.mod +++ b/src/go.mod @@ -29,6 +29,7 @@ require ( github.com/vbauerster/mpb/v8 v8.1.6 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb + golang.org/x/time v0.1.0 golang.org/x/tools v0.7.0 gopkg.in/resty.v1 v1.12.0 ) diff --git a/src/go.sum b/src/go.sum index e8f661fa7..ee51ec28f 100644 --- a/src/go.sum +++ b/src/go.sum @@ -630,6 +630,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA= +golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/src/internal/connector/discovery/api/api.go b/src/internal/connector/discovery/api/api.go index 20fc1b25d..6e623c4d2 100644 --- a/src/internal/connector/discovery/api/api.go +++ b/src/internal/connector/discovery/api/api.go @@ -47,8 +47,7 @@ func newService(creds account.M365Config) (*graph.Service, error) { adapter, err := graph.CreateAdapter( creds.AzureTenantID, creds.AzureClientID, - creds.AzureClientSecret, - ) + creds.AzureClientSecret) if err != nil { return nil, errors.Wrap(err, "generating graph api service client") } diff --git a/src/internal/connector/discovery/api/beta_service_test.go b/src/internal/connector/discovery/api/beta_service_test.go index 2a1dabd19..90ffd1972 100644 --- a/src/internal/connector/discovery/api/beta_service_test.go +++ b/src/internal/connector/discovery/api/beta_service_test.go @@ -30,8 +30,7 @@ func (suite *BetaUnitSuite) TestBetaService_Adapter() { adpt, err := graph.CreateAdapter( m365.AzureTenantID, m365.AzureClientID, - m365.AzureClientSecret, - ) + m365.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) service := NewBetaService(adpt) diff --git a/src/internal/connector/exchange/restore_test.go b/src/internal/connector/exchange/restore_test.go index 7f07be1e2..cd8448c94 100644 --- a/src/internal/connector/exchange/restore_test.go +++ b/src/internal/connector/exchange/restore_test.go @@ -49,7 +49,10 @@ func (suite *ExchangeRestoreSuite) SetupSuite() { suite.ac, err = api.NewClient(m365) require.NoError(t, err, clues.ToCore(err)) - adpt, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret) + adpt, err := graph.CreateAdapter( + m365.AzureTenantID, + m365.AzureClientID, + m365.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) suite.gs = graph.NewService(adpt) diff --git a/src/internal/connector/exchange/service_functions.go b/src/internal/connector/exchange/service_functions.go index d976607af..62b1058ff 100644 --- a/src/internal/connector/exchange/service_functions.go +++ b/src/internal/connector/exchange/service_functions.go @@ -21,8 +21,7 @@ func createService(credentials account.M365Config) (*graph.Service, error) { adapter, err := graph.CreateAdapter( credentials.AzureTenantID, credentials.AzureClientID, - credentials.AzureClientSecret, - ) + credentials.AzureClientSecret) if err != nil { return nil, errors.Wrap(err, "creating microsoft graph service for exchange") } diff --git a/src/internal/connector/graph/betasdk/beta_client_test.go b/src/internal/connector/graph/betasdk/beta_client_test.go index cd1dc3dda..6f03a7eff 100644 --- a/src/internal/connector/graph/betasdk/beta_client_test.go +++ b/src/internal/connector/graph/betasdk/beta_client_test.go @@ -38,8 +38,7 @@ func (suite *BetaClientSuite) TestCreateBetaClient() { adpt, err := graph.CreateAdapter( suite.credentials.AzureTenantID, suite.credentials.AzureClientID, - suite.credentials.AzureClientSecret, - ) + suite.credentials.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go index 8b7d39c3f..8db4670f8 100644 --- a/src/internal/connector/graph/service.go +++ b/src/internal/connector/graph/service.go @@ -17,6 +17,7 @@ import ( msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" "github.com/pkg/errors" + "golang.org/x/time/rate" "github.com/alcionai/clues" "github.com/alcionai/corso/src/pkg/account" @@ -167,7 +168,10 @@ func MinimumBackoff(dur time.Duration) option { // CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library // with Azure identity package. An adapter object is a necessary to component // to create *msgraphsdk.GraphServiceClient -func CreateAdapter(tenant, client, secret string, opts ...option) (*msgraphsdk.GraphRequestAdapter, error) { +func CreateAdapter( + tenant, client, secret string, + opts ...option, +) (*msgraphsdk.GraphRequestAdapter, error) { // Client Provider: Uses Secret for access to tenant-level data cred, err := azidentity.NewClientSecretCredential(tenant, client, secret, nil) if err != nil { @@ -224,12 +228,15 @@ func GetMiddlewares(maxRetry int, delay time.Duration) []khttp.Middleware { khttp.NewParametersNameDecodingHandler(), khttp.NewUserAgentHandler(), &LoggingMiddleware{}, + &ThrottleControlMiddleware{}, } } // GetKiotaMiddlewares creates a default slice of middleware for the Graph Client. -func GetKiotaMiddlewares(options *msgraphgocore.GraphClientOptions, - maxRetry int, minDelay time.Duration, +func GetKiotaMiddlewares( + options *msgraphgocore.GraphClientOptions, + maxRetry int, + minDelay time.Duration, ) []khttp.Middleware { kiotaMiddlewares := GetMiddlewares(maxRetry, minDelay) graphMiddlewares := []khttp.Middleware{ @@ -368,3 +375,43 @@ func (middleware RetryHandler) Intercept( return response, nil } + +// We're trying to keep calls below the 10k-per-10-minute threshold. +// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes, +// which is a bit below the mark. +// But suppose we have a minute-long dry spell followed by a 10 minute tsunami. +// We'll have built up 900 tokens in reserve, so the first 900 calls go through +// immediately. Over the next 10 minutes, we'll partition out the other calls +// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if +// the volume keeps up after that, we'll always stay between 9000 and 9900 out +// of 10k. +const ( + perSecond = 15 + maxCap = 900 +) + +// Single, global rate limiter at this time. Refinements for method (creates, +// versus reads) or service can come later. +var limiter = rate.NewLimiter(perSecond, maxCap) + +// QueueRequest will allow the request to occur immediately if we're under the +// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until +// the next token set is available. +func QueueRequest(ctx context.Context) { + if err := limiter.Wait(ctx); err != nil { + logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") + } +} + +// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min +// request limits. +type ThrottleControlMiddleware struct{} + +func (handler *ThrottleControlMiddleware) Intercept( + pipeline khttp.Pipeline, + middlewareIndex int, + req *http.Request, +) (*http.Response, error) { + QueueRequest(req.Context()) + return pipeline.Next(req, middlewareIndex) +} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index c1b3f6529..d20d0fabd 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -105,8 +105,7 @@ func (gc *GraphConnector) createService() (*graph.Service, error) { adapter, err := graph.CreateAdapter( gc.credentials.AzureTenantID, gc.credentials.AzureClientID, - gc.credentials.AzureClientSecret, - ) + gc.credentials.AzureClientSecret) if err != nil { return &graph.Service{}, err } diff --git a/src/internal/connector/onedrive/api/drive_test.go b/src/internal/connector/onedrive/api/drive_test.go index ec4406a35..03d50b643 100644 --- a/src/internal/connector/onedrive/api/drive_test.go +++ b/src/internal/connector/onedrive/api/drive_test.go @@ -27,7 +27,10 @@ func (suite *OneDriveAPISuite) SetupSuite() { require.NoError(t, err, clues.ToCore(err)) suite.creds = m365 - adpt, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret) + adpt, err := graph.CreateAdapter( + m365.AzureTenantID, + m365.AzureClientID, + m365.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) suite.service = graph.NewService(adpt) diff --git a/src/internal/connector/onedrive/service_test.go b/src/internal/connector/onedrive/service_test.go index 352ca5878..d2bffa258 100644 --- a/src/internal/connector/onedrive/service_test.go +++ b/src/internal/connector/onedrive/service_test.go @@ -43,8 +43,7 @@ func NewOneDriveService(credentials account.M365Config) (*oneDriveService, error adapter, err := graph.CreateAdapter( credentials.AzureTenantID, credentials.AzureClientID, - credentials.AzureClientSecret, - ) + credentials.AzureClientSecret) if err != nil { return nil, err } diff --git a/src/internal/connector/sharepoint/api/helper_test.go b/src/internal/connector/sharepoint/api/helper_test.go index 442d267d9..974d3cc43 100644 --- a/src/internal/connector/sharepoint/api/helper_test.go +++ b/src/internal/connector/sharepoint/api/helper_test.go @@ -15,8 +15,7 @@ func createTestBetaService(t *testing.T, credentials account.M365Config) *discov adapter, err := graph.CreateAdapter( credentials.AzureTenantID, credentials.AzureClientID, - credentials.AzureClientSecret, - ) + credentials.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) return discover.NewBetaService(adapter) diff --git a/src/internal/connector/sharepoint/data_collections.go b/src/internal/connector/sharepoint/data_collections.go index 9ee099c37..bc72f9e57 100644 --- a/src/internal/connector/sharepoint/data_collections.go +++ b/src/internal/connector/sharepoint/data_collections.go @@ -236,7 +236,10 @@ func collectPages( // make the betaClient // Need to receive From DataCollection Call - adpt, err := graph.CreateAdapter(creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret) + adpt, err := graph.CreateAdapter( + creds.AzureTenantID, + creds.AzureClientID, + creds.AzureClientSecret) if err != nil { return nil, clues.Wrap(err, "creating azure client adapter") } diff --git a/src/internal/connector/sharepoint/helper_test.go b/src/internal/connector/sharepoint/helper_test.go index 8e0f645ce..6b1e8eaa5 100644 --- a/src/internal/connector/sharepoint/helper_test.go +++ b/src/internal/connector/sharepoint/helper_test.go @@ -51,8 +51,7 @@ func createTestService(t *testing.T, credentials account.M365Config) *graph.Serv adapter, err := graph.CreateAdapter( credentials.AzureTenantID, credentials.AzureClientID, - credentials.AzureClientSecret, - ) + credentials.AzureClientSecret) require.NoError(t, err, "creating microsoft graph service for exchange", clues.ToCore(err)) return graph.NewService(adapter) diff --git a/src/internal/connector/sharepoint/restore.go b/src/internal/connector/sharepoint/restore.go index 44bb31302..5502b57f3 100644 --- a/src/internal/connector/sharepoint/restore.go +++ b/src/internal/connector/sharepoint/restore.go @@ -304,7 +304,10 @@ func RestorePageCollection( defer end() - adpt, err := graph.CreateAdapter(creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret) + adpt, err := graph.CreateAdapter( + creds.AzureTenantID, + creds.AzureClientID, + creds.AzureClientSecret) if err != nil { return metrics, clues.Wrap(err, "constructing graph client") } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 27533fe78..1e7cec997 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -345,12 +345,18 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { return bsel.Selector }, cleanup: func(t *testing.T, dest string) { + ctx, flush := tester.NewContext() + defer flush() + act := tester.NewM365Account(t) m365, err := act.M365Config() require.NoError(t, err, clues.ToCore(err)) - adpt, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret) + adpt, err := graph.CreateAdapter( + m365.AzureTenantID, + m365.AzureClientID, + m365.AzureClientSecret) require.NoError(t, err, clues.ToCore(err)) service := graph.NewService(adpt)