Add limiter reset functionality

This commit is contained in:
Abhishek Pandey 2023-11-13 23:32:00 -08:00
parent bb1f287e6a
commit 910767c7d2
3 changed files with 154 additions and 0 deletions

View File

@ -5,4 +5,5 @@ import "context"
type Limiter interface { type Limiter interface {
Wait(ctx context.Context) error Wait(ctx context.Context) error
Shutdown() Shutdown()
Reset()
} }

View File

@ -47,6 +47,11 @@ type slidingWindow struct {
// mu synchronizes access to the curr and prev windows // mu synchronizes access to the curr and prev windows
mu sync.Mutex mu sync.Mutex
// requestMu synchronizes access between requests. This is especially needed
// for WaitN to ensure that all N tokens are granted to the same request.
// It also allows us to reset the limiter transparently without affecting
// any pending requests.
requestMu sync.Mutex
// stopTicker stops the recurring slide ticker // stopTicker stops the recurring slide ticker
stopTicker chan struct{} stopTicker chan struct{}
closeOnce sync.Once closeOnce sync.Once
@ -86,6 +91,9 @@ func NewSlidingWindowLimiter(
// Wait blocks a request until a token is available or the context is cancelled. // Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN. // TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error { func (s *slidingWindow) Wait(ctx context.Context) error {
s.requestMu.Lock()
defer s.requestMu.Unlock()
select { select {
case <-ctx.Done(): case <-ctx.Done():
return clues.Stack(ctx.Err()) return clues.Stack(ctx.Err())
@ -107,8 +115,42 @@ func (s *slidingWindow) Shutdown() {
}) })
} }
// Reset resets the limiter to its initial state. Any pending requests will be
// cancelled. Reset should only be called when the limiter is not in use.
// No need to shutdown slide goroutine. prev and curr
func (s *slidingWindow) Reset() {
// Acquire request mutex and slide mutex in order like Wait does.
s.requestMu.Lock()
defer s.requestMu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
// Clear all existing counts
s.prev = fixedWindow{
count: make([]int, s.numIntervals),
}
s.curr = fixedWindow{
count: make([]int, s.numIntervals),
}
// Reset permits
close(s.permits)
s.permits = make(chan token, s.capacity)
// Prefill permits to allow tokens to be granted immediately
for i := 0; i < s.capacity; i++ {
s.permits <- token{}
}
}
// initialize starts the slide goroutine and prefills tokens to full capacity. // initialize starts the slide goroutine and prefills tokens to full capacity.
func (s *slidingWindow) initialize() { func (s *slidingWindow) initialize() {
// Hold request mutex to ensure that no requests get processed until
// we are done initializing.
s.requestMu.Lock()
defer s.requestMu.Unlock()
// Ok to not hold the mutex here since nothing else is running yet. // Ok to not hold the mutex here since nothing else is running yet.
s.nextInterval() s.nextInterval()

View File

@ -283,3 +283,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
// Second call to Shutdown() should be a no-op. // Second call to Shutdown() should be a no-op.
s.Shutdown() s.Shutdown()
} }
// TestReset tests if limiter state is cleared and all tokens are available for
// use post reset.
func (suite *SlidingWindowUnitTestSuite) TestReset() {
var (
t = suite.T()
windowSize = 100 * time.Millisecond
slideInterval = 10 * time.Millisecond
capacity = 10
numRequests = capacity
wg sync.WaitGroup
)
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t)
defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
require.NoError(t, err)
// Make some requests to the limiter.
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := s.Wait(ctx)
require.NoError(t, err)
}()
}
wg.Wait()
// Reset the limiter.
s.Reset()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
sw := s.(*slidingWindow)
// Check if state is cleared, and all tokens are available for use post reset.
require.Equal(t, capacity, len(sw.permits))
for i := 0; i < sw.numIntervals; i++ {
require.Equal(t, 0, sw.prev.count[i])
require.Equal(t, 0, sw.curr.count[i])
}
}
// TestResetDuringActiveRequests tests if reset is transparent to any active
// requests and they are not affected. It also checks that limiter stays
// within capacity limits post reset.
func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() {
var (
t = suite.T()
windowSize = 100 * time.Millisecond
slideInterval = 10 * time.Millisecond
capacity = 10
numRequests = 10 * capacity
wg sync.WaitGroup
)
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t)
defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
require.NoError(t, err)
// Make some requests to the limiter as well as reset it concurrently
// in 10:1 request to reset ratio.
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := s.Wait(ctx)
require.NoError(t, err)
}()
// Launch a reset every 10th iteration.
if i%10 == 0 {
wg.Add(1)
go func() {
defer wg.Done()
s.Reset()
}()
}
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums {
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
}
}