PoC for centralized throttling control (#2887)
PoC for using a centralized middleware to control 1k-per-minute get requests to graph api. --- #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🤖 Supportability/Tests #### Test Plan - [x] 💪 Manual
This commit is contained in:
parent
b82c994b91
commit
328d75945b
@ -19,8 +19,7 @@ func main() {
|
||||
adapter, err := graph.CreateAdapter(
|
||||
os.Getenv("AZURE_TENANT_ID"),
|
||||
os.Getenv("AZURE_CLIENT_ID"),
|
||||
os.Getenv("AZURE_CLIENT_SECRET"),
|
||||
)
|
||||
os.Getenv("AZURE_CLIENT_SECRET"))
|
||||
if err != nil {
|
||||
fmt.Println("error while creating adapter: ", err)
|
||||
os.Exit(1)
|
||||
|
||||
@ -29,6 +29,7 @@ require (
|
||||
github.com/vbauerster/mpb/v8 v8.1.6
|
||||
go.uber.org/zap v1.24.0
|
||||
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb
|
||||
golang.org/x/time v0.1.0
|
||||
golang.org/x/tools v0.7.0
|
||||
gopkg.in/resty.v1 v1.12.0
|
||||
)
|
||||
|
||||
@ -630,6 +630,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
|
||||
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
|
||||
@ -47,8 +47,7 @@ func newService(creds account.M365Config) (*graph.Service, error) {
|
||||
adapter, err := graph.CreateAdapter(
|
||||
creds.AzureTenantID,
|
||||
creds.AzureClientID,
|
||||
creds.AzureClientSecret,
|
||||
)
|
||||
creds.AzureClientSecret)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "generating graph api service client")
|
||||
}
|
||||
|
||||
@ -30,8 +30,7 @@ func (suite *BetaUnitSuite) TestBetaService_Adapter() {
|
||||
adpt, err := graph.CreateAdapter(
|
||||
m365.AzureTenantID,
|
||||
m365.AzureClientID,
|
||||
m365.AzureClientSecret,
|
||||
)
|
||||
m365.AzureClientSecret)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
service := NewBetaService(adpt)
|
||||
|
||||
@ -49,7 +49,10 @@ func (suite *ExchangeRestoreSuite) SetupSuite() {
|
||||
suite.ac, err = api.NewClient(m365)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
adpt, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret)
|
||||
adpt, err := graph.CreateAdapter(
|
||||
m365.AzureTenantID,
|
||||
m365.AzureClientID,
|
||||
m365.AzureClientSecret)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
suite.gs = graph.NewService(adpt)
|
||||
|
||||
@ -21,8 +21,7 @@ func createService(credentials account.M365Config) (*graph.Service, error) {
|
||||
adapter, err := graph.CreateAdapter(
|
||||
credentials.AzureTenantID,
|
||||
credentials.AzureClientID,
|
||||
credentials.AzureClientSecret,
|
||||
)
|
||||
credentials.AzureClientSecret)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "creating microsoft graph service for exchange")
|
||||
}
|
||||
|
||||
@ -38,8 +38,7 @@ func (suite *BetaClientSuite) TestCreateBetaClient() {
|
||||
adpt, err := graph.CreateAdapter(
|
||||
suite.credentials.AzureTenantID,
|
||||
suite.credentials.AzureClientID,
|
||||
suite.credentials.AzureClientSecret,
|
||||
)
|
||||
suite.credentials.AzureClientSecret)
|
||||
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
||||
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
@ -167,7 +168,10 @@ func MinimumBackoff(dur time.Duration) option {
|
||||
// CreateAdapter uses provided credentials to log into M365 using Kiota Azure Library
|
||||
// with Azure identity package. An adapter object is a necessary to component
|
||||
// to create *msgraphsdk.GraphServiceClient
|
||||
func CreateAdapter(tenant, client, secret string, opts ...option) (*msgraphsdk.GraphRequestAdapter, error) {
|
||||
func CreateAdapter(
|
||||
tenant, client, secret string,
|
||||
opts ...option,
|
||||
) (*msgraphsdk.GraphRequestAdapter, error) {
|
||||
// Client Provider: Uses Secret for access to tenant-level data
|
||||
cred, err := azidentity.NewClientSecretCredential(tenant, client, secret, nil)
|
||||
if err != nil {
|
||||
@ -224,12 +228,15 @@ func GetMiddlewares(maxRetry int, delay time.Duration) []khttp.Middleware {
|
||||
khttp.NewParametersNameDecodingHandler(),
|
||||
khttp.NewUserAgentHandler(),
|
||||
&LoggingMiddleware{},
|
||||
&ThrottleControlMiddleware{},
|
||||
}
|
||||
}
|
||||
|
||||
// GetKiotaMiddlewares creates a default slice of middleware for the Graph Client.
|
||||
func GetKiotaMiddlewares(options *msgraphgocore.GraphClientOptions,
|
||||
maxRetry int, minDelay time.Duration,
|
||||
func GetKiotaMiddlewares(
|
||||
options *msgraphgocore.GraphClientOptions,
|
||||
maxRetry int,
|
||||
minDelay time.Duration,
|
||||
) []khttp.Middleware {
|
||||
kiotaMiddlewares := GetMiddlewares(maxRetry, minDelay)
|
||||
graphMiddlewares := []khttp.Middleware{
|
||||
@ -368,3 +375,43 @@ func (middleware RetryHandler) Intercept(
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// We're trying to keep calls below the 10k-per-10-minute threshold.
|
||||
// 15 tokens every second nets 900 per minute. That's 9000 every 10 minutes,
|
||||
// which is a bit below the mark.
|
||||
// But suppose we have a minute-long dry spell followed by a 10 minute tsunami.
|
||||
// We'll have built up 900 tokens in reserve, so the first 900 calls go through
|
||||
// immediately. Over the next 10 minutes, we'll partition out the other calls
|
||||
// at a rate of 900-per-minute, ending at a total of 9900. Theoretically, if
|
||||
// the volume keeps up after that, we'll always stay between 9000 and 9900 out
|
||||
// of 10k.
|
||||
const (
|
||||
perSecond = 15
|
||||
maxCap = 900
|
||||
)
|
||||
|
||||
// Single, global rate limiter at this time. Refinements for method (creates,
|
||||
// versus reads) or service can come later.
|
||||
var limiter = rate.NewLimiter(perSecond, maxCap)
|
||||
|
||||
// QueueRequest will allow the request to occur immediately if we're under the
|
||||
// 1k-calls-per-minute rate. Otherwise, the call will wait in a queue until
|
||||
// the next token set is available.
|
||||
func QueueRequest(ctx context.Context) {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
}
|
||||
}
|
||||
|
||||
// ThrottleControlMiddleware is used to ensure we don't overstep 10k-per-10-min
|
||||
// request limits.
|
||||
type ThrottleControlMiddleware struct{}
|
||||
|
||||
func (handler *ThrottleControlMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
) (*http.Response, error) {
|
||||
QueueRequest(req.Context())
|
||||
return pipeline.Next(req, middlewareIndex)
|
||||
}
|
||||
|
||||
@ -105,8 +105,7 @@ func (gc *GraphConnector) createService() (*graph.Service, error) {
|
||||
adapter, err := graph.CreateAdapter(
|
||||
gc.credentials.AzureTenantID,
|
||||
gc.credentials.AzureClientID,
|
||||
gc.credentials.AzureClientSecret,
|
||||
)
|
||||
gc.credentials.AzureClientSecret)
|
||||
if err != nil {
|
||||
return &graph.Service{}, err
|
||||
}
|
||||
|
||||
@ -27,7 +27,10 @@ func (suite *OneDriveAPISuite) SetupSuite() {
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
suite.creds = m365
|
||||
adpt, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret)
|
||||
adpt, err := graph.CreateAdapter(
|
||||
m365.AzureTenantID,
|
||||
m365.AzureClientID,
|
||||
m365.AzureClientSecret)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
suite.service = graph.NewService(adpt)
|
||||
|
||||
@ -43,8 +43,7 @@ func NewOneDriveService(credentials account.M365Config) (*oneDriveService, error
|
||||
adapter, err := graph.CreateAdapter(
|
||||
credentials.AzureTenantID,
|
||||
credentials.AzureClientID,
|
||||
credentials.AzureClientSecret,
|
||||
)
|
||||
credentials.AzureClientSecret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -15,8 +15,7 @@ func createTestBetaService(t *testing.T, credentials account.M365Config) *discov
|
||||
adapter, err := graph.CreateAdapter(
|
||||
credentials.AzureTenantID,
|
||||
credentials.AzureClientID,
|
||||
credentials.AzureClientSecret,
|
||||
)
|
||||
credentials.AzureClientSecret)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
return discover.NewBetaService(adapter)
|
||||
|
||||
@ -236,7 +236,10 @@ func collectPages(
|
||||
|
||||
// make the betaClient
|
||||
// Need to receive From DataCollection Call
|
||||
adpt, err := graph.CreateAdapter(creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret)
|
||||
adpt, err := graph.CreateAdapter(
|
||||
creds.AzureTenantID,
|
||||
creds.AzureClientID,
|
||||
creds.AzureClientSecret)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "creating azure client adapter")
|
||||
}
|
||||
|
||||
@ -51,8 +51,7 @@ func createTestService(t *testing.T, credentials account.M365Config) *graph.Serv
|
||||
adapter, err := graph.CreateAdapter(
|
||||
credentials.AzureTenantID,
|
||||
credentials.AzureClientID,
|
||||
credentials.AzureClientSecret,
|
||||
)
|
||||
credentials.AzureClientSecret)
|
||||
require.NoError(t, err, "creating microsoft graph service for exchange", clues.ToCore(err))
|
||||
|
||||
return graph.NewService(adapter)
|
||||
|
||||
@ -304,7 +304,10 @@ func RestorePageCollection(
|
||||
|
||||
defer end()
|
||||
|
||||
adpt, err := graph.CreateAdapter(creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret)
|
||||
adpt, err := graph.CreateAdapter(
|
||||
creds.AzureTenantID,
|
||||
creds.AzureClientID,
|
||||
creds.AzureClientSecret)
|
||||
if err != nil {
|
||||
return metrics, clues.Wrap(err, "constructing graph client")
|
||||
}
|
||||
|
||||
@ -345,12 +345,18 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
|
||||
return bsel.Selector
|
||||
},
|
||||
cleanup: func(t *testing.T, dest string) {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
act := tester.NewM365Account(t)
|
||||
|
||||
m365, err := act.M365Config()
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
adpt, err := graph.CreateAdapter(m365.AzureTenantID, m365.AzureClientID, m365.AzureClientSecret)
|
||||
adpt, err := graph.CreateAdapter(
|
||||
m365.AzureTenantID,
|
||||
m365.AzureClientID,
|
||||
m365.AzureClientSecret)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
service := graph.NewService(adpt)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user