Compare commits

...

3 Commits
main ... wait_n

Author SHA1 Message Date
Abhishek Pandey
f10a7bb7b7 Add logs 2023-11-08 22:11:50 -08:00
Abhishek Pandey
4df81e7fd7 Make lowercase arg 2023-11-08 21:38:21 -08:00
Abhishek Pandey
59c0e7d6d3 Implement WaitN 2023-11-08 21:22:04 -08:00
3 changed files with 90 additions and 53 deletions

View File

@ -4,5 +4,6 @@ import "context"
type Limiter interface {
Wait(ctx context.Context) error
WaitN(ctx context.Context, n int) error
Shutdown()
}

View File

@ -84,16 +84,28 @@ func NewSlidingWindowLimiter(
}
// Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN.
// Equivalent to calling WaitN(ctx, 1).
func (s *slidingWindow) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return clues.Stack(ctx.Err())
case <-s.permits:
s.mu.Lock()
defer s.mu.Unlock()
return s.WaitN(ctx, 1)
}
s.curr.count[s.currentInterval]++
// WaitN blocks a request until n tokens are available or the context gets
// cancelled. WaitN should be called with n <= capacity otherwise it will block
// forever.
//
// TODO(pandeyabs): Enforce n <= capacity check. Not adding it right now because
// we are relying on capacity = 0 for ctx cancellation test, which would need
// some refactoring.
func (s *slidingWindow) WaitN(ctx context.Context, n int) error {
for i := 0; i < n; i++ {
select {
case <-ctx.Done():
return clues.Stack(ctx.Err())
case <-s.permits:
s.mu.Lock()
s.curr.count[s.currentInterval]++
s.mu.Unlock()
}
}
return nil

View File

@ -81,57 +81,81 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
}
// TestWaitSliding tests the sliding window functionality of the limiter with
// time distributed Wait() calls.
// time distributed WaitN() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
var (
t = suite.T()
windowSize = 1 * time.Second
slideInterval = 10 * time.Millisecond
capacity = 100
// Test will run for duration of 2 windowSize.
numRequests = 2 * capacity
wg sync.WaitGroup
)
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t)
defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
require.NoError(t, err)
// Make concurrent requests to the limiter
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Sleep for a random duration to spread out requests over multiple slide
// intervals & windows, so that we can test the sliding window logic better.
// Without this, the requests will be bunched up in the very first intervals
// of the 2 windows. Rest of the intervals will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.Wait(ctx)
require.NoError(t, err)
}()
tests := []struct {
Name string
windowSize time.Duration
slideInterval time.Duration
capacity int
numRequests int
n int
}{
{
Name: "Request 1 token each",
windowSize: 1 * time.Second,
slideInterval: 10 * time.Millisecond,
capacity: 100,
numRequests: 200,
n: 1,
},
{
Name: "Request 5 tokens each",
windowSize: 1 * time.Second,
slideInterval: 10 * time.Millisecond,
capacity: 100,
numRequests: 100,
n: 5,
},
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
for _, test := range tests {
suite.Run(test.Name, func() {
t := suite.T()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
defer goleak.VerifyNone(t)
sums := slidingSums(data, sw.numIntervals)
ctx, flush := tester.NewContext(t)
defer flush()
for _, sum := range sums {
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity)
require.NoError(t, err)
var wg sync.WaitGroup
// Make concurrent requests to the limiter
for i := 0; i < test.numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Sleep for a random duration to spread out requests over multiple slide
// intervals & windows, so that we can test the sliding window logic better.
// Without this, the requests will be bunched up in the very first intervals
// of the 2 windows. Rest of the intervals will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.WaitN(ctx, test.n)
require.NoError(t, err)
}()
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums {
require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity)
}
})
}
}