Add limiter reset functionality (#4674)

This PR adds `Reset` support to limiter. In summary, it provides 2 things:
1. It resets limiter state & refills tokens to original capacity.
2. Reset is transparent to request senders, they can continue to call `Wait` even when reset is in progress. This is done using a new request level lock. It'll be useful towards `WaitN` implementation as well in a following PR. 

Reason for adding `Reset`:
1. Corso CLI supports backing up the entire tenant. Since exchange service limits are defined on a per app per mailbox basis, if we don't reset the sliding window limiter between the n exchange resources, it'll artificially slow down the remaining n-1 resource backups. 
2. This is because the sliding window limiter allows front-loading the requests. This is not a big problem with the token bucket limiter, since it refills tokens at a constant rate of 16 per second. 
3. Ideally the rate limiter instance should be scoped to an exchange mailbox, and no `Reset` should be required. However, this would need some major refactoring of graph controller. Currently we use global rate limiter instances, and also the graph controller stays common for all resource backup operations. `Reset` allows us to keep using the global approach. 

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-11-14 14:18:34 -08:00 committed by GitHub
parent 65d0ba425c
commit 2e6cb20a59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 154 additions and 0 deletions

View File

@ -5,4 +5,5 @@ import "context"
// Limiter grants tokens to rate-limited requests.
type Limiter interface {
	// Wait blocks until a token is available or ctx is cancelled.
	Wait(ctx context.Context) error
	// Shutdown releases limiter resources (e.g. background tickers).
	Shutdown()
	// Reset clears limiter state and refills tokens to original capacity.
	Reset()
}

View File

@ -47,6 +47,11 @@ type slidingWindow struct {
// mu synchronizes access to the curr and prev windows
mu sync.Mutex
// requestMu synchronizes access between requests. This is especially needed
// for WaitN to ensure that all N tokens are granted to the same request.
// It also allows us to reset the limiter transparently without affecting
// any pending requests.
requestMu sync.Mutex
// stopTicker stops the recurring slide ticker
stopTicker chan struct{}
closeOnce sync.Once
@ -86,6 +91,9 @@ func NewSlidingWindowLimiter(
// Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error {
s.requestMu.Lock()
defer s.requestMu.Unlock()
select {
case <-ctx.Done():
return clues.Stack(ctx.Err())
@ -107,8 +115,42 @@ func (s *slidingWindow) Shutdown() {
})
}
// Reset clears the limiter state & refills tokens to original capacity.
// Reset is transparent to request senders, they can continue to call Wait
// even while reset is in progress.
// Reset restores the limiter to its freshly-initialized state: both window
// counts are zeroed and the token bucket is refilled to full capacity.
//
// Reset is transparent to request senders; they can keep calling Wait even
// while a reset is in progress, since both paths serialize on requestMu.
func (s *slidingWindow) Reset() {
	// Same lock ordering as Wait: request mutex first, then the window mutex.
	s.requestMu.Lock()
	defer s.requestMu.Unlock()

	s.mu.Lock()
	defer s.mu.Unlock()

	// Drop all recorded counts from both windows.
	s.prev = fixedWindow{count: make([]int, s.numIntervals)}
	s.curr = fixedWindow{count: make([]int, s.numIntervals)}

	// Swap in a fresh, fully-stocked permit channel so tokens are granted
	// immediately after the reset. Closing the old channel is safe here: no
	// Wait call can be blocked on it while we hold requestMu.
	close(s.permits)

	refilled := make(chan token, s.capacity)
	for i := 0; i < s.capacity; i++ {
		refilled <- token{}
	}

	s.permits = refilled
}
// initialize starts the slide goroutine and prefills tokens to full capacity.
func (s *slidingWindow) initialize() {
// Hold request mutex to ensure that no requests get processed until
// we are done initializing.
s.requestMu.Lock()
defer s.requestMu.Unlock()
// Ok to not hold the mutex here since nothing else is running yet.
s.nextInterval()

View File

@ -283,3 +283,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
// Second call to Shutdown() should be a no-op.
s.Shutdown()
}
// TestReset tests if limiter state is cleared and all tokens are available for
// use post reset.
// TestReset tests if limiter state is cleared and all tokens are available for
// use post reset.
func (suite *SlidingWindowUnitTestSuite) TestReset() {
	var (
		t             = suite.T()
		windowSize    = 100 * time.Millisecond
		slideInterval = 10 * time.Millisecond
		capacity      = 10
		numRequests   = capacity
		wg            sync.WaitGroup
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
	require.NoError(t, err)

	// Make some requests to the limiter. Collect errors on a buffered channel
	// instead of calling require inside the goroutines: require.NoError calls
	// t.FailNow, which must only run on the test goroutine (see go vet's
	// testinggoroutine check).
	errs := make(chan error, numRequests)

	for i := 0; i < numRequests; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			errs <- s.Wait(ctx)
		}()
	}

	wg.Wait()
	close(errs)

	for err := range errs {
		require.NoError(t, err)
	}

	// Reset the limiter.
	s.Reset()

	// Shutdown the ticker before accessing the internal limiter state.
	s.Shutdown()

	sw := s.(*slidingWindow)

	// Check if state is cleared, and all tokens are available for use post reset.
	require.Equal(t, capacity, len(sw.permits))

	for i := 0; i < sw.numIntervals; i++ {
		require.Equal(t, 0, sw.prev.count[i])
		require.Equal(t, 0, sw.curr.count[i])
	}
}
// TestResetDuringActiveRequests tests if reset is transparent to any active
// requests and they are not affected. It also checks that limiter stays
// within capacity limits post reset.
// TestResetDuringActiveRequests tests if reset is transparent to any active
// requests and they are not affected. It also checks that limiter stays
// within capacity limits post reset.
func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() {
	var (
		t             = suite.T()
		windowSize    = 100 * time.Millisecond
		slideInterval = 10 * time.Millisecond
		capacity      = 10
		numRequests   = 10 * capacity
		wg            sync.WaitGroup
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
	require.NoError(t, err)

	// Make some requests to the limiter as well as reset it concurrently in a
	// 10:1 request to reset ratio. Errors are collected on a buffered channel
	// so that require (which calls t.FailNow) only runs on the test goroutine
	// (see go vet's testinggoroutine check).
	errs := make(chan error, numRequests)

	for i := 0; i < numRequests; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			errs <- s.Wait(ctx)
		}()

		// Launch a reset every 10th iteration.
		if i%10 == 0 {
			wg.Add(1)

			go func() {
				defer wg.Done()
				s.Reset()
			}()
		}
	}

	wg.Wait()
	close(errs)

	for err := range errs {
		require.NoError(t, err)
	}

	// Shutdown the ticker before accessing the internal limiter state.
	s.Shutdown()

	// Verify that number of requests allowed in each window is less than or
	// equal to window capacity.
	sw := s.(*slidingWindow)

	// Copy the counts into a fresh slice rather than appending to
	// sw.prev.count, so we never risk writing into prev's backing array.
	data := make([]int, 0, len(sw.prev.count)+len(sw.curr.count))
	data = append(data, sw.prev.count...)
	data = append(data, sw.curr.count...)

	sums := slidingSums(data, sw.numIntervals)
	for _, sum := range sums {
		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
	}
}