Address feedback

This commit is contained in:
Abhishek Pandey 2023-11-07 12:45:49 -08:00
parent e7fa269532
commit 4f6e60558a
4 changed files with 32 additions and 23 deletions

View File

@ -34,6 +34,7 @@ require (
github.com/tidwall/pretty v1.2.1 github.com/tidwall/pretty v1.2.1
github.com/tomlazar/table v0.1.2 github.com/tomlazar/table v0.1.2
github.com/vbauerster/mpb/v8 v8.1.6 github.com/vbauerster/mpb/v8 v8.1.6
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.26.0 go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/time v0.3.0 golang.org/x/time v0.3.0

View File

@ -468,8 +468,8 @@ go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPi
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=

View File

@ -47,8 +47,8 @@ type slidingWindow struct {
// mu synchronizes access to the curr and prev windows // mu synchronizes access to the curr and prev windows
mu sync.Mutex mu sync.Mutex
// stopTimer stops the recurring slide timer // stopTicker stops the recurring slide ticker
stopTimer chan struct{} stopTicker chan struct{}
} }
func NewSlidingWindowLimiter( func NewSlidingWindowLimiter(
@ -74,7 +74,7 @@ func NewSlidingWindowLimiter(
count: make([]int, ni), count: make([]int, ni),
}, },
currentInterval: -1, currentInterval: -1,
stopTimer: make(chan struct{}), stopTicker: make(chan struct{}),
} }
s.initialize() s.initialize()
@ -87,7 +87,7 @@ func NewSlidingWindowLimiter(
func (s *slidingWindow) Wait(ctx context.Context) error { func (s *slidingWindow) Wait(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return clues.Stack(ctx.Err())
case <-s.permits: case <-s.permits:
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -101,10 +101,7 @@ func (s *slidingWindow) Wait(ctx context.Context) error {
// Shutdown cleans up the slide goroutine. If shutdown is not called, the slide // Shutdown cleans up the slide goroutine. If shutdown is not called, the slide
// goroutine will continue to run until the program exits. // goroutine will continue to run until the program exits.
func (s *slidingWindow) Shutdown() { func (s *slidingWindow) Shutdown() {
select { close(s.stopTicker)
case s.stopTimer <- struct{}{}:
default:
}
} }
// initialize starts the slide goroutine and prefills tokens to full capacity. // initialize starts the slide goroutine and prefills tokens to full capacity.
@ -121,7 +118,7 @@ func (s *slidingWindow) initialize() {
select { select {
case <-ticker.C: case <-ticker.C:
s.slide() s.slide()
case <-s.stopTimer: case <-s.stopTicker:
ticker.Stop() ticker.Stop()
return return
} }
@ -159,12 +156,7 @@ func (s *slidingWindow) slide() {
s.nextInterval() s.nextInterval()
for i := 0; i < s.prev.count[s.currentInterval]; i++ { for i := 0; i < s.prev.count[s.currentInterval]; i++ {
select { s.permits <- token{}
case s.permits <- token{}:
default:
// Skip if permits are at capacity
return
}
} }
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"go.uber.org/goleak"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
) )
@ -38,6 +39,8 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
intervalToCount = make(map[time.Duration]int) intervalToCount = make(map[time.Duration]int)
) )
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
@ -77,8 +80,8 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
} }
} }
// TestWaitSliding tests the sliding window functionality of the limiter with distributed // TestWaitSliding tests the sliding window functionality of the limiter with
// Wait() calls. // time distributed Wait() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
var ( var (
t = suite.T() t = suite.T()
@ -90,6 +93,8 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
wg sync.WaitGroup wg sync.WaitGroup
) )
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
@ -122,7 +127,7 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
sw := s.(*slidingWindow) sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...) data := append(sw.prev.count, sw.curr.count...)
sums := slidingSum(data, sw.numIntervals) sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums { for _, sum := range sums {
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity) require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
@ -137,6 +142,8 @@ func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() {
wg sync.WaitGroup wg sync.WaitGroup
) )
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
@ -155,7 +162,7 @@ func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() {
defer wg.Done() defer wg.Done()
err := s.Wait(ctx) err := s.Wait(ctx)
require.Equal(t, context.DeadlineExceeded, err) require.ErrorIs(t, err, context.DeadlineExceeded)
}() }()
wg.Wait() wg.Wait()
@ -197,6 +204,13 @@ func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
capacity: -1, capacity: -1,
expectErr: assert.Error, expectErr: assert.Error,
}, },
{
name: "Window not divisible by slide interval",
windowSize: 100 * time.Millisecond,
slideInterval: 11 * time.Millisecond,
capacity: 100,
expectErr: assert.Error,
},
{ {
name: "Valid parameters", name: "Valid parameters",
windowSize: 100 * time.Millisecond, windowSize: 100 * time.Millisecond,
@ -210,12 +224,14 @@ func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
suite.Run(test.name, func() { suite.Run(test.name, func() {
t := suite.T() t := suite.T()
defer goleak.VerifyNone(t)
s, err := NewSlidingWindowLimiter( s, err := NewSlidingWindowLimiter(
test.windowSize, test.windowSize,
test.slideInterval, test.slideInterval,
test.capacity) test.capacity)
if s != nil { if s != nil {
defer s.Shutdown() s.Shutdown()
} }
test.expectErr(t, err) test.expectErr(t, err)
@ -223,7 +239,7 @@ func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
} }
} }
func slidingSum(data []int, w int) []int { func slidingSums(data []int, w int) []int {
var ( var (
sum = 0 sum = 0
res = make([]int, len(data)-w+1) res = make([]int, len(data)-w+1)