graph limiter movement

This commit is contained in:
Abhishek Pandey 2023-11-09 19:29:07 -08:00
parent f10a7bb7b7
commit 196f73b082
6 changed files with 94 additions and 34 deletions

View File

@ -0,0 +1,29 @@
package limiters
import (
"context"
"golang.org/x/time/rate"
)
var _ Limiter = &TokenBucket{}
// Wrapper around the golang.org/x/time/rate token bucket rate limiter.
type TokenBucket struct {
lim *rate.Limiter
}
func NewTokenBucketLimiter(r int, burst int) Limiter {
lim := rate.NewLimiter(rate.Limit(r), burst)
return &TokenBucket{lim: lim}
}
func (tb *TokenBucket) Wait(ctx context.Context) error {
return tb.lim.Wait(ctx)
}
func (tb *TokenBucket) WaitN(ctx context.Context, n int) error {
return tb.lim.WaitN(ctx, n)
}
func (tb *TokenBucket) Shutdown() {}

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/resource"
@ -78,7 +79,24 @@ func NewController(
return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx)
}
ac, err := api.NewClient(creds, co, counter)
// Pick a rate limiter based on the service type
var lim limiters.Limiter
switch pst {
case path.OneDriveService, path.SharePointService, path.GroupsService:
lim = limiters.NewTokenBucketLimiter(graph.DrivePerSecond, graph.DriveMaxCap)
default:
// TODO(pandeyabs): Change default to token bucket exch limits like it exists today.
lim, err = limiters.NewSlidingWindowLimiter(
graph.ExchangeTimeLimit,
graph.ExchangeSlideInterval,
graph.ExchangeTokenQuota)
if err != nil {
return nil, clues.Wrap(err, "creating sliding window limiter").WithClues(ctx)
}
}
ac, err := api.NewClient(creds, co, counter, lim)
if err != nil {
return nil, clues.Wrap(err, "creating api client").WithClues(ctx)
}

View File

