Add sliding window rate limiter

This commit is contained in:
Abhishek Pandey 2023-11-02 03:08:55 -07:00
parent e46cf645e5
commit 07740938fb
3 changed files with 182 additions and 4 deletions

View File

@ -0,0 +1,130 @@
package limiters
import (
"context"
"sync"
"time"
)
type (
token struct{}
Limiter interface {
Wait(ctx context.Context) error
}
)
// TODO: Expose interfaces for limiter and window
type window struct {
// TODO: See if we need to store start time. Without it there is no way
// to tell if the ticker is lagging behind ( due to contention from consumers or otherwise).
// Although with our use cases, at max we'd have 10k requests contending with the ticker which
// should be easily doable in fraction of 1 sec. Although we should benchmark this.
// start time.Time
count []int64
}
var _ Limiter = &slidingWindow{}
type slidingWindow struct {
w time.Duration
slidingInterval time.Duration
capacity int64
currentInterval int64
numIntervals int64
permits chan token
mu sync.Mutex
curr window
prev window
}
// slidingInterval controls degree of movement of the sliding window from left to right
// Smaller slidingInterval means more frequent movement of the sliding window.
// TODO: Introduce an option to control token refresh frequency. Otherwise, if the sliding interval is
// large, it may slow down the token refresh rate. Not implementing this for simplicity, since for our
// use cases we are going to have a sliding interval of 1 sec which is good enough.
func NewLimiter(w time.Duration, slidingInterval time.Duration, capacity int64) Limiter {
ni := int64(w / slidingInterval)
sw := &slidingWindow{
w: w,
slidingInterval: slidingInterval,
capacity: capacity,
permits: make(chan token, capacity),
numIntervals: ni,
prev: window{
count: make([]int64, ni),
},
curr: window{
count: make([]int64, ni),
},
currentInterval: -1,
}
// Initialize
sw.nextInterval()
// Move the sliding window forward every slidingInterval
// TODO: fix leaking goroutine
go sw.run()
// Prefill permits
for i := int64(0); i < capacity; i++ {
sw.permits <- token{}
}
return sw
}
// TODO: Implement stopping the ticker
func (s *slidingWindow) run() {
ticker := time.NewTicker(s.slidingInterval)
for range ticker.C {
s.slide()
}
}
func (s *slidingWindow) slide() {
// Slide into the next interval
s.nextInterval()
// Remove permits from the previous window
for i := int64(0); i < s.prev.count[s.currentInterval]; i++ {
select {
case s.permits <- token{}:
default:
// Skip if permits are at capacity
return
}
}
}
// next increments the current interval and resets the current window if needed
func (s *slidingWindow) nextInterval() {
s.mu.Lock()
// Increment current interval
s.currentInterval = (s.currentInterval + 1) % s.numIntervals
// If it's the first interval, move curr window to prev window and reset curr window.
if s.currentInterval == 0 {
s.prev = s.curr
s.curr = window{
count: make([]int64, s.numIntervals),
}
}
s.mu.Unlock()
}
// TODO: Implement WaitN
func (s *slidingWindow) Wait(ctx context.Context) error {
<-s.permits
// Acquire mutex and increment current interval's count
s.mu.Lock()
defer s.mu.Unlock()
s.curr.count[s.currentInterval]++
return nil
}

View File

@ -0,0 +1,42 @@
package limiters
import (
"fmt"
"sync"
"testing"
"time"
"github.com/alcionai/corso/src/internal/tester"
)
func BenchmarkSlidingWindowLimiter(b *testing.B) {
// 1 second window, 1 millisecond sliding interval, 1000 token capacity (1k per sec)
limiter := NewLimiter(1*time.Second, 1*time.Millisecond, 1000)
// If the allowed rate is 1k per sec, 4k goroutines should take 3.xx sec
numGoroutines := 4000
ctx, flush := tester.NewContext(b)
defer flush()
var wg sync.WaitGroup
b.ResetTimer()
b.StartTimer()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = limiter.Wait(ctx)
}()
}
wg.Wait()
b.StopTimer()
totalDuration := b.Elapsed()
fmt.Printf("Total time taken: %v\n", totalDuration)
}

View File

@ -11,6 +11,7 @@ import (
khttp "github.com/microsoft/kiota-http-go"
"golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -102,6 +103,9 @@ var (
driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
// also used as the exchange service limiter
defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
// 10 min window, 1 second sliding interval, 10k capacity
exchangeLimiter = limiters.NewLimiter(10*time.Minute, 1*time.Second, 10000)
)
type LimiterCfg struct {
@ -185,10 +189,12 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int {
// calls-per-minute rate. Otherwise, the call will wait in a queue until
// the next token set is available.
func QueueRequest(ctx context.Context) {
limiter := ctxLimiter(ctx)
consume := ctxLimiterConsumption(ctx, defaultLC)
if err := limiter.WaitN(ctx, consume); err != nil {
// limiter := ctxLimiter(ctx)
// consume := ctxLimiterConsumption(ctx, defaultLC)
// if err := limiter.WaitN(ctx, consume); err != nil {
// logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
// }
if err := exchangeLimiter.Wait(ctx); err != nil {
logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
}
}