From 1cf521d3bb17a1d8348ce891962e82ff3d8309c1 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Tue, 31 Oct 2023 12:04:28 -0700 Subject: [PATCH] Add equalizer pkg --- src/go.mod | 2 +- src/go.sum | 4 +-- .../m365/graph/concurrency_middleware.go | 36 ++++++++++--------- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/src/go.mod b/src/go.mod index dc3d38859..6cf682abc 100644 --- a/src/go.mod +++ b/src/go.mod @@ -24,6 +24,7 @@ require ( github.com/microsoftgraph/msgraph-sdk-go-core v1.0.0 github.com/pkg/errors v0.9.1 github.com/puzpuzpuz/xsync/v2 v2.5.1 + github.com/reugn/equalizer v0.0.0-20210216135016-a959c509d7ad github.com/rudderlabs/analytics-go v3.3.3+incompatible github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1 github.com/spf13/cast v1.5.1 @@ -42,7 +43,6 @@ require ( ) require ( - github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect github.com/VividCortex/ewma v1.2.0 // indirect github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/andybalholm/brotli v1.0.5 // indirect diff --git a/src/go.sum b/src/go.sum index 1f68d1dd3..457434c02 100644 --- a/src/go.sum +++ b/src/go.sum @@ -51,8 +51,6 @@ github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5 h1:IEjq88XO4PuBDcvmjQJcQGg+w+UaafSy8G5Kcb5tBhI= github.com/GehirnInc/crypt v0.0.0-20230320061759-8cc1b52080c5/go.mod h1:exZ0C/1emQJAw5tHOaUDyY1ycttqBAPcxuzf7QbY6ec= -github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= -github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b/go.mod h1:4+EPqMRApwwE/6yo6CxiHoSnBzjRr3jsqer7frxP8y4= github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= @@ -371,6 +369,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU= github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU= +github.com/reugn/equalizer v0.0.0-20210216135016-a959c509d7ad h1:WtSUHi5zthjudjIi3L6QmL/V9vpJPbc/j/F2u55d3fs= +github.com/reugn/equalizer v0.0.0-20210216135016-a959c509d7ad/go.mod h1:h0+DiDRe2Y+6iHTjIq/9HzUq7NII/Nffp0HkFrsAKq4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 77e3ff1ff..b8d4bfb9d 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -11,10 +11,10 @@ import ( khttp "github.com/microsoft/kiota-http-go" "golang.org/x/time/rate" - sw "github.com/RussellLuo/slidingwindow" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" + "github.com/reugn/equalizer" ) // --------------------------------------------------------------------------- @@ -104,11 +104,13 @@ var ( // also used as the exchange service limiter defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) - slideLimiter, _ = sw.NewLimiter(10*time.Minute, 10000, func() (sw.Window, sw.StopFunc) { - // NewLocalWindow returns an empty stop function, so it's - // unnecessary to call it later. - return sw.NewLocalWindow() - }) + // slideLimiter, _ = sw.NewLimiter(10*time.Minute, 10000, func() (sw.Window, sw.StopFunc) { + // // NewLocalWindow returns an empty stop function, so it's + // // unnecessary to call it later. + // return sw.NewLocalWindow() + // }) + + slider = equalizer.NewSlider(10*time.Minute, time.Minute, 10000) ) type LimiterCfg struct { @@ -204,18 +206,20 @@ func QueueRequest(ctx context.Context) { // will return immediately if the limiter is already full, and we // need to wait for the limiter to allow the request to go through. - for i := 0; i < 10; i++ { - a := slideLimiter.Allow() - if a { - // Tokens are available now; exit the loop. - break - } + // for i := 0; i < 10; i++ { + // a := slideLimiter.Allow() + // if a { + // // Tokens are available now; exit the loop. + // break + // } - logger.Ctx(ctx).Error("retrying - waiting for limiter tokens") + // logger.Ctx(ctx).Error("retrying - waiting for limiter tokens") - // Sleep for a second before the next retry. - time.Sleep(time.Second) - } + // // Sleep for a second before the next retry. + // time.Sleep(time.Second) + // } + + slider.Take() } // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.