From eb7f63e1a3f5282e7960bcc1a1d45a0b69c381f2 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Fri, 27 Oct 2023 18:55:22 -0700 Subject: [PATCH] Add sliding window rate limiter --- src/go.mod | 1 + src/go.sum | 2 ++ .../m365/graph/concurrency_middleware.go | 33 ++++++++++++++++--- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/src/go.mod b/src/go.mod index 324dbf9b5..dc3d38859 100644 --- a/src/go.mod +++ b/src/go.mod @@ -42,6 +42,7 @@ require ( ) require ( + github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/andybalholm/brotli v1.0.5 // indirect diff --git a/src/go.sum b/src/go.sum index 1bd2296d1..1f68d1dd3 100644 --- a/src/go.sum +++ b/src/go.sum @@ -51,6 +51,8 @@ github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 h1:IEjq88XO4PuBDcvmjQJcQGg+w+UaafSy8G5Kcb5tBhI= github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5/go.mod h1:exZ0C/1emQJAw5tHOaUDyY1ycttqBAPcxuzf7QbY6ec= +github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= +github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index c470fc1cf..77e3ff1ff 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -11,6 +11,7 @@ import ( khttp "github.com/microsoft/kiota-http-go" "golang.org/x/time/rate" + sw "github.com/RussellLuo/slidingwindow" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -102,6 +103,12 @@ var ( driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap) // also used as the exchange service limiter defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) + + slideLimiter, _ = sw.NewLimiter(10*time.Minute, 10000, func() (sw.Window, sw.StopFunc) { + // NewLocalWindow returns an empty stop function, so it's + // unnecessary to call it later. + return sw.NewLocalWindow() + }) ) type LimiterCfg struct { @@ -185,11 +192,29 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int { // calls-per-minute rate. Otherwise, the call will wait in a queue until // the next token set is available. func QueueRequest(ctx context.Context) { - limiter := ctxLimiter(ctx) - consume := ctxLimiterConsumption(ctx, defaultLC) + // limiter := ctxLimiter(ctx) + // consume := ctxLimiterConsumption(ctx, defaultLC) - if err := limiter.WaitN(ctx, consume); err != nil { - logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") + // if err := limiter.WaitN(ctx, consume); err != nil { + // logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") + // } + + // Add a do while loop to wait for the limiter to allow the request + // to go through. This is necessary because the limiter.WaitN() call + // will return immediately if the limiter is already full, and we + // need to wait for the limiter to allow the request to go through. + + for i := 0; i < 10; i++ { + a := slideLimiter.Allow() + if a { + // Tokens are available now; exit the loop. + break + } + + logger.Ctx(ctx).Error("retrying - waiting for limiter tokens") + + // Sleep for a second before the next retry. + time.Sleep(time.Second) } }