diff --git a/src/internal/common/limiters/limiter.go b/src/internal/common/limiters/limiter.go
index f842bb957..e653a7aa1 100644
--- a/src/internal/common/limiters/limiter.go
+++ b/src/internal/common/limiters/limiter.go
@@ -5,4 +5,5 @@ import "context"
 type Limiter interface {
 	Wait(ctx context.Context) error
 	Shutdown()
+	Reset()
 }
diff --git a/src/internal/common/limiters/sliding_window.go b/src/internal/common/limiters/sliding_window.go
index 2cb65c1f9..d657fa01e 100644
--- a/src/internal/common/limiters/sliding_window.go
+++ b/src/internal/common/limiters/sliding_window.go
@@ -47,6 +47,11 @@ type slidingWindow struct {
 	// mu synchronizes access to the curr and prev windows
 	mu sync.Mutex
+	// requestMu synchronizes access between requests. This is especially needed
+	// for WaitN to ensure that all N tokens are granted to the same request.
+	// It also allows us to reset the limiter transparently without affecting
+	// any pending requests.
+	requestMu sync.Mutex
 
 	// stopTicker stops the recurring slide ticker
 	stopTicker chan struct{}
 	closeOnce  sync.Once
@@ -86,6 +91,9 @@ func NewSlidingWindowLimiter(
 // Wait blocks a request until a token is available or the context is cancelled.
 // TODO(pandeyabs): Implement WaitN.
 func (s *slidingWindow) Wait(ctx context.Context) error {
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
 	select {
 	case <-ctx.Done():
 		return clues.Stack(ctx.Err())
@@ -107,8 +115,42 @@ func (s *slidingWindow) Shutdown() {
 	})
 }
 
+// Reset clears the limiter state and refills tokens to the original capacity.
+// Reset is transparent to request senders; they can continue to call Wait
+// even while a reset is in progress.
+func (s *slidingWindow) Reset() {
+	// Acquire the request mutex and slide mutex in the same order as Wait does.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Clear all existing counts.
+	s.prev = fixedWindow{
+		count: make([]int, s.numIntervals),
+	}
+	s.curr = fixedWindow{
+		count: make([]int, s.numIntervals),
+	}
+
+	// Reset permits by replacing the channel with a freshly filled one.
+	close(s.permits)
+	s.permits = make(chan token, s.capacity)
+
+	// Prefill permits to allow tokens to be granted immediately.
+	for i := 0; i < s.capacity; i++ {
+		s.permits <- token{}
+	}
+}
+
 // initialize starts the slide goroutine and prefills tokens to full capacity.
 func (s *slidingWindow) initialize() {
+	// Hold the request mutex to ensure that no requests are processed until
+	// we are done initializing.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
 	// Ok to not hold the mutex here since nothing else is running yet.
 	s.nextInterval()
 
diff --git a/src/internal/common/limiters/sliding_window_test.go b/src/internal/common/limiters/sliding_window_test.go
index ba8b8aad4..def355ea5 100644
--- a/src/internal/common/limiters/sliding_window_test.go
+++ b/src/internal/common/limiters/sliding_window_test.go
@@ -283,3 +283,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
 	// Second call to Shutdown() should be a no-op.
 	s.Shutdown()
 }
+
+// TestReset verifies that limiter state is cleared and all tokens are
+// available for use post reset.
+func (suite *SlidingWindowUnitTestSuite) TestReset() {
+	var (
+		t             = suite.T()
+		windowSize    = 100 * time.Millisecond
+		slideInterval = 10 * time.Millisecond
+		capacity      = 10
+		numRequests   = capacity
+		wg            sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Make some requests to the limiter.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			require.NoError(t, err)
+		}()
+	}
+	wg.Wait()
+
+	// Reset the limiter.
+	s.Reset()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	sw := s.(*slidingWindow)
+
+	// Check that state is cleared and all tokens are available for use post reset.
+	require.Equal(t, capacity, len(sw.permits))
+
+	for i := 0; i < sw.numIntervals; i++ {
+		require.Equal(t, 0, sw.prev.count[i])
+		require.Equal(t, 0, sw.curr.count[i])
+	}
+}
+
+// TestResetDuringActiveRequests verifies that reset is transparent to
+// in-flight requests and does not affect them. It also checks that the
+// limiter stays within capacity limits post reset.
+func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() {
+	var (
+		t             = suite.T()
+		windowSize    = 100 * time.Millisecond
+		slideInterval = 10 * time.Millisecond
+		capacity      = 10
+		numRequests   = 10 * capacity
+		wg            sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Issue requests to the limiter and reset it concurrently, at a 10:1
+	// request-to-reset ratio.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			require.NoError(t, err)
+		}()
+
+		// Launch a reset every 10th iteration.
+		if i%10 == 0 {
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+
+				s.Reset()
+			}()
+		}
+	}
+	wg.Wait()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	// Verify that the number of requests allowed in each window is less than
+	// or equal to the window capacity.
+	sw := s.(*slidingWindow)
+	data := append(sw.prev.count, sw.curr.count...)
+
+	sums := slidingSums(data, sw.numIntervals)
+
+	for _, sum := range sums {
+		require.LessOrEqual(t, sum, capacity, "sum: %d, capacity: %d", sum, capacity)
+	}
+}
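
For reviewers, a minimal usage sketch of the new Reset API (illustrative only, not part of the diff): the helper name refreshLimiter is hypothetical, the import path is assumed from the repo layout, and the constructor arguments mirror the tests above.

package example

import (
	"context"
	"time"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

// refreshLimiter demonstrates the intended Reset pattern: drop all
// accumulated window state and restore full capacity at once, e.g. after
// upstream throttling rules change, instead of waiting for the window to
// slide. Concurrent Wait callers block on the request mutex while the
// reset runs and resume unaffected afterwards.
func refreshLimiter(ctx context.Context) error {
	lim, err := limiters.NewSlidingWindowLimiter(
		time.Second,          // window size
		100*time.Millisecond, // slide interval
		10,                   // capacity (tokens per window)
	)
	if err != nil {
		return err
	}
	defer lim.Shutdown()

	// Consume the full window's worth of tokens.
	for i := 0; i < 10; i++ {
		if err := lim.Wait(ctx); err != nil {
			return err
		}
	}

	// All tokens are spent; Reset refills to capacity immediately.
	lim.Reset()

	// This Wait is granted from the refreshed token pool.
	return lim.Wait(ctx)
}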