From ff5d83c723b70dc6de220f54744dc1628171d138 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Wed, 15 Nov 2023 16:13:34 -0800 Subject: [PATCH] Move various limiter types behind a common interface (#4678) Move the token bucket limiter (`rate.Limiter`) package behind a corso owned `Limiter` interface. This allows callers to request tokens without knowing what type of limiter they are waiting on. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [x] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/common/limiters/token_bucket.go | 32 +++++++++++++++++++ .../m365/api/graph/concurrency_middleware.go | 8 ++--- .../m365/api/graph/middleware_test.go | 4 +-- 3 files changed, 38 insertions(+), 6 deletions(-) create mode 100644 src/internal/common/limiters/token_bucket.go diff --git a/src/internal/common/limiters/token_bucket.go b/src/internal/common/limiters/token_bucket.go new file mode 100644 index 000000000..91d573843 --- /dev/null +++ b/src/internal/common/limiters/token_bucket.go @@ -0,0 +1,32 @@ +package limiters + +import ( + "context" + + "golang.org/x/time/rate" +) + +var _ Limiter = &tokenBucket{} + +// Thin wrapper around the golang.org/x/time/rate token bucket rate limiter. +type tokenBucket struct { + *rate.Limiter +} + +func NewTokenBucketLimiter(r int, burst int) Limiter { + return &tokenBucket{ + Limiter: rate.NewLimiter(rate.Limit(r), burst), + } +} + +func (tb *tokenBucket) Wait(ctx context.Context) error { + return tb.Limiter.Wait(ctx) +} + +func (tb *tokenBucket) WaitN(ctx context.Context, n int) error { + return tb.Limiter.WaitN(ctx, n) +} + +// Reset and shutdown are no-ops for the token bucket limiter. +func (tb *tokenBucket) Reset() {} +func (tb *tokenBucket) Shutdown() {} diff --git a/src/pkg/services/m365/api/graph/concurrency_middleware.go b/src/pkg/services/m365/api/graph/concurrency_middleware.go index 60762ca32..0f694ca01 100644 --- a/src/pkg/services/m365/api/graph/concurrency_middleware.go +++ b/src/pkg/services/m365/api/graph/concurrency_middleware.go @@ -9,8 +9,8 @@ import ( "github.com/alcionai/clues" khttp "github.com/microsoft/kiota-http-go" - "golang.org/x/time/rate" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -99,9 +99,9 @@ const ( ) var ( - driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap) + driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap) // also used as the exchange service limiter - defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) + defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap) ) type LimiterCfg struct { @@ -116,7 +116,7 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context { return context.WithValue(ctx, limiterCfgCtxKey, lc) } -func ctxLimiter(ctx context.Context) *rate.Limiter { +func ctxLimiter(ctx context.Context) limiters.Limiter { lc, ok := extractRateLimiterConfig(ctx) if !ok { return defaultLimiter diff --git a/src/pkg/services/m365/api/graph/middleware_test.go b/src/pkg/services/m365/api/graph/middleware_test.go index ddaf13cd7..9bd2255cb 100644 --- a/src/pkg/services/m365/api/graph/middleware_test.go +++ b/src/pkg/services/m365/api/graph/middleware_test.go @@ -19,8 +19,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "golang.org/x/time/rate" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" @@ -356,7 +356,7 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() { name string service path.ServiceType expectOK require.BoolAssertionFunc - expectLimiter *rate.Limiter + expectLimiter limiters.Limiter }{ { name: "exchange",