Implement a sliding window rate limiter (#4618)
<!-- PR description-->
Create a sliding window rate limiter which provides 2 guarantees:

1. Ensures we stay within service limits for a given window duration.
2. Allows token tracking and window sliding at a finer resolution. This allows for more accuracy at the cost of more frequent sliding & more memory usage.

This implementation is partially inspired by the centralized token grant mechanism in https://github.com/reugn/equalizer#slider.

---

#### Does this PR need a docs update or release note?

- [ ] ✅ Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ] ⛔ No

#### Type of change

<!--- Please check the type of change your PR introduces: --->

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->

* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->

- [ ] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
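As an illustration of how the new limiter is meant to be consumed, here is a minimal, hypothetical caller sketch. The import path is inferred from the new file location in this diff, and the parameter values (1s window, 100ms slide, capacity 100) are arbitrary examples, not values used by this PR:

package main

import (
    "context"
    "log"
    "time"

    // Import path inferred from src/internal/common/limiters; adjust if the
    // module layout differs.
    "github.com/alcionai/corso/src/internal/common/limiters"
)

func main() {
    // Allow at most 100 requests per 1s window, sliding every 100ms.
    lim, err := limiters.NewSlidingWindowLimiter(time.Second, 100*time.Millisecond, 100)
    if err != nil {
        log.Fatal(err)
    }
    defer lim.Shutdown()

    ctx := context.Background()

    for i := 0; i < 250; i++ {
        // Wait blocks until a token is available or ctx is cancelled.
        if err := lim.Wait(ctx); err != nil {
            log.Fatal(err)
        }
        // ...issue the rate-limited call here...
    }
}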
This commit is contained in:
parent
85f8c9b735
commit
a4d7f56396
@@ -34,6 +34,7 @@ require (
    github.com/tidwall/pretty v1.2.1
    github.com/tomlazar/table v0.1.2
    github.com/vbauerster/mpb/v8 v8.1.6
    go.uber.org/goleak v1.3.0
    go.uber.org/zap v1.26.0
    golang.org/x/exp v0.0.0-20230905200255-921286631fa9
    golang.org/x/time v0.4.0
@@ -468,8 +468,8 @@ go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPi
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=
8 src/internal/common/limiters/limiter.go Normal file
@@ -0,0 +1,8 @@
package limiters

import "context"

type Limiter interface {
    Wait(ctx context.Context) error
    Shutdown()
}
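Because Wait and Shutdown are the entire contract, callers can depend on the Limiter interface rather than the concrete slidingWindow type. A small hypothetical sketch of such a call site (doWithLimit is illustrative and not part of this PR):

package example

import (
    "context"

    // Import path inferred from the new package location above.
    "github.com/alcionai/corso/src/internal/common/limiters"
)

// doWithLimit is a hypothetical helper that acquires a token from any Limiter
// implementation before running fn, returning early if the context is
// cancelled while waiting.
func doWithLimit(ctx context.Context, l limiters.Limiter, fn func() error) error {
    if err := l.Wait(ctx); err != nil {
        return err
    }

    return fn()
}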
191 src/internal/common/limiters/sliding_window.go Normal file
@@ -0,0 +1,191 @@
package limiters

import (
    "context"
    "sync"
    "time"

    "github.com/alcionai/clues"
)

type token struct{}

type fixedWindow struct {
    count []int
}

var _ Limiter = &slidingWindow{}

type slidingWindow struct {
    // capacity is the maximum number of requests allowed in a sliding window at
    // any given time.
    capacity int
    // windowSize is the total duration of the sliding window. Limiter will allow
    // at most capacity requests in this duration.
    windowSize time.Duration
    // slideInterval controls how frequently the window slides. A smaller interval
    // provides better accuracy at the cost of more frequent sliding & more
    // memory usage.
    slideInterval time.Duration

    // numIntervals is the number of intervals in the window. Calculated as
    // windowSize / slideInterval.
    numIntervals int
    // currentInterval tracks the current slide interval.
    currentInterval int

    // Each request acquires a token from the permits channel. If the channel
    // is empty, the request is blocked until a permit is available or the
    // context is cancelled.
    permits chan token

    // curr and prev are fixed windows of size windowSize. Each window contains
    // a slice of intervals which hold a count of the number of tokens granted
    // during that interval.
    curr fixedWindow
    prev fixedWindow

    // mu synchronizes access to the curr and prev windows.
    mu sync.Mutex
    // stopTicker stops the recurring slide ticker.
    stopTicker chan struct{}
    closeOnce  sync.Once
}

func NewSlidingWindowLimiter(
    windowSize, slideInterval time.Duration,
    capacity int,
) (Limiter, error) {
    if err := validate(windowSize, slideInterval, capacity); err != nil {
        return nil, err
    }

    ni := int(windowSize / slideInterval)

    s := &slidingWindow{
        windowSize:    windowSize,
        slideInterval: slideInterval,
        capacity:      capacity,
        permits:       make(chan token, capacity),
        numIntervals:  ni,
        prev: fixedWindow{
            count: make([]int, ni),
        },
        curr: fixedWindow{
            count: make([]int, ni),
        },
        currentInterval: -1,
        stopTicker:      make(chan struct{}),
    }

    s.initialize()

    return s, nil
}

// Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return clues.Stack(ctx.Err())
    case <-s.permits:
        s.mu.Lock()
        defer s.mu.Unlock()

        s.curr.count[s.currentInterval]++
    }

    return nil
}

// Shutdown cleans up the slide goroutine. If Shutdown is not called, the slide
// goroutine will continue to run until the program exits.
func (s *slidingWindow) Shutdown() {
    s.closeOnce.Do(func() {
        close(s.stopTicker)
    })
}

// initialize starts the slide goroutine and prefills tokens to full capacity.
func (s *slidingWindow) initialize() {
    // Ok to not hold the mutex here since nothing else is running yet.
    s.nextInterval()

    // Start a goroutine which runs every slideInterval. This goroutine will
    // continue to run until the program exits or until Shutdown is called.
    go func() {
        ticker := time.NewTicker(s.slideInterval)

        for {
            select {
            case <-ticker.C:
                s.slide()
            case <-s.stopTicker:
                ticker.Stop()
                return
            }
        }
    }()

    // Prefill permits to allow tokens to be granted immediately.
    for i := 0; i < s.capacity; i++ {
        s.permits <- token{}
    }
}

// nextInterval increments the current interval and slides the fixed
// windows if needed. Should be called with the mutex held.
func (s *slidingWindow) nextInterval() {
    // Increment current interval.
    s.currentInterval = (s.currentInterval + 1) % s.numIntervals

    // Slide the fixed windows if windowSize time has elapsed.
    if s.currentInterval == 0 {
        s.prev = s.curr
        s.curr = fixedWindow{
            count: make([]int, s.numIntervals),
        }
    }
}

// slide moves the window forward by one interval. It reclaims tokens from the
// interval that we slid past and adds them back to available permits.
func (s *slidingWindow) slide() {
    s.mu.Lock()
    defer s.mu.Unlock()

    s.nextInterval()

    for i := 0; i < s.prev.count[s.currentInterval]; i++ {
        s.permits <- token{}
    }
}

func validate(
    windowSize, slideInterval time.Duration,
    capacity int,
) error {
    if windowSize <= 0 {
        return clues.New("invalid window size")
    }

    if slideInterval <= 0 {
        return clues.New("invalid slide interval")
    }

    // Allow capacity to be 0 for testing purposes.
    if capacity < 0 {
        return clues.New("invalid window capacity")
    }

    if windowSize < slideInterval {
        return clues.New("window too small to fit intervals")
    }

    if windowSize%slideInterval != 0 {
        return clues.New("window not divisible by slide interval")
    }

    return nil
}
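To make the accuracy vs. memory trade-off mentioned in the PR description concrete, consider two hypothetical parameterizations of the same 1-minute, 600-request budget (values chosen purely for illustration): a 1-second slide keeps numIntervals = 60 counters per fixed window and returns reclaimed tokens once per second, while a 5-second slide keeps only 12 counters but releases tokens in coarser, less frequent batches.

package example

import (
    "time"

    // Import path inferred from the new package location above.
    "github.com/alcionai/corso/src/internal/common/limiters"
)

// Finer slide: 60 intervals per window (1m / 1s), more frequent slides and
// more per-window counters, but tighter enforcement of the 600-request budget.
func newFineGrainedLimiter() (limiters.Limiter, error) {
    return limiters.NewSlidingWindowLimiter(time.Minute, time.Second, 600)
}

// Coarser slide: 12 intervals per window (1m / 5s), cheaper to maintain,
// but tokens are reclaimed in larger, less frequent bursts.
func newCoarseLimiter() (limiters.Limiter, error) {
    return limiters.NewSlidingWindowLimiter(time.Minute, 5*time.Second, 600)
}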
285 src/internal/common/limiters/sliding_window_test.go Normal file
@@ -0,0 +1,285 @@
package limiters

import (
    "context"
    "math/rand"
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "github.com/stretchr/testify/suite"
    "go.uber.org/goleak"

    "github.com/alcionai/corso/src/internal/tester"
)

type SlidingWindowUnitTestSuite struct {
    tester.Suite
}

func TestSlidingWindowLimiterSuite(t *testing.T) {
    suite.Run(t, &SlidingWindowUnitTestSuite{Suite: tester.NewUnitSuite(t)})
}

// TestWaitBasic tests the Wait() functionality of the limiter with multiple
// concurrent requests.
func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
    var (
        t          = suite.T()
        windowSize = 1 * time.Second
        // Assume slide interval is equal to window size for simplicity.
        slideInterval   = 1 * time.Second
        capacity        = 100
        startTime       = time.Now()
        numRequests     = 3 * capacity
        wg              sync.WaitGroup
        mu              sync.Mutex
        intervalToCount = make(map[time.Duration]int)
    )

    defer goleak.VerifyNone(t)

    ctx, flush := tester.NewContext(t)
    defer flush()

    s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
    require.NoError(t, err)

    defer s.Shutdown()

    // Check if all tokens are available for use post initialization.
    require.Equal(t, capacity, len(s.(*slidingWindow).permits))

    // Make concurrent requests to the limiter.
    for i := 0; i < numRequests; i++ {
        wg.Add(1)

        go func() {
            defer wg.Done()

            err := s.Wait(ctx)
            require.NoError(t, err)

            // Number of seconds since startTime.
            bucket := time.Since(startTime).Truncate(windowSize)

            mu.Lock()
            intervalToCount[bucket]++
            mu.Unlock()
        }()
    }

    wg.Wait()

    // Verify that the number of requests allowed in each window is less than
    // or equal to the window capacity.
    for _, c := range intervalToCount {
        require.True(t, c <= capacity, "count: %d, capacity: %d", c, capacity)
    }
}

// TestWaitSliding tests the sliding window functionality of the limiter with
// time distributed Wait() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
    var (
        t             = suite.T()
        windowSize    = 1 * time.Second
        slideInterval = 10 * time.Millisecond
        capacity      = 100
        // Test will run for a duration of 2 windowSize.
        numRequests = 2 * capacity
        wg          sync.WaitGroup
    )

    defer goleak.VerifyNone(t)

    ctx, flush := tester.NewContext(t)
    defer flush()

    s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
    require.NoError(t, err)

    // Make concurrent requests to the limiter.
    for i := 0; i < numRequests; i++ {
        wg.Add(1)

        go func() {
            defer wg.Done()

            // Sleep for a random duration to spread out requests over multiple slide
            // intervals & windows, so that we can test the sliding window logic better.
            // Without this, the requests will be bunched up in the very first intervals
            // of the 2 windows. The rest of the intervals will be empty.
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)

            err := s.Wait(ctx)
            require.NoError(t, err)
        }()
    }
    wg.Wait()

    // Shutdown the ticker before accessing the internal limiter state.
    s.Shutdown()

    // Verify that the number of requests allowed in each window is less than
    // or equal to the window capacity.
    sw := s.(*slidingWindow)
    data := append(sw.prev.count, sw.curr.count...)

    sums := slidingSums(data, sw.numIntervals)

    for _, sum := range sums {
        require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
    }
}

func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() {
    t := suite.T()

    // Since this test can block indefinitely on failure conditions, run it within
    // a time-constrained eventually block.
    assert.Eventually(t, func() bool {
        var (
            windowSize    = 100 * time.Millisecond
            slideInterval = 10 * time.Millisecond
            wg            sync.WaitGroup
        )

        ctx, flush := tester.NewContext(t)
        defer flush()

        // Initialize limiter with capacity = 0 to test context cancellations.
        s, err := NewSlidingWindowLimiter(windowSize, slideInterval, 0)
        require.NoError(t, err)

        defer s.Shutdown()

        ctx, cancel := context.WithTimeout(ctx, 2*windowSize)
        defer cancel()

        wg.Add(1)

        go func() {
            defer wg.Done()

            err := s.Wait(ctx)
            require.ErrorIs(t, err, context.DeadlineExceeded)
        }()

        wg.Wait()

        return true
    }, 3*time.Second, 100*time.Millisecond)
}

func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
    tests := []struct {
        name          string
        windowSize    time.Duration
        slideInterval time.Duration
        capacity      int
        expectErr     assert.ErrorAssertionFunc
    }{
        {
            name:          "Invalid window size",
            windowSize:    0,
            slideInterval: 10 * time.Millisecond,
            capacity:      100,
            expectErr:     assert.Error,
        },
        {
            name:          "Invalid slide interval",
            windowSize:    100 * time.Millisecond,
            slideInterval: 0,
            capacity:      100,
            expectErr:     assert.Error,
        },
        {
            name:          "Slide interval > window size",
            windowSize:    10 * time.Millisecond,
            slideInterval: 100 * time.Millisecond,
            capacity:      100,
            expectErr:     assert.Error,
        },
        {
            name:          "Invalid capacity",
            windowSize:    100 * time.Millisecond,
            slideInterval: 10 * time.Millisecond,
            capacity:      -1,
            expectErr:     assert.Error,
        },
        {
            name:          "Window not divisible by slide interval",
            windowSize:    100 * time.Millisecond,
            slideInterval: 11 * time.Millisecond,
            capacity:      100,
            expectErr:     assert.Error,
        },
        {
            name:          "Valid parameters",
            windowSize:    100 * time.Millisecond,
            slideInterval: 10 * time.Millisecond,
            capacity:      100,
            expectErr:     assert.NoError,
        },
    }

    for _, test := range tests {
        suite.Run(test.name, func() {
            t := suite.T()

            defer goleak.VerifyNone(t)

            s, err := NewSlidingWindowLimiter(
                test.windowSize,
                test.slideInterval,
                test.capacity)
            if s != nil {
                s.Shutdown()
            }

            test.expectErr(t, err)
        })
    }
}

func slidingSums(data []int, w int) []int {
    var (
        sum = 0
        res = make([]int, len(data)-w+1)
    )

    for i := 0; i < w; i++ {
        sum += data[i]
    }

    res[0] = sum

    for i := 1; i < len(data)-w+1; i++ {
        sum = sum - data[i-1] + data[i+w-1]
        res[i] = sum
    }

    return res
}

func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
    var (
        t             = suite.T()
        windowSize    = 1 * time.Second
        slideInterval = 1 * time.Second
        capacity      = 100
    )

    defer goleak.VerifyNone(t)

    s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
    require.NoError(t, err)

    s.Shutdown()

    // A second call to Shutdown() should be a no-op.
    s.Shutdown()
}