Implement WaitN

This commit is contained in:
Abhishek Pandey 2023-11-08 21:22:04 -08:00
parent a4d7f56396
commit 59c0e7d6d3
3 changed files with 85 additions and 53 deletions

View File

@ -4,5 +4,6 @@ import "context"
type Limiter interface { type Limiter interface {
Wait(ctx context.Context) error Wait(ctx context.Context) error
WaitN(ctx context.Context, N int) error
Shutdown() Shutdown()
} }

View File

@ -84,16 +84,23 @@ func NewSlidingWindowLimiter(
} }
// Wait blocks a request until a token is available or the context is cancelled. // Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN. // Equivalent to calling WaitN(ctx, 1).
func (s *slidingWindow) Wait(ctx context.Context) error { func (s *slidingWindow) Wait(ctx context.Context) error {
return s.WaitN(ctx, 1)
}
// Wait blocks a request until N tokens are available or the context gets
// cancelled.
func (s *slidingWindow) WaitN(ctx context.Context, N int) error {
for i := 0; i < N; i++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return clues.Stack(ctx.Err()) return clues.Stack(ctx.Err())
case <-s.permits: case <-s.permits:
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
s.curr.count[s.currentInterval]++ s.curr.count[s.currentInterval]++
s.mu.Unlock()
}
} }
return nil return nil

View File

@ -81,28 +81,50 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
} }
// TestWaitSliding tests the sliding window functionality of the limiter with // TestWaitSliding tests the sliding window functionality of the limiter with
// time distributed Wait() calls. // time distributed WaitN() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
var ( tests := []struct {
t = suite.T() Name string
windowSize = 1 * time.Second windowSize time.Duration
slideInterval = 10 * time.Millisecond slideInterval time.Duration
capacity = 100 capacity int
// Test will run for duration of 2 windowSize. numRequests int
numRequests = 2 * capacity N int
wg sync.WaitGroup }{
) {
Name: "Request 1 token",
windowSize: 1 * time.Second,
slideInterval: 10 * time.Millisecond,
capacity: 100,
numRequests: 200,
N: 1,
},
{
Name: "Request 5 tokens",
windowSize: 1 * time.Second,
slideInterval: 10 * time.Millisecond,
capacity: 100,
numRequests: 100,
N: 5,
},
}
for _, test := range tests {
suite.Run(test.Name, func() {
t := suite.T()
defer goleak.VerifyNone(t) defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity) s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity)
require.NoError(t, err) require.NoError(t, err)
var wg sync.WaitGroup
// Make concurrent requests to the limiter // Make concurrent requests to the limiter
for i := 0; i < numRequests; i++ { for i := 0; i < test.numRequests; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
@ -114,7 +136,7 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
// of the 2 windows. Rest of the intervals will be empty. // of the 2 windows. Rest of the intervals will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.Wait(ctx) err := s.WaitN(ctx, test.N)
require.NoError(t, err) require.NoError(t, err)
}() }()
} }
@ -131,7 +153,9 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
sums := slidingSums(data, sw.numIntervals) sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums { for _, sum := range sums {
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity) require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity)
}
})
} }
} }