diff --git a/src/internal/common/limiters/token_bucket.go b/src/internal/common/limiters/token_bucket.go new file mode 100644 index 000000000..a33bb4b11 --- /dev/null +++ b/src/internal/common/limiters/token_bucket.go @@ -0,0 +1,29 @@ +package limiters + +import ( + "context" + + "golang.org/x/time/rate" +) + +var _ Limiter = &TokenBucket{} + +// Wrapper around the golang.org/x/time/rate token bucket rate limiter. +type TokenBucket struct { + lim *rate.Limiter +} + +func NewTokenBucketLimiter(r int, burst int) Limiter { + lim := rate.NewLimiter(rate.Limit(r), burst) + return &TokenBucket{lim: lim} +} + +func (tb *TokenBucket) Wait(ctx context.Context) error { + return tb.lim.Wait(ctx) +} + +func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { + return tb.lim.WaitN(ctx, n) +} + +func (tb *TokenBucket) Shutdown() {} diff --git a/src/internal/m365/controller.go b/src/internal/m365/controller.go index d5b8d629a..b38a5c8cd 100644 --- a/src/internal/m365/controller.go +++ b/src/internal/m365/controller.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/common/idname" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/resource" @@ -78,7 +79,24 @@ func NewController( return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx) } - ac, err := api.NewClient(creds, co, counter) + // Pick a rate limiter based on the service type + var lim limiters.Limiter + + switch pst { + case path.OneDriveService, path.SharePointService, path.GroupsService: + lim = limiters.NewTokenBucketLimiter(graph.DrivePerSecond, graph.DriveMaxCap) + default: + // TODO(pandeyabs): Change default to token bucket exch limits like it exists today. + lim, err = limiters.NewSlidingWindowLimiter( + graph.ExchangeTimeLimit, + graph.ExchangeSlideInterval, + graph.ExchangeTokenQuota) + if err != nil { + return nil, clues.Wrap(err, "creating sliding window limiter").WithClues(ctx) + } + } + + ac, err := api.NewClient(creds, co, counter, lim) if err != nil { return nil, clues.Wrap(err, "creating api client").WithClues(ctx) } diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 60762ca32..1814fb447 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -11,6 +11,7 @@ import ( khttp "github.com/microsoft/kiota-http-go" "golang.org/x/time/rate" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -88,18 +89,23 @@ const ( // but doing so risks timeouts. It's better to give the limits breathing room. defaultPerSecond = 16 // 16 * 60 * 10 = 9600 defaultMaxCap = 200 // real cap is 10k-per-10-minutes + + ExchangeTimeLimit = 10 * time.Minute + ExchangeTokenQuota = 9600 + ExchangeSlideInterval = 1 * time.Second + // since drive runs on a per-minute, rather than per-10-minute bucket, we have // to keep the max cap equal to the per-second cap. A large maxCap pool (say, // 1200, similar to the per-minute cap) would allow us to make a flood of 2400 // calls in the first minute, putting us over the per-minute limit. Keeping // the cap at the per-second burst means we only dole out a max of 1240 in one // minute (20 cap + 1200 per minute + one burst of padding). - drivePerSecond = 20 // 20 * 60 = 1200 - driveMaxCap = 20 // real cap is 1250-per-minute + DrivePerSecond = 20 // 20 * 60 = 1200 + DriveMaxCap = 20 // real cap is 1250-per-minute ) var ( - driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap) + driveLimiter = rate.NewLimiter(DrivePerSecond, DriveMaxCap) // also used as the exchange service limiter defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) ) @@ -116,21 +122,6 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context { return context.WithValue(ctx, limiterCfgCtxKey, lc) } -func ctxLimiter(ctx context.Context) *rate.Limiter { - lc, ok := extractRateLimiterConfig(ctx) - if !ok { - return defaultLimiter - } - - switch lc.Service { - // FIXME: Handle based on category once we add chat backup - case path.OneDriveService, path.SharePointService, path.GroupsService: - return driveLimiter - default: - return defaultLimiter - } -} - func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) { l := ctx.Value(limiterCfgCtxKey) if l == nil { @@ -184,24 +175,25 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int { // QueueRequest will allow the request to occur immediately if we're under the // calls-per-minute rate. Otherwise, the call will wait in a queue until // the next token set is available. -func QueueRequest(ctx context.Context) { - limiter := ctxLimiter(ctx) +func QueueRequest(ctx context.Context, lim limiters.Limiter) { consume := ctxLimiterConsumption(ctx, defaultLC) - if err := limiter.WaitN(ctx, consume); err != nil { + if err := lim.WaitN(ctx, consume); err != nil { logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") } } // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits. -type RateLimiterMiddleware struct{} +type RateLimiterMiddleware struct { + lim limiters.Limiter +} func (mw *RateLimiterMiddleware) Intercept( pipeline khttp.Pipeline, middlewareIndex int, req *http.Request, ) (*http.Response, error) { - QueueRequest(req.Context()) + QueueRequest(req.Context(), mw.lim) return pipeline.Next(req, middlewareIndex) } diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index b03e8a3e9..0aef0e700 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" "golang.org/x/net/http2" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/count" @@ -39,13 +40,14 @@ type Requester interface { // can utilize it on a per-download basis. func NewHTTPWrapper( counter *count.Bus, + lim limiters.Limiter, opts ...Option, ) *httpWrapper { var ( cc = populateConfig(opts...) rt = customTransport{ n: pipeline{ - middlewares: internalMiddleware(cc, counter), + middlewares: internalMiddleware(cc, counter, lim), transport: defaultTransport(), }, } @@ -72,10 +74,11 @@ func NewHTTPWrapper( // can utilize it on a per-download basis. func NewNoTimeoutHTTPWrapper( counter *count.Bus, + lim limiters.Limiter, opts ...Option, ) *httpWrapper { opts = append(opts, NoTimeout()) - return NewHTTPWrapper(counter, opts...) + return NewHTTPWrapper(counter, lim, opts...) } // --------------------------------------------------------------------------- @@ -188,6 +191,7 @@ func defaultTransport() http.RoundTripper { func internalMiddleware( cc *clientConfig, counter *count.Bus, + lim limiters.Limiter, ) []khttp.Middleware { throttler := &throttlingMiddleware{ tf: newTimedFence(), @@ -203,7 +207,9 @@ func internalMiddleware( khttp.NewRedirectHandler(), &LoggingMiddleware{}, throttler, - &RateLimiterMiddleware{}, + &RateLimiterMiddleware{ + lim: lim, + }, &MetricsMiddleware{ counter: counter, }, diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index 3a0b6ce38..f0119ab2e 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/idname" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/filters" @@ -106,6 +107,7 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) { func CreateAdapter( tenant, client, secret string, counter *count.Bus, + lim limiters.Limiter, opts ...Option, ) (abstractions.RequestAdapter, error) { auth, err := GetAuth(tenant, client, secret) @@ -113,7 +115,7 @@ func CreateAdapter( return nil, err } - httpClient, cc := KiotaHTTPClient(counter, opts...) + httpClient, cc := KiotaHTTPClient(counter, lim, opts...) adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( auth, @@ -152,12 +154,13 @@ func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityA // can utilize it on a per-download basis. func KiotaHTTPClient( counter *count.Bus, + lim limiters.Limiter, opts ...Option, ) (*http.Client, *clientConfig) { var ( clientOptions = msgraphsdkgo.GetDefaultClientOptions() cc = populateConfig(opts...) - middlewares = kiotaMiddlewares(&clientOptions, cc, counter) + middlewares = kiotaMiddlewares(&clientOptions, cc, counter, lim) httpClient = msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) ) @@ -277,6 +280,7 @@ func kiotaMiddlewares( options *msgraphgocore.GraphClientOptions, cc *clientConfig, counter *count.Bus, + lim limiters.Limiter, ) []khttp.Middleware { mw := []khttp.Middleware{ msgraphgocore.NewGraphTelemetryHandler(options), @@ -305,7 +309,9 @@ func kiotaMiddlewares( mw = append( mw, throttler, - &RateLimiterMiddleware{}, + &RateLimiterMiddleware{ + lim: lim, + }, &MetricsMiddleware{ counter: counter, }) diff --git a/src/pkg/services/m365/api/client.go b/src/pkg/services/m365/api/client.go index adcd08e05..8a40b886d 100644 --- a/src/pkg/services/m365/api/client.go +++ b/src/pkg/services/m365/api/client.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" @@ -43,6 +44,9 @@ type Client struct { counter *count.Bus options control.Options + + // rate limiter + lim limiters.Limiter } // NewClient produces a new exchange api client. Must be used in @@ -51,18 +55,19 @@ func NewClient( creds account.M365Config, co control.Options, counter *count.Bus, + lim limiters.Limiter, ) (Client, error) { - s, err := NewService(creds, counter) + s, err := NewService(creds, counter, lim) if err != nil { return Client{}, err } - li, err := newLargeItemService(creds, counter) + li, err := newLargeItemService(creds, counter, lim) if err != nil { return Client{}, err } - rqr := graph.NewNoTimeoutHTTPWrapper(counter) + rqr := graph.NewNoTimeoutHTTPWrapper(counter, lim) if co.DeltaPageSize < 1 || co.DeltaPageSize > maxDeltaPageSize { co.DeltaPageSize = maxDeltaPageSize @@ -75,6 +80,7 @@ func NewClient( Requester: rqr, counter: counter, options: co, + lim: lim, } return cli, nil @@ -100,6 +106,7 @@ func (c Client) Service(counter *count.Bus) (graph.Servicer, error) { func NewService( creds account.M365Config, counter *count.Bus, + lim limiters.Limiter, opts ...graph.Option, ) (*graph.Service, error) { a, err := graph.CreateAdapter( @@ -107,6 +114,7 @@ func NewService( creds.AzureClientID, creds.AzureClientSecret, counter, + lim, opts...) if err != nil { return nil, clues.Wrap(err, "generating graph api adapter") @@ -118,8 +126,9 @@ func NewService( func newLargeItemService( creds account.M365Config, counter *count.Bus, + lim limiters.Limiter, ) (*graph.Service, error) { - a, err := NewService(creds, counter, graph.NoTimeout()) + a, err := NewService(creds, counter, lim, graph.NoTimeout()) if err != nil { return nil, clues.Wrap(err, "generating no-timeout graph adapter") }