Add sliding window rate limiter
This commit is contained in:
parent
8b612c43d9
commit
eb7f63e1a3
@ -42,6 +42,7 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect
|
||||
github.com/VividCortex/ewma v1.2.0 // indirect
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
|
||||
github.com/andybalholm/brotli v1.0.5 // indirect
|
||||
|
||||
@ -51,6 +51,8 @@ github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 h1:IEjq88XO4PuBDcvmjQJcQGg+w+UaafSy8G5Kcb5tBhI=
|
||||
github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5/go.mod h1:exZ0C/1emQJAw5tHOaUDyY1ycttqBAPcxuzf7QbY6ec=
|
||||
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU=
|
||||
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4=
|
||||
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
|
||||
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
khttp "github.com/microsoft/kiota-http-go"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
sw "github.com/RussellLuo/slidingwindow"
|
||||
"github.com/alcionai/corso/src/pkg/count"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
@ -102,6 +103,12 @@ var (
|
||||
driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
|
||||
// also used as the exchange service limiter
|
||||
defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
|
||||
|
||||
slideLimiter, _ = sw.NewLimiter(10*time.Minute, 10000, func() (sw.Window, sw.StopFunc) {
|
||||
// NewLocalWindow returns an empty stop function, so it's
|
||||
// unnecessary to call it later.
|
||||
return sw.NewLocalWindow()
|
||||
})
|
||||
)
|
||||
|
||||
type LimiterCfg struct {
|
||||
@ -185,11 +192,29 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int {
|
||||
// calls-per-minute rate. Otherwise, the call will wait in a queue until
|
||||
// the next token set is available.
|
||||
func QueueRequest(ctx context.Context) {
|
||||
limiter := ctxLimiter(ctx)
|
||||
consume := ctxLimiterConsumption(ctx, defaultLC)
|
||||
// limiter := ctxLimiter(ctx)
|
||||
// consume := ctxLimiterConsumption(ctx, defaultLC)
|
||||
|
||||
if err := limiter.WaitN(ctx, consume); err != nil {
|
||||
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
// if err := limiter.WaitN(ctx, consume); err != nil {
|
||||
// logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
// }
|
||||
|
||||
// Add a do while loop to wait for the limiter to allow the request
|
||||
// to go through. This is necessary because the limiter.WaitN() call
|
||||
// will return immediately if the limiter is already full, and we
|
||||
// need to wait for the limiter to allow the request to go through.
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
a := slideLimiter.Allow()
|
||||
if a {
|
||||
// Tokens are available now; exit the loop.
|
||||
break
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Error("retrying - waiting for limiter tokens")
|
||||
|
||||
// Sleep for a second before the next retry.
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user