From a4d7f56396d4a1800ea484d4114930ad8d9eb328 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Wed, 8 Nov 2023 19:00:08 -0800 Subject: [PATCH] Implement a sliding window rate limiter (#4618) Create a sliding window rate limiter which provides 2 guarantees: 1. Ensures we stay within service limits for a given window duration. 2. Allows token tracking and window sliding at a finer resolution. This allows for more accuracy at the cost of more frequent sliding & more memory usage. This implementation is partially inspired by the centralized token grant mechanism in https://github.com/reugn/equalizer#slider. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [x] :clock1: Yes, but in a later PR - [ ] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/go.mod | 1 + src/go.sum | 4 +- src/internal/common/limiters/limiter.go | 8 + .../common/limiters/sliding_window.go | 191 ++++++++++++ .../common/limiters/sliding_window_test.go | 285 ++++++++++++++++++ 5 files changed, 487 insertions(+), 2 deletions(-) create mode 100644 src/internal/common/limiters/limiter.go create mode 100644 src/internal/common/limiters/sliding_window.go create mode 100644 src/internal/common/limiters/sliding_window_test.go diff --git a/src/go.mod b/src/go.mod index a3d340f30..deeef50b3 100644 --- a/src/go.mod +++ b/src/go.mod @@ -34,6 +34,7 @@ require ( github.com/tidwall/pretty v1.2.1 github.com/tomlazar/table v0.1.2 github.com/vbauerster/mpb/v8 v8.1.6 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/time v0.4.0 diff --git a/src/go.sum b/src/go.sum index 51410de63..480b9ecfb 100644 --- a/src/go.sum +++ b/src/go.sum @@ 
-468,8 +468,8 @@ go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPi
 go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
 go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
 go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
-go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
-go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
diff --git a/src/internal/common/limiters/limiter.go b/src/internal/common/limiters/limiter.go
new file mode 100644
index 000000000..f842bb957
--- /dev/null
+++ b/src/internal/common/limiters/limiter.go
@@ -0,0 +1,8 @@
+package limiters
+
+import "context"
+
+// Limiter is the interface satisfied by the rate limiters in this package.
+// slidingWindow (sliding_window.go) is the implementation added alongside it.
+type Limiter interface {
+	// Wait blocks the caller until it is allowed to proceed, or returns an
+	// error once ctx is done.
+	Wait(ctx context.Context) error
+	// Shutdown releases any background resources held by the limiter.
+	Shutdown()
+}
diff --git a/src/internal/common/limiters/sliding_window.go b/src/internal/common/limiters/sliding_window.go
new file mode 100644
index 000000000..2cb65c1f9
--- /dev/null
+++ b/src/internal/common/limiters/sliding_window.go
@@ -0,0 +1,191 @@
+package limiters
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/alcionai/clues"
+)
+
+// token is the zero-size value passed through the permits channel. Receiving
+// one token from the channel grants one request.
+type token struct{}
+
+// fixedWindow records the tokens granted during one window of windowSize.
+type fixedWindow struct {
+	// count[i] is the number of tokens granted during slide interval i.
+	count []int
+}
+
+// Compile-time check that slidingWindow implements Limiter.
+var _ Limiter = &slidingWindow{}
+
+// slidingWindow is a Limiter which hands out at most capacity tokens per
+// windowSize, tracking grants per slideInterval so that tokens can be handed
+// back one interval at a time as the window slides.
+type slidingWindow struct {
+	// capacity is the maximum number of requests allowed in a sliding window at
+	// any given time.
+	capacity int
+	// windowSize is the total duration of the sliding window. Limiter will allow
+	// at most capacity requests in this duration.
+	windowSize time.Duration
+	// slideInterval controls how frequently the window slides. Smaller interval
+	// provides better accuracy at the cost of more frequent sliding & more
+	// memory usage.
+	slideInterval time.Duration
+
+	// numIntervals is the number of intervals in the window. Calculated as
+	// windowSize / slideInterval.
+	numIntervals int
+	// currentInterval tracks the current slide interval
+	currentInterval int
+
+	// Each request acquires a token from the permits channel. If the channel
+	// is empty, the request is blocked until a permit is available or if the
+	// context is cancelled.
+	permits chan token
+
+	// curr and prev are fixed windows of size windowSize. Each window contains
+	// a slice of intervals which hold a count of the number of tokens granted
+	// during that interval.
+	curr fixedWindow
+	prev fixedWindow
+
+	// mu synchronizes access to the curr and prev windows
+	mu sync.Mutex
+	// stopTicker stops the recurring slide ticker
+	stopTicker chan struct{}
+	// closeOnce guards the close of stopTicker so that Shutdown may be called
+	// more than once.
+	closeOnce sync.Once
+}
+
+// NewSlidingWindowLimiter returns a Limiter which allows at most capacity
+// requests per windowSize, sliding the window forward every slideInterval.
+//
+// The arguments are checked by validate: windowSize and slideInterval must be
+// positive, windowSize must be a whole multiple of slideInterval, and capacity
+// must not be negative. An error is returned, with a nil Limiter, otherwise.
+//
+// The returned limiter runs a background goroutine (started by initialize);
+// call Shutdown to stop it.
+func NewSlidingWindowLimiter(
+	windowSize, slideInterval time.Duration,
+	capacity int,
+) (Limiter, error) {
+	if err := validate(windowSize, slideInterval, capacity); err != nil {
+		return nil, err
+	}
+
+	// Exact division: validate rejects windows which are not a multiple of the
+	// slide interval.
+	ni := int(windowSize / slideInterval)
+
+	s := &slidingWindow{
+		windowSize:    windowSize,
+		slideInterval: slideInterval,
+		capacity:      capacity,
+		// Buffered to capacity so that all permits can sit in the channel at once.
+		permits:      make(chan token, capacity),
+		numIntervals: ni,
+		prev: fixedWindow{
+			count: make([]int, ni),
+		},
+		curr: fixedWindow{
+			count: make([]int, ni),
+		},
+		// Starts at -1 so that the nextInterval call made by initialize advances
+		// it to interval 0.
+		currentInterval: -1,
+		stopTicker:      make(chan struct{}),
+	}
+
+	s.initialize()
+
+	return s, nil
+}
+
+// Wait blocks a request until a token is available or the context is cancelled.
+// TODO(pandeyabs): Implement WaitN.
+func (s *slidingWindow) Wait(ctx context.Context) error {
+	// select picks pseudo-randomly among ready cases, so a context which is
+	// already done could still win a permit below. Check it up front so that
+	// such a request never consumes a token.
+	if err := ctx.Err(); err != nil {
+		return clues.Stack(err)
+	}
+
+	select {
+	case <-ctx.Done():
+		return clues.Stack(ctx.Err())
+	case <-s.permits:
+		// Record the grant against whichever interval is current once the lock
+		// is held; slide() returns the token when that interval comes around
+		// again in the next window.
+		s.mu.Lock()
+		defer s.mu.Unlock()
+
+		s.curr.count[s.currentInterval]++
+	}
+
+	return nil
+}
+
+// Shutdown cleans up the slide goroutine. If shutdown is not called, the slide
+// goroutine will continue to run until the program exits. It is safe to call
+// Shutdown more than once; only the first call closes stopTicker.
+func (s *slidingWindow) Shutdown() {
+	s.closeOnce.Do(func() {
+		close(s.stopTicker)
+	})
+}
+
+// initialize prefills tokens to full capacity and starts the slide goroutine.
+func (s *slidingWindow) initialize() {
+	// Ok to not hold the mutex here since nothing else is running yet.
+	s.nextInterval()
+
+	// Prefill permits to allow tokens to be granted immediately. This is done
+	// before the slide goroutine starts so that the channel has a single writer
+	// at this point. It cannot block: permits is buffered to capacity.
+	for i := 0; i < s.capacity; i++ {
+		s.permits <- token{}
+	}
+
+	// Start a goroutine which runs every slideInterval. This goroutine will
+	// continue to run until the program exits or until Shutdown is called.
+	go func() {
+		ticker := time.NewTicker(s.slideInterval)
+		// Release the ticker on every exit path of the goroutine.
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				s.slide()
+			case <-s.stopTicker:
+				return
+			}
+		}
+	}()
+}
+
+// nextInterval increments the current interval and slides the fixed
+// windows if needed. Should be called with the mutex held.
+func (s *slidingWindow) nextInterval() {
+	// Increment current interval, wrapping around at the end of the window.
+	s.currentInterval = (s.currentInterval + 1) % s.numIntervals
+
+	// Slide the fixed windows if windowSize time has elapsed: the window that
+	// just finished becomes prev, and curr starts over with zeroed counts.
+	if s.currentInterval == 0 {
+		s.prev = s.curr
+		s.curr = fixedWindow{
+			count: make([]int, s.numIntervals),
+		}
+	}
+}
+
+// slide moves the window forward by one interval. It reclaims tokens from the
+// interval that we slid past and adds them back to available permits.
+func (s *slidingWindow) slide() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	s.nextInterval()
+
+	// prev.count[currentInterval] holds the tokens granted during this same
+	// interval of the previous window. That interval has now left the sliding
+	// window, so its tokens go back into circulation.
+	for i := 0; i < s.prev.count[s.currentInterval]; i++ {
+		s.permits <- token{}
+	}
+}
+
+// validate checks the limiter configuration and returns an error for the
+// first violated constraint: windowSize and slideInterval must be positive,
+// capacity must not be negative, and windowSize must be a whole multiple of
+// slideInterval. It returns nil when the configuration is usable.
+func validate(
+	windowSize, slideInterval time.Duration,
+	capacity int,
+) error {
+	if windowSize <= 0 {
+		return clues.New("invalid window size")
+	}
+
+	if slideInterval <= 0 {
+		return clues.New("invalid slide interval")
+	}
+
+	// Allow capacity to be 0 for testing purposes
+	if capacity < 0 {
+		return clues.New("invalid window capacity")
+	}
+
+	if windowSize < slideInterval {
+		return clues.New("window too small to fit intervals")
+	}
+
+	if windowSize%slideInterval != 0 {
+		return clues.New("window not divisible by slide interval")
+	}
+
+	return nil
+}
diff --git a/src/internal/common/limiters/sliding_window_test.go b/src/internal/common/limiters/sliding_window_test.go
new file mode 100644
index 000000000..ba8b8aad4
--- /dev/null
+++ b/src/internal/common/limiters/sliding_window_test.go
@@ -0,0 +1,285 @@
+package limiters
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+	"go.uber.org/goleak"
+
+	"github.com/alcionai/corso/src/internal/tester"
+)
+
+// SlidingWindowUnitTestSuite groups the unit tests for the sliding window
+// limiter.
+type SlidingWindowUnitTestSuite struct {
+	tester.Suite
+}
+
+func TestSlidingWindowLimiterSuite(t *testing.T) {
+	suite.Run(t, &SlidingWindowUnitTestSuite{Suite: tester.NewUnitSuite(t)})
+}
+
+// TestWaitBasic tests the Wait() functionality of the limiter with multiple
+// concurrent requests.
+func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
+	var (
+		t          = suite.T()
+		windowSize = 1 * time.Second
+		// Assume slide interval is equal to window size for simplicity.
+		slideInterval   = 1 * time.Second
+		capacity        = 100
+		startTime       = time.Now()
+		numRequests     = 3 * capacity
+		wg              sync.WaitGroup
+		mu              sync.Mutex
+		intervalToCount = make(map[time.Duration]int)
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	defer s.Shutdown()
+
+	// Check if all tokens are available for use post initialization.
+	require.Equal(t, capacity, len(s.(*slidingWindow).permits))
+
+	// Make concurrent requests to the limiter
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			// require calls t.FailNow, which must only be called from the
+			// goroutine running the test. Use assert here and bail out manually.
+			if !assert.NoError(t, s.Wait(ctx)) {
+				return
+			}
+
+			// Number of seconds since startTime
+			bucket := time.Since(startTime).Truncate(windowSize)
+
+			mu.Lock()
+			intervalToCount[bucket]++
+			mu.Unlock()
+		}()
+	}
+
+	wg.Wait()
+
+	// Verify that number of requests allowed in each window is less than or equal
+	// to window capacity
+	for _, c := range intervalToCount {
+		require.True(t, c <= capacity, "count: %d, capacity: %d", c, capacity)
+	}
+}
+
+// TestWaitSliding tests the sliding window functionality of the limiter with
+// time distributed Wait() calls.
+func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
+	var (
+		t             = suite.T()
+		windowSize    = 1 * time.Second
+		slideInterval = 10 * time.Millisecond
+		capacity      = 100
+		// Test will run for duration of 2 windowSize.
+		numRequests = 2 * capacity
+		wg          sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Shutdown is idempotent, so deferring it here in addition to the explicit
+	// call below is safe and prevents a goroutine leak on early failure.
+	defer s.Shutdown()
+
+	// Make concurrent requests to the limiter
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			// Sleep for a random duration to spread out requests over multiple slide
+			// intervals & windows, so that we can test the sliding window logic better.
+			// Without this, the requests will be bunched up in the very first intervals
+			// of the 2 windows. Rest of the intervals will be empty.
+			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
+
+			// assert, not require: FailNow must not be called off the test
+			// goroutine.
+			assert.NoError(t, s.Wait(ctx))
+		}()
+	}
+	wg.Wait()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	// Verify that number of requests allowed in each window is less than or equal
+	// to window capacity
+	sw := s.(*slidingWindow)
+
+	// Shutdown only signals the slide goroutine; it may still be inside slide().
+	// Hold the mutex while copying the counts to avoid racing with it.
+	sw.mu.Lock()
+	data := append(sw.prev.count, sw.curr.count...)
+	sw.mu.Unlock()
+
+	sums := slidingSums(data, sw.numIntervals)
+
+	for _, sum := range sums {
+		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
+	}
+}
+
+// TestContextCancellation verifies that Wait returns the context's error when
+// no token ever becomes available.
+func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() {
+	t := suite.T()
+
+	// Since this test can infinitely block on failure conditions, run it within
+	// a time contained eventually block.
+	assert.Eventually(t, func() bool {
+		var (
+			windowSize    = 100 * time.Millisecond
+			slideInterval = 10 * time.Millisecond
+			wg            sync.WaitGroup
+		)
+
+		ctx, flush := tester.NewContext(t)
+		defer flush()
+
+		// Initialize limiter with capacity = 0 to test context cancellations.
+		s, err := NewSlidingWindowLimiter(windowSize, slideInterval, 0)
+		// The Eventually condition runs on its own goroutine, so require (and
+		// its t.FailNow) must not be used here. The failure is already recorded
+		// by assert; return true to stop polling.
+		if !assert.NoError(t, err) {
+			return true
+		}
+
+		defer s.Shutdown()
+
+		ctx, cancel := context.WithTimeout(ctx, 2*windowSize)
+		defer cancel()
+
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			assert.ErrorIs(t, err, context.DeadlineExceeded)
+		}()
+
+		wg.Wait()
+
+		return true
+	}, 3*time.Second, 100*time.Millisecond)
+}
+
+// TestNewSlidingWindowLimiter exercises constructor argument validation.
+func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
+	tests := []struct {
+		name          string
+		windowSize    time.Duration
+		slideInterval time.Duration
+		capacity      int
+		expectErr     assert.ErrorAssertionFunc
+	}{
+		{
+			name:          "Invalid window size",
+			windowSize:    0,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      100,
+			expectErr:     assert.Error,
+		},
+		{
+			name:          "Invalid slide interval",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 0,
+			capacity:      100,
+			expectErr:     assert.Error,
+		},
+		{
+			name:          "Slide interval > window size",
+			windowSize:    10 * time.Millisecond,
+			slideInterval: 100 * time.Millisecond,
+			capacity:      100,
+			expectErr:     assert.Error,
+		},
+		{
+			name:          "Invalid capacity",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      -1,
+			expectErr:     assert.Error,
+		},
+		{
+			name:          "Window not divisible by slide interval",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 11 * time.Millisecond,
+			capacity:      100,
+			expectErr:     assert.Error,
+		},
+		{
+			name:          "Valid parameters",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      100,
+			expectErr:     assert.NoError,
+		},
+	}
+
+	for _, test := range tests {
+		suite.Run(test.name, func() {
+			t := suite.T()
+
+			defer goleak.VerifyNone(t)
+
+			s, err := NewSlidingWindowLimiter(
+				test.windowSize,
+				test.slideInterval,
+				test.capacity)
+			if s != nil {
+				s.Shutdown()
+			}
+
+			test.expectErr(t, err)
+		})
+	}
+}
+
+// slidingSums returns the sum of every contiguous run of w elements in data,
+// in order of starting index. It returns nil when data holds fewer than w
+// elements, since no full run exists.
+func slidingSums(data []int, w int) []int {
+	if w > len(data) {
+		return nil
+	}
+
+	var (
+		sum = 0
+		res = make([]int, len(data)-w+1)
+	)
+
+	// Sum of the first run.
+	for i := 0; i < w; i++ {
+		sum += data[i]
+	}
+
+	res[0] = sum
+
+	// Each later run drops the element that fell off the left edge and adds
+	// the one that entered on the right.
+	for i := 1; i < len(data)-w+1; i++ {
+		sum = sum - data[i-1] + data[i+w-1]
+		res[i] = sum
+	}
+
+	return res
+}
+
+// TestShutdown verifies that Shutdown stops the slide goroutine and may be
+// called repeatedly.
+func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
+	var (
+		t             = suite.T()
+		windowSize    = 1 * time.Second
+		slideInterval = 1 * time.Second
+		capacity      = 100
+	)
+
+	defer goleak.VerifyNone(t)
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	s.Shutdown()
+
+	// Second call to Shutdown() should be a no-op.
+	s.Shutdown()
+}