Add WaitN support to limiter (#4675)

Adds `WaitN` support in sliding window limiter. WaitN blocks the request until all N tokens are acquired.  It addresses a concern raised [here](https://github.com/alcionai/corso/pull/4636#discussion_r1389686910) in an earlier `WaitN` implementation by ensuring that all N tokens get served to each request atomically. 

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abhishek Pandey 2023-11-15 09:55:17 -08:00 committed by GitHub
parent 551bfd2b13
commit bc9d258ca0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 55 deletions

View File

@ -4,6 +4,7 @@ import "context"
type Limiter interface { type Limiter interface {
Wait(ctx context.Context) error Wait(ctx context.Context) error
WaitN(ctx context.Context, n int) error
Shutdown() Shutdown()
Reset() Reset()
} }

View File

@ -89,21 +89,35 @@ func NewSlidingWindowLimiter(
} }
// Wait blocks a request until a token is available or the context is cancelled. // Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error { func (s *slidingWindow) Wait(ctx context.Context) error {
return s.WaitN(ctx, 1)
}
// WaitN blocks a request until n tokens are available or the context gets
// cancelled. WaitN should be called with n <= capacity otherwise it will block
// forever.
//
// TODO(pandeyabs): Enforce n <= capacity check. Not adding it right now because
// we are relying on capacity = 0 for ctx cancellation test, which would need
// some refactoring.
func (s *slidingWindow) WaitN(ctx context.Context, n int) error {
// Acquire request mutex and slide mutex in order.
s.requestMu.Lock() s.requestMu.Lock()
defer s.requestMu.Unlock() defer s.requestMu.Unlock()
select { for i := 0; i < n; i++ {
case <-ctx.Done(): select {
return clues.Stack(ctx.Err()) case <-ctx.Done():
case <-s.permits: return clues.Stack(ctx.Err())
s.mu.Lock() case <-s.permits:
defer s.mu.Unlock() }
s.curr.count[s.currentInterval]++
} }
// Mark n tokens as granted in the current interval.
s.mu.Lock()
defer s.mu.Unlock()
s.curr.count[s.currentInterval] += n
return nil return nil
} }

View File

@ -81,57 +81,83 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
} }
// TestWaitSliding tests the sliding window functionality of the limiter with // TestWaitSliding tests the sliding window functionality of the limiter with
// time distributed Wait() calls. // time distributed WaitN() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { func (suite *SlidingWindowUnitTestSuite) TestWaitNSliding() {
var ( tests := []struct {
t = suite.T() Name string
windowSize = 1 * time.Second windowSize time.Duration
slideInterval = 10 * time.Millisecond slideInterval time.Duration
capacity = 100 capacity int
// Test will run for duration of 2 windowSize. numRequests int
numRequests = 2 * capacity n int
wg sync.WaitGroup }{
) {
Name: "Request 1 token each",
defer goleak.VerifyNone(t) windowSize: 100 * time.Millisecond,
slideInterval: 10 * time.Millisecond,
ctx, flush := tester.NewContext(t) capacity: 100,
defer flush() numRequests: 200,
n: 1,
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity) },
require.NoError(t, err) {
Name: "Request N tokens each",
// Make concurrent requests to the limiter windowSize: 100 * time.Millisecond,
for i := 0; i < numRequests; i++ { slideInterval: 10 * time.Millisecond,
wg.Add(1) capacity: 1000,
numRequests: 200,
go func() { n: 10,
defer wg.Done() },
// Sleep for a random duration to spread out requests over multiple slide
// intervals & windows, so that we can test the sliding window logic better.
// Without this, the requests will be bunched up in the very first intervals
// of the 2 windows. Rest of the intervals will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.Wait(ctx)
require.NoError(t, err)
}()
} }
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state. for _, test := range tests {
s.Shutdown() suite.Run(test.Name, func() {
t := suite.T()
// Verify that number of requests allowed in each window is less than or equal defer goleak.VerifyNone(t)
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
sums := slidingSums(data, sw.numIntervals) ctx, flush := tester.NewContext(t)
defer flush()
for _, sum := range sums { s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity)
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity) require.NoError(t, err)
var wg sync.WaitGroup
// Make concurrent requests to the limiter
for i := 0; i < test.numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Sleep for a random duration to spread out requests over
// multiple slide intervals & windows, so that we can test
// the sliding window logic better.
// Without this, the requests will be bunched up in the very
// first interval of the 2 windows. Rest of the intervals
// will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.WaitN(ctx, test.n)
require.NoError(t, err)
}()
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums {
require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity)
}
})
} }
} }