diff --git a/src/internal/common/limiters/sliding_window.go b/src/internal/common/limiters/sliding_window.go new file mode 100644 index 000000000..4a29f5fe0 --- /dev/null +++ b/src/internal/common/limiters/sliding_window.go @@ -0,0 +1,130 @@ +package limiters + +import ( + "context" + "sync" + "time" +) + +type ( + token struct{} + Limiter interface { + Wait(ctx context.Context) error + } +) + +// TODO: Expose interfaces for limiter and window +type window struct { + // TODO: See if we need to store start time. Without it there is no way + // to tell if the ticker is lagging behind ( due to contention from consumers or otherwise). + // Although with our use cases, at max we'd have 10k requests contending with the ticker which + // should be easily doable in fraction of 1 sec. Although we should benchmark this. + // start time.Time + count []int64 +} + +var _ Limiter = &slidingWindow{} + +type slidingWindow struct { + w time.Duration + slidingInterval time.Duration + capacity int64 + currentInterval int64 + numIntervals int64 + permits chan token + mu sync.Mutex + curr window + prev window +} + +// slidingInterval controls degree of movement of the sliding window from left to right +// Smaller slidingInterval means more frequent movement of the sliding window. +// TODO: Introduce an option to control token refresh frequency. Otherwise, if the sliding interval is +// large, it may slow down the token refresh rate. Not implementing this for simplicity, since for our +// use cases we are going to have a sliding interval of 1 sec which is good enough. +func NewLimiter(w time.Duration, slidingInterval time.Duration, capacity int64) Limiter { + ni := int64(w / slidingInterval) + + sw := &slidingWindow{ + w: w, + slidingInterval: slidingInterval, + capacity: capacity, + permits: make(chan token, capacity), + numIntervals: ni, + prev: window{ + count: make([]int64, ni), + }, + curr: window{ + count: make([]int64, ni), + }, + currentInterval: -1, + } + + // Initialize + sw.nextInterval() + + // Move the sliding window forward every slidingInterval + // TODO: fix leaking goroutine + go sw.run() + + // Prefill permits + for i := int64(0); i < capacity; i++ { + sw.permits <- token{} + } + + return sw +} + +// TODO: Implement stopping the ticker +func (s *slidingWindow) run() { + ticker := time.NewTicker(s.slidingInterval) + + for range ticker.C { + s.slide() + } +} + +func (s *slidingWindow) slide() { + // Slide into the next interval + s.nextInterval() + + // Remove permits from the previous window + for i := int64(0); i < s.prev.count[s.currentInterval]; i++ { + select { + case s.permits <- token{}: + default: + // Skip if permits are at capacity + return + } + } +} + +// next increments the current interval and resets the current window if needed +func (s *slidingWindow) nextInterval() { + s.mu.Lock() + // Increment current interval + s.currentInterval = (s.currentInterval + 1) % s.numIntervals + + // If it's the first interval, move curr window to prev window and reset curr window. + if s.currentInterval == 0 { + s.prev = s.curr + s.curr = window{ + count: make([]int64, s.numIntervals), + } + } + + s.mu.Unlock() +} + +// TODO: Implement WaitN +func (s *slidingWindow) Wait(ctx context.Context) error { + <-s.permits + + // Acquire mutex and increment current interval's count + s.mu.Lock() + defer s.mu.Unlock() + + s.curr.count[s.currentInterval]++ + + return nil +} diff --git a/src/internal/common/limiters/sliding_window_test.go b/src/internal/common/limiters/sliding_window_test.go new file mode 100644 index 000000000..60903c5a0 --- /dev/null +++ b/src/internal/common/limiters/sliding_window_test.go @@ -0,0 +1,42 @@ +package limiters + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/alcionai/corso/src/internal/tester" +) + +func BenchmarkSlidingWindowLimiter(b *testing.B) { + // 1 second window, 1 millisecond sliding interval, 1000 token capacity (1k per sec) + limiter := NewLimiter(1*time.Second, 1*time.Millisecond, 1000) + // If the allowed rate is 1k per sec, 4k goroutines should take 3.xx sec + numGoroutines := 4000 + + ctx, flush := tester.NewContext(b) + defer flush() + + var wg sync.WaitGroup + + b.ResetTimer() + b.StartTimer() + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + _ = limiter.Wait(ctx) + }() + } + + wg.Wait() + b.StopTimer() + + totalDuration := b.Elapsed() + + fmt.Printf("Total time taken: %v\n", totalDuration) +} diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 60762ca32..bec525558 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -11,6 +11,7 @@ import ( khttp "github.com/microsoft/kiota-http-go" "golang.org/x/time/rate" + "github.com/alcionai/corso/src/internal/common/limiters" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -102,6 +103,9 @@ var ( driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap) // also used as the exchange service limiter defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap) + + // 10 min window, 1 second sliding interval, 10k capacity + exchangeLimiter = limiters.NewLimiter(10*time.Minute, 1*time.Second, 10000) ) type LimiterCfg struct { @@ -185,10 +189,12 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int { // calls-per-minute rate. Otherwise, the call will wait in a queue until // the next token set is available. func QueueRequest(ctx context.Context) { - limiter := ctxLimiter(ctx) - consume := ctxLimiterConsumption(ctx, defaultLC) - - if err := limiter.WaitN(ctx, consume); err != nil { + // limiter := ctxLimiter(ctx) + // consume := ctxLimiterConsumption(ctx, defaultLC) + // if err := limiter.WaitN(ctx, consume); err != nil { + // logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") + // } + if err := exchangeLimiter.Wait(ctx); err != nil { logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") } }