From 2e6cb20a592d5c351ed27345abf473ae8db4a507 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Tue, 14 Nov 2023 14:18:34 -0800 Subject: [PATCH] Add limiter reset functionality (#4674) This PR adds `Reset` support to limiter. In summary, it provides 2 things: 1. It resets limiter state & refills tokens to original capacity. 2. Reset is transparent to request senders, they can continue to call `Wait` even when reset is in progress. This is done using a new request level lock. It'll be useful towards `WaitN` implementation as well in a following PR. Reason for adding `Reset`: 1. Corso CLI supports backing up the entire tenant. Since exchange service limits are defined on a per app per mailbox basis, if we don't reset the sliding window limiter in between n exchange resources, it'll artificially slow down n-1 resource backups. 2. This is because sliding window limiter front allows front loading the requests. This is not a big problem with token bucket limiter, since it refills tokens at a constant rate of 16 per second. 3. Ideally the rate limiter instance should be scoped to an exchange mailbox, and no `Reset` should be required. However, this would need some major refactoring of graph controller. Currently we use global rate limiter instances, and also the graph controller stays common for all resource backup operations. `Reset` allows us to keep using the global approach. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [x] :clock1: Yes, but in a later PR - [ ] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/common/limiters/limiter.go | 1 + .../common/limiters/sliding_window.go | 42 +++++++ .../common/limiters/sliding_window_test.go | 111 ++++++++++++++++++ 3 files changed, 154 insertions(+) diff --git a/src/internal/common/limiters/limiter.go b/src/internal/common/limiters/limiter.go index f842bb957..e653a7aa1 100644 --- a/src/internal/common/limiters/limiter.go +++ b/src/internal/common/limiters/limiter.go @@ -5,4 +5,5 @@ import "context" type Limiter interface { Wait(ctx context.Context) error Shutdown() + Reset() } diff --git a/src/internal/common/limiters/sliding_window.go b/src/internal/common/limiters/sliding_window.go index 2cb65c1f9..d657fa01e 100644 --- a/src/internal/common/limiters/sliding_window.go +++ b/src/internal/common/limiters/sliding_window.go @@ -47,6 +47,11 @@ type slidingWindow struct { // mu synchronizes access to the curr and prev windows mu sync.Mutex + // requestMu synchronizes access between requests. This is especially needed + // for WaitN to ensure that all N tokens are granted to the same request. + // It also allows us to reset the limiter transparently without affecting + // any pending requests. + requestMu sync.Mutex // stopTicker stops the recurring slide ticker stopTicker chan struct{} closeOnce sync.Once @@ -86,6 +91,9 @@ func NewSlidingWindowLimiter( // Wait blocks a request until a token is available or the context is cancelled. // TODO(pandeyabs): Implement WaitN. func (s *slidingWindow) Wait(ctx context.Context) error { + s.requestMu.Lock() + defer s.requestMu.Unlock() + select { case <-ctx.Done(): return clues.Stack(ctx.Err()) @@ -107,8 +115,42 @@ func (s *slidingWindow) Shutdown() { }) } +// Reset clears the limiter state & refills tokens to original capacity. +// Reset is transparent to request senders, they can continue to call Wait +// even while reset is in progress. +func (s *slidingWindow) Reset() { + // Acquire request mutex and slide mutex in order like Wait does. + s.requestMu.Lock() + defer s.requestMu.Unlock() + + s.mu.Lock() + defer s.mu.Unlock() + + // Clear all existing counts + s.prev = fixedWindow{ + count: make([]int, s.numIntervals), + } + s.curr = fixedWindow{ + count: make([]int, s.numIntervals), + } + + // Reset permits + close(s.permits) + s.permits = make(chan token, s.capacity) + + // Prefill permits to allow tokens to be granted immediately + for i := 0; i < s.capacity; i++ { + s.permits <- token{} + } +} + // initialize starts the slide goroutine and prefills tokens to full capacity. func (s *slidingWindow) initialize() { + // Hold request mutex to ensure that no requests get processed until + // we are done initializing. + s.requestMu.Lock() + defer s.requestMu.Unlock() + // Ok to not hold the mutex here since nothing else is running yet. s.nextInterval() diff --git a/src/internal/common/limiters/sliding_window_test.go b/src/internal/common/limiters/sliding_window_test.go index ba8b8aad4..def355ea5 100644 --- a/src/internal/common/limiters/sliding_window_test.go +++ b/src/internal/common/limiters/sliding_window_test.go @@ -283,3 +283,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() { // Second call to Shutdown() should be a no-op. s.Shutdown() } + +// TestReset tests if limiter state is cleared and all tokens are available for +// use post reset. +func (suite *SlidingWindowUnitTestSuite) TestReset() { + var ( + t = suite.T() + windowSize = 100 * time.Millisecond + slideInterval = 10 * time.Millisecond + capacity = 10 + numRequests = capacity + wg sync.WaitGroup + ) + + defer goleak.VerifyNone(t) + + ctx, flush := tester.NewContext(t) + defer flush() + + s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity) + require.NoError(t, err) + + // Make some requests to the limiter. + for i := 0; i < numRequests; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + err := s.Wait(ctx) + require.NoError(t, err) + }() + } + wg.Wait() + + // Reset the limiter. + s.Reset() + + // Shutdown the ticker before accessing the internal limiter state. + s.Shutdown() + + sw := s.(*slidingWindow) + + // Check if state is cleared, and all tokens are available for use post reset. + require.Equal(t, capacity, len(sw.permits)) + + for i := 0; i < sw.numIntervals; i++ { + require.Equal(t, 0, sw.prev.count[i]) + require.Equal(t, 0, sw.curr.count[i]) + } +} + +// TestResetDuringActiveRequests tests if reset is transparent to any active +// requests and they are not affected. It also checks that limiter stays +// within capacity limits post reset. +func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() { + var ( + t = suite.T() + windowSize = 100 * time.Millisecond + slideInterval = 10 * time.Millisecond + capacity = 10 + numRequests = 10 * capacity + wg sync.WaitGroup + ) + + defer goleak.VerifyNone(t) + + ctx, flush := tester.NewContext(t) + defer flush() + + s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity) + require.NoError(t, err) + + // Make some requests to the limiter as well as reset it concurrently + // in 10:1 request to reset ratio. + for i := 0; i < numRequests; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + err := s.Wait(ctx) + require.NoError(t, err) + }() + + // Launch a reset every 10th iteration. + if i%10 == 0 { + wg.Add(1) + + go func() { + defer wg.Done() + + s.Reset() + }() + } + } + wg.Wait() + + // Shutdown the ticker before accessing the internal limiter state. + s.Shutdown() + + // Verify that number of requests allowed in each window is less than or equal + // to window capacity + sw := s.(*slidingWindow) + data := append(sw.prev.count, sw.curr.count...) + + sums := slidingSums(data, sw.numIntervals) + + for _, sum := range sums { + require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity) + } +}