Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1cf521d3bb | ||
|
|
eb7f63e1a3 |
@ -24,6 +24,7 @@ require (
|
||||
github.com/microsoftgraph/msgraph-sdk-go-core v1.0.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/puzpuzpuz/xsync/v2 v2.5.1
|
||||
github.com/reugn/equalizer v0.0.0-20210216135016-a959c509d7ad
|
||||
github.com/rudderlabs/analytics-go v3.3.3+incompatible
|
||||
github.com/spatialcurrent/go-lazy v0.0.0-20211115014721-47315cc003d1
|
||||
github.com/spf13/cast v1.5.1
|
||||
|
||||
@ -369,6 +369,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa
|
||||
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
|
||||
github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU=
|
||||
github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
|
||||
github.com/reugn/equalizer v0.0.0-20210216135016-a959c509d7ad h1:WtSUHi5zthjudjIi3L6QmL/V9vpJPbc/j/F2u55d3fs=
|
||||
github.com/reugn/equalizer v0.0.0-20210216135016-a959c509d7ad/go.mod h1:h0+DiDRe2Y+6iHTjIq/9HzUq7NII/Nffp0HkFrsAKq4=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
|
||||
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/count"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/reugn/equalizer"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -102,6 +103,14 @@ var (
|
||||
driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
|
||||
// also used as the exchange service limiter
|
||||
defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
|
||||
|
||||
// slideLimiter, _ = sw.NewLimiter(10*time.Minute, 10000, func() (sw.Window, sw.StopFunc) {
|
||||
// // NewLocalWindow returns an empty stop function, so it's
|
||||
// // unnecessary to call it later.
|
||||
// return sw.NewLocalWindow()
|
||||
// })
|
||||
|
||||
slider = equalizer.NewSlider(10*time.Minute, time.Minute, 10000)
|
||||
)
|
||||
|
||||
type LimiterCfg struct {
|
||||
@ -185,12 +194,32 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int {
|
||||
// calls-per-minute rate. Otherwise, the call will wait in a queue until
|
||||
// the next token set is available.
|
||||
func QueueRequest(ctx context.Context) {
|
||||
limiter := ctxLimiter(ctx)
|
||||
consume := ctxLimiterConsumption(ctx, defaultLC)
|
||||
// limiter := ctxLimiter(ctx)
|
||||
// consume := ctxLimiterConsumption(ctx, defaultLC)
|
||||
|
||||
if err := limiter.WaitN(ctx, consume); err != nil {
|
||||
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
}
|
||||
// if err := limiter.WaitN(ctx, consume); err != nil {
|
||||
// logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
|
||||
// }
|
||||
|
||||
// Add a do while loop to wait for the limiter to allow the request
|
||||
// to go through. This is necessary because the limiter.WaitN() call
|
||||
// will return immediately if the limiter is already full, and we
|
||||
// need to wait for the limiter to allow the request to go through.
|
||||
|
||||
// for i := 0; i < 10; i++ {
|
||||
// a := slideLimiter.Allow()
|
||||
// if a {
|
||||
// // Tokens are available now; exit the loop.
|
||||
// break
|
||||
// }
|
||||
|
||||
// logger.Ctx(ctx).Error("retrying - waiting for limiter tokens")
|
||||
|
||||
// // Sleep for a second before the next retry.
|
||||
// time.Sleep(time.Second)
|
||||
// }
|
||||
|
||||
slider.Take()
|
||||
}
|
||||
|
||||
// RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user