Move various limiter types behind a common interface (#4678)

Move the token bucket limiter (`rate.Limiter`) package behind a corso owned `Limiter` interface. This allows callers to request tokens without knowing what type of limiter they are waiting on.


---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-11-15 16:13:34 -08:00 committed by GitHub
parent 4c72e9eab7
commit ff5d83c723
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 38 additions and 6 deletions

View File

@ -0,0 +1,32 @@
package limiters
import (
"context"
"golang.org/x/time/rate"
)
var _ Limiter = &tokenBucket{}
// Thin wrapper around the golang.org/x/time/rate token bucket rate limiter.
type tokenBucket struct {
*rate.Limiter
}
func NewTokenBucketLimiter(r int, burst int) Limiter {
return &tokenBucket{
Limiter: rate.NewLimiter(rate.Limit(r), burst),
}
}
func (tb *tokenBucket) Wait(ctx context.Context) error {
return tb.Limiter.Wait(ctx)
}
func (tb *tokenBucket) WaitN(ctx context.Context, n int) error {
return tb.Limiter.WaitN(ctx, n)
}
// Reset and shutdown are no-ops for the token bucket limiter.
func (tb *tokenBucket) Reset() {}
func (tb *tokenBucket) Shutdown() {}

View File

@ -9,8 +9,8 @@ import (
"github.com/alcionai/clues"
khttp "github.com/microsoft/kiota-http-go"
"golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -99,9 +99,9 @@ const (
)
var (
driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
// also used as the exchange service limiter
defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
)
type LimiterCfg struct {
@ -116,7 +116,7 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context {
return context.WithValue(ctx, limiterCfgCtxKey, lc)
}
func ctxLimiter(ctx context.Context) *rate.Limiter {
func ctxLimiter(ctx context.Context) limiters.Limiter {
lc, ok := extractRateLimiterConfig(ctx)
if !ok {
return defaultLimiter

View File

@ -19,8 +19,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
@ -356,7 +356,7 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
name string
service path.ServiceType
expectOK require.BoolAssertionFunc
expectLimiter *rate.Limiter
expectLimiter limiters.Limiter
}{
{
name: "exchange",