@ -11,6 +11,7 @@ import (
khttp "github.com/microsoft/kiota-http-go"
"golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -88,18 +89,23 @@ const (
// but doing so risks timeouts. It's better to give the limits breathing room.
defaultPerSecond = 16 // 16 * 60 * 10 = 9600
defaultMaxCap = 200 // real cap is 10k-per-10-minutes
ExchangeTimeLimit = 10 * time.Minute
ExchangeTokenQuota = 9600
ExchangeSlideInterval = 1 * time.Second
// since drive runs on a per-minute, rather than per-10-minute bucket, we have
// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
// calls in the first minute, putting us over the per-minute limit. Keeping
// the cap at the per-second burst means we only dole out a max of 1240 in one
// minute (20 cap + 1200 per minute + one burst of padding).
drivePerSecond = 20 // 20 * 60 = 1200
driveMaxCap = 20 // real cap is 1250-per-minute
DrivePerSecond = 20 // 20 * 60 = 1200
DriveMaxCap = 20 // real cap is 1250-per-minute
)
var (
driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
driveLimiter = rate.NewLimiter(DrivePerSecond, DriveMaxCap)
// also used as the exchange service limiter
defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
)
@ -116,21 +122,6 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context {
return context.WithValue(ctx, limiterCfgCtxKey, lc)
}
func ctxLimiter(ctx context.Context) *rate.Limiter {
lc, ok := extractRateLimiterConfig(ctx)
if !ok {
return defaultLimiter
}
switch lc.Service {
// FIXME: Handle based on category once we add chat backup
case path.OneDriveService, path.SharePointService, path.GroupsService:
return driveLimiter
default:
return defaultLimiter
}
}
func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {
l := ctx.Value(limiterCfgCtxKey)
if l == nil {
@ -184,24 +175,25 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int {
// QueueRequest will allow the request to occur immediately if we're under the
// calls-per-minute rate. Otherwise, the call will wait in a queue until
// the next token set is available.
func QueueRequest(ctx context.Context) {
limiter := ctxLimiter(ctx)
func QueueRequest(ctx context.Context, lim limiters.Limiter) {
consume := ctxLimiterConsumption(ctx, defaultLC)
if err := limiter.WaitN(ctx, consume); err != nil {
if err := lim.WaitN(ctx, consume); err != nil {
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
}
}
// RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
type RateLimiterMiddleware struct{}
type RateLimiterMiddleware struct {
lim limiters.Limiter
}
func (mw *RateLimiterMiddleware) Intercept(
pipeline khttp.Pipeline,
middlewareIndex int,
req *http.Request,
) (*http.Response, error) {
QueueRequest(req.Context())
QueueRequest(req.Context(), mw.lim)
return pipeline.Next(req, middlewareIndex)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/net/http2"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/count"
@ -39,13 +40,14 @@ type Requester interface {
// can utilize it on a per-download basis.
func NewHTTPWrapper(
counter *count.Bus,
lim limiters.Limiter,
opts ...Option,
) *httpWrapper {
var (
cc = populateConfig(opts...)
rt = customTransport{
n: pipeline{
middlewares: internalMiddleware(cc, counter),
middlewares: internalMiddleware(cc, counter, lim),
transport: defaultTransport(),
},
}
@ -72,10 +74,11 @@ func NewHTTPWrapper(
// can utilize it on a per-download basis.
func NewNoTimeoutHTTPWrapper(
counter *count.Bus,
lim limiters.Limiter,
opts ...Option,
) *httpWrapper {
opts = append(opts, NoTimeout())
return NewHTTPWrapper(counter, opts...)
return NewHTTPWrapper(counter, lim, opts...)
}
// ---------------------------------------------------------------------------
@ -188,6 +191,7 @@ func defaultTransport() http.RoundTripper {
func internalMiddleware(
cc *clientConfig,
counter *count.Bus,
lim limiters.Limiter,
) []khttp.Middleware {
throttler := &throttlingMiddleware{
tf: newTimedFence(),
@ -203,7 +207,9 @@ func internalMiddleware(
khttp.NewRedirectHandler(),
&LoggingMiddleware{},
throttler,
&RateLimiterMiddleware{},
&RateLimiterMiddleware{
lim: lim,
},
&MetricsMiddleware{
counter: counter,
},

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/filters"
@ -106,6 +107,7 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) {
func CreateAdapter(
tenant, client, secret string,
counter *count.Bus,
lim limiters.Limiter,
opts ...Option,
) (abstractions.RequestAdapter, error) {
auth, err := GetAuth(tenant, client, secret)
@ -113,7 +115,7 @@ func CreateAdapter(
return nil, err
}
httpClient, cc := KiotaHTTPClient(counter, opts...)
httpClient, cc := KiotaHTTPClient(counter, lim, opts...)
adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient(
auth,
@ -152,12 +154,13 @@ func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityA
// can utilize it on a per-download basis.
func KiotaHTTPClient(
counter *count.Bus,
lim limiters.Limiter,
opts ...Option,
) (*http.Client, *clientConfig) {
var (
clientOptions = msgraphsdkgo.GetDefaultClientOptions()
cc = populateConfig(opts...)
middlewares = kiotaMiddlewares(&clientOptions, cc, counter)
middlewares = kiotaMiddlewares(&clientOptions, cc, counter, lim)
httpClient = msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
)
@ -277,6 +280,7 @@ func kiotaMiddlewares(
options *msgraphgocore.GraphClientOptions,
cc *clientConfig,
counter *count.Bus,
lim limiters.Limiter,
) []khttp.Middleware {
mw := []khttp.Middleware{
msgraphgocore.NewGraphTelemetryHandler(options),
@ -305,7 +309,9 @@ func kiotaMiddlewares(
mw = append(
mw,
throttler,
&RateLimiterMiddleware{},
&RateLimiterMiddleware{
lim: lim,
},
&MetricsMiddleware{
counter: counter,
})

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
@ -43,6 +44,9 @@ type Client struct {
counter *count.Bus
options control.Options
// rate limiter
lim limiters.Limiter
}
// NewClient produces a new exchange api client. Must be used in
@ -51,18 +55,19 @@ func NewClient(
creds account.M365Config,
co control.Options,
counter *count.Bus,
lim limiters.Limiter,
) (Client, error) {
s, err := NewService(creds, counter)
s, err := NewService(creds, counter, lim)
if err != nil {
return Client{}, err
}
li, err := newLargeItemService(creds, counter)
li, err := newLargeItemService(creds, counter, lim)
if err != nil {
return Client{}, err
}
rqr := graph.NewNoTimeoutHTTPWrapper(counter)
rqr := graph.NewNoTimeoutHTTPWrapper(counter, lim)
if co.DeltaPageSize < 1 || co.DeltaPageSize > maxDeltaPageSize {
co.DeltaPageSize = maxDeltaPageSize
@ -75,6 +80,7 @@ func NewClient(
Requester: rqr,
counter: counter,
options: co,
lim: lim,
}
return cli, nil
@ -100,6 +106,7 @@ func (c Client) Service(counter *count.Bus) (graph.Servicer, error) {
func NewService(
creds account.M365Config,
counter *count.Bus,
lim limiters.Limiter,
opts ...graph.Option,
) (*graph.Service, error) {
a, err := graph.CreateAdapter(
@ -107,6 +114,7 @@ func NewService(
creds.AzureClientID,
creds.AzureClientSecret,
counter,
lim,
opts...)
if err != nil {
return nil, clues.Wrap(err, "generating graph api adapter")
@ -118,8 +126,9 @@ func NewService(
func newLargeItemService(
creds account.M365Config,
counter *count.Bus,
lim limiters.Limiter,
) (*graph.Service, error) {
a, err := NewService(creds, counter, graph.NoTimeout())
a, err := NewService(creds, counter, lim, graph.NoTimeout())
if err != nil {
return nil, clues.Wrap(err, "generating no-timeout graph adapter")
}