From 59c0e7d6d3785defd500c7194c2b4ead47d72593 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Wed, 8 Nov 2023 21:22:04 -0800 Subject: [PATCH] Implement WaitN --- src/internal/common/limiters/limiter.go | 1 + .../common/limiters/sliding_window.go | 23 ++-- .../common/limiters/sliding_window_test.go | 114 +++++++++++------- 3 files changed, 85 insertions(+), 53 deletions(-) diff --git a/src/internal/common/limiters/limiter.go b/src/internal/common/limiters/limiter.go index f842bb957..d57c545f7 100644 --- a/src/internal/common/limiters/limiter.go +++ b/src/internal/common/limiters/limiter.go @@ -4,5 +4,6 @@ import "context" type Limiter interface { Wait(ctx context.Context) error + WaitN(ctx context.Context, N int) error Shutdown() } diff --git a/src/internal/common/limiters/sliding_window.go b/src/internal/common/limiters/sliding_window.go index 2cb65c1f9..58cb8b149 100644 --- a/src/internal/common/limiters/sliding_window.go +++ b/src/internal/common/limiters/sliding_window.go @@ -84,16 +84,23 @@ func NewSlidingWindowLimiter( } // Wait blocks a request until a token is available or the context is cancelled. -// TODO(pandeyabs): Implement WaitN. +// Equivalent to calling WaitN(ctx, 1). func (s *slidingWindow) Wait(ctx context.Context) error { - select { - case <-ctx.Done(): - return clues.Stack(ctx.Err()) - case <-s.permits: - s.mu.Lock() - defer s.mu.Unlock() + return s.WaitN(ctx, 1) +} - s.curr.count[s.currentInterval]++ +// Wait blocks a request until N tokens are available or the context gets +// cancelled. +func (s *slidingWindow) WaitN(ctx context.Context, N int) error { + for i := 0; i < N; i++ { + select { + case <-ctx.Done(): + return clues.Stack(ctx.Err()) + case <-s.permits: + s.mu.Lock() + s.curr.count[s.currentInterval]++ + s.mu.Unlock() + } } return nil diff --git a/src/internal/common/limiters/sliding_window_test.go b/src/internal/common/limiters/sliding_window_test.go index ba8b8aad4..6074677e9 100644 --- a/src/internal/common/limiters/sliding_window_test.go +++ b/src/internal/common/limiters/sliding_window_test.go @@ -81,57 +81,81 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() { } // TestWaitSliding tests the sliding window functionality of the limiter with -// time distributed Wait() calls. +// time distributed WaitN() calls. func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { - var ( - t = suite.T() - windowSize = 1 * time.Second - slideInterval = 10 * time.Millisecond - capacity = 100 - // Test will run for duration of 2 windowSize. - numRequests = 2 * capacity - wg sync.WaitGroup - ) - - defer goleak.VerifyNone(t) - - ctx, flush := tester.NewContext(t) - defer flush() - - s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity) - require.NoError(t, err) - - // Make concurrent requests to the limiter - for i := 0; i < numRequests; i++ { - wg.Add(1) - - go func() { - defer wg.Done() - - // Sleep for a random duration to spread out requests over multiple slide - // intervals & windows, so that we can test the sliding window logic better. - // Without this, the requests will be bunched up in the very first intervals - // of the 2 windows. Rest of the intervals will be empty. - time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond) - - err := s.Wait(ctx) - require.NoError(t, err) - }() + tests := []struct { + Name string + windowSize time.Duration + slideInterval time.Duration + capacity int + numRequests int + N int + }{ + { + Name: "Request 1 token", + windowSize: 1 * time.Second, + slideInterval: 10 * time.Millisecond, + capacity: 100, + numRequests: 200, + N: 1, + }, + { + Name: "Request 5 tokens", + windowSize: 1 * time.Second, + slideInterval: 10 * time.Millisecond, + capacity: 100, + numRequests: 100, + N: 5, + }, } - wg.Wait() - // Shutdown the ticker before accessing the internal limiter state. - s.Shutdown() + for _, test := range tests { + suite.Run(test.Name, func() { + t := suite.T() - // Verify that number of requests allowed in each window is less than or equal - // to window capacity - sw := s.(*slidingWindow) - data := append(sw.prev.count, sw.curr.count...) + defer goleak.VerifyNone(t) - sums := slidingSums(data, sw.numIntervals) + ctx, flush := tester.NewContext(t) + defer flush() - for _, sum := range sums { - require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity) + s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity) + require.NoError(t, err) + + var wg sync.WaitGroup + + // Make concurrent requests to the limiter + for i := 0; i < test.numRequests; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + // Sleep for a random duration to spread out requests over multiple slide + // intervals & windows, so that we can test the sliding window logic better. + // Without this, the requests will be bunched up in the very first intervals + // of the 2 windows. Rest of the intervals will be empty. + time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond) + + err := s.WaitN(ctx, test.N) + require.NoError(t, err) + }() + } + wg.Wait() + + // Shutdown the ticker before accessing the internal limiter state. + s.Shutdown() + + // Verify that number of requests allowed in each window is less than or equal + // to window capacity + sw := s.(*slidingWindow) + data := append(sw.prev.count, sw.curr.count...) + + sums := slidingSums(data, sw.numIntervals) + + for _, sum := range sums { + require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity) + } + }) } }