diff --git a/src/go.mod b/src/go.mod index 17ad919ad..bced89a0c 100644 --- a/src/go.mod +++ b/src/go.mod @@ -34,6 +34,7 @@ require ( github.com/tidwall/pretty v1.2.1 github.com/tomlazar/table v0.1.2 github.com/vbauerster/mpb/v8 v8.1.6 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/time v0.3.0 diff --git a/src/go.sum b/src/go.sum index 3e0a5574e..28780c51e 100644 --- a/src/go.sum +++ b/src/go.sum @@ -468,8 +468,8 @@ go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPi go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= diff --git a/src/internal/common/limiters/sliding_window.go b/src/internal/common/limiters/sliding_window.go index 8d1b6d8c8..1ba829f34 100644 --- a/src/internal/common/limiters/sliding_window.go +++ b/src/internal/common/limiters/sliding_window.go @@ -47,8 +47,8 @@ type slidingWindow struct { // mu synchronizes access to the curr and prev windows mu sync.Mutex - // stopTimer stops the recurring slide timer - stopTimer chan struct{} + // stopTicker stops the recurring slide ticker + stopTicker chan struct{} } func NewSlidingWindowLimiter( @@ -74,7 +74,7 @@ func NewSlidingWindowLimiter( count: make([]int, ni), }, currentInterval: -1, - stopTimer: make(chan struct{}), + stopTicker: make(chan struct{}), } s.initialize() @@ -87,7 +87,7 @@ func NewSlidingWindowLimiter( func (s *slidingWindow) Wait(ctx context.Context) error { select { case <-ctx.Done(): - return ctx.Err() + return clues.Stack(ctx.Err()) case <-s.permits: s.mu.Lock() defer s.mu.Unlock() @@ -101,10 +101,7 @@ func (s *slidingWindow) Wait(ctx context.Context) error { // Shutdown cleans up the slide goroutine. If shutdown is not called, the slide // goroutine will continue to run until the program exits. func (s *slidingWindow) Shutdown() { - select { - case s.stopTimer <- struct{}{}: - default: - } + close(s.stopTicker) } // initialize starts the slide goroutine and prefills tokens to full capacity. @@ -121,7 +118,7 @@ func (s *slidingWindow) initialize() { select { case <-ticker.C: s.slide() - case <-s.stopTimer: + case <-s.stopTicker: ticker.Stop() return } @@ -159,12 +156,7 @@ func (s *slidingWindow) slide() { s.nextInterval() for i := 0; i < s.prev.count[s.currentInterval]; i++ { - select { - case s.permits <- token{}: - default: - // Skip if permits are at capacity - return - } + s.permits <- token{} } } diff --git a/src/internal/common/limiters/sliding_window_test.go b/src/internal/common/limiters/sliding_window_test.go index eef9f03b9..712283fa8 100644 --- a/src/internal/common/limiters/sliding_window_test.go +++ b/src/internal/common/limiters/sliding_window_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.uber.org/goleak" "github.com/alcionai/corso/src/internal/tester" ) @@ -38,6 +39,8 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() { intervalToCount = make(map[time.Duration]int) ) + defer goleak.VerifyNone(t) + ctx, flush := tester.NewContext(t) defer flush() @@ -77,8 +80,8 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() { } } -// TestWaitSliding tests the sliding window functionality of the limiter with distributed -// Wait() calls. +// TestWaitSliding tests the sliding window functionality of the limiter with +// time distributed Wait() calls. func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { var ( t = suite.T() @@ -90,6 +93,8 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { wg sync.WaitGroup ) + defer goleak.VerifyNone(t) + ctx, flush := tester.NewContext(t) defer flush() @@ -122,7 +127,7 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() { sw := s.(*slidingWindow) data := append(sw.prev.count, sw.curr.count...) - sums := slidingSum(data, sw.numIntervals) + sums := slidingSums(data, sw.numIntervals) for _, sum := range sums { require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity) @@ -137,6 +142,8 @@ func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() { wg sync.WaitGroup ) + defer goleak.VerifyNone(t) + ctx, flush := tester.NewContext(t) defer flush() @@ -155,7 +162,7 @@ func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() { defer wg.Done() err := s.Wait(ctx) - require.Equal(t, context.DeadlineExceeded, err) + require.ErrorIs(t, err, context.DeadlineExceeded) }() wg.Wait() @@ -197,6 +204,13 @@ func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() { capacity: -1, expectErr: assert.Error, }, + { + name: "Window not divisible by slide interval", + windowSize: 100 * time.Millisecond, + slideInterval: 11 * time.Millisecond, + capacity: 100, + expectErr: assert.Error, + }, { name: "Valid parameters", windowSize: 100 * time.Millisecond, @@ -210,12 +224,14 @@ func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() { suite.Run(test.name, func() { t := suite.T() + defer goleak.VerifyNone(t) + s, err := NewSlidingWindowLimiter( test.windowSize, test.slideInterval, test.capacity) if s != nil { - defer s.Shutdown() + s.Shutdown() } test.expectErr(t, err) @@ -223,7 +239,7 @@ func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() { } } -func slidingSum(data []int, w int) []int { +func slidingSums(data []int, w int) []int { var ( sum = 0 res = make([]int, len(data)-w+1)