Compare commits

...

6 Commits

Author           SHA1        Message                          Date
Abhishek Pandey  89914d8639  test                             2023-11-08 16:34:51 -08:00
Abhishek Pandey  4f6e60558a  Address feedback                 2023-11-07 12:45:49 -08:00
Abhishek Pandey  e7fa269532  Remove stray prints              2023-11-07 08:52:32 -08:00
Abhishek Pandey  969d1fb6b6  Remove goleak usage              2023-11-06 16:10:11 -08:00
Abhishek Pandey  110b477346  Var renaming & tests             2023-11-06 15:41:02 -08:00
Abhishek Pandey  07740938fb  Add sliding window rate limiter  2023-11-02 03:08:55 -07:00
6 changed files with 470 additions and 5 deletions

View File

@@ -34,6 +34,7 @@ require (
	github.com/tidwall/pretty v1.2.1
	github.com/tomlazar/table v0.1.2
	github.com/vbauerster/mpb/v8 v8.1.6
+	go.uber.org/goleak v1.3.0
	go.uber.org/zap v1.26.0
	golang.org/x/exp v0.0.0-20230905200255-921286631fa9
	golang.org/x/time v0.3.0

View File

@@ -468,8 +468,8 @@ go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPi
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
-go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
-go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo=

View File

@@ -0,0 +1,8 @@
package limiters

import "context"

// Limiter is a minimal rate limiter interface. Wait blocks until the limiter
// grants a token or the context is cancelled; Shutdown releases any
// background resources held by the limiter.
type Limiter interface {
	Wait(ctx context.Context) error
	Shutdown()
}
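
Note: for illustration only, a minimal sketch of how a caller might drive this interface. The pump function, its loop bound, and work are hypothetical and not part of this change.

// pump is a hypothetical caller of the Limiter interface above.
func pump(ctx context.Context, lim Limiter, work func() error) error {
	for i := 0; i < 100; i++ {
		// Block until the limiter grants a token or ctx is cancelled.
		if err := lim.Wait(ctx); err != nil {
			return err
		}

		if err := work(); err != nil {
			return err
		}
	}

	return nil
}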

View File

@@ -0,0 +1,189 @@
package limiters

import (
	"context"
	"sync"
	"time"

	"github.com/alcionai/clues"
)

// token is an empty marker value passed through the permits channel.
type token struct{}

// fixedWindow holds per-interval counts of tokens granted during one
// windowSize-long span of time.
type fixedWindow struct {
	count []int
}

var _ Limiter = &slidingWindow{}

// slidingWindow approximates a sliding-window rate limit by keeping two
// fixed windows (current and previous) of per-interval grant counts.
type slidingWindow struct {
	// capacity is the maximum number of requests allowed in a sliding window
	// at any given time.
	capacity int

	// windowSize is the total duration of the sliding window. The limiter
	// allows at most capacity requests in this duration.
	windowSize time.Duration

	// slideInterval controls how frequently the window slides. A smaller
	// interval gives better accuracy at the cost of more frequent slides and
	// more memory usage.
	slideInterval time.Duration

	// numIntervals is the number of intervals in the window, calculated as
	// windowSize / slideInterval.
	numIntervals int

	// currentInterval tracks the current slide interval.
	currentInterval int

	// Each request acquires a token from the permits channel. If the channel
	// is empty, the request blocks until a permit becomes available or the
	// context is cancelled.
	permits chan token

	// curr and prev are fixed windows of size windowSize. Each window holds a
	// slice of intervals counting the number of tokens granted during that
	// interval.
	curr fixedWindow
	prev fixedWindow

	// mu synchronizes access to the curr and prev windows.
	mu sync.Mutex

	// stopTicker stops the recurring slide ticker.
	stopTicker chan struct{}
}

// NewSlidingWindowLimiter returns a sliding window rate limiter which allows
// at most capacity requests in any windowSize span, sliding forward every
// slideInterval.
func NewSlidingWindowLimiter(
	windowSize, slideInterval time.Duration,
	capacity int,
) (Limiter, error) {
	if err := validate(windowSize, slideInterval, capacity); err != nil {
		return nil, err
	}

	ni := int(windowSize / slideInterval)

	s := &slidingWindow{
		windowSize:    windowSize,
		slideInterval: slideInterval,
		capacity:      capacity,
		permits:       make(chan token, capacity),
		numIntervals:  ni,
		prev: fixedWindow{
			count: make([]int, ni),
		},
		curr: fixedWindow{
			count: make([]int, ni),
		},
		// Start at -1 so that the first nextInterval() call lands on
		// interval 0.
		currentInterval: -1,
		stopTicker:      make(chan struct{}),
	}

	s.initialize()

	return s, nil
}

// Wait blocks a request until a token is available or the context is
// cancelled.
// TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return clues.Stack(ctx.Err())

	case <-s.permits:
		s.mu.Lock()
		defer s.mu.Unlock()

		s.curr.count[s.currentInterval]++
	}

	return nil
}

// Shutdown cleans up the slide goroutine. If Shutdown is not called, the
// slide goroutine will continue to run until the program exits.
func (s *slidingWindow) Shutdown() {
	close(s.stopTicker)
}

// initialize starts the slide goroutine and prefills tokens to full capacity.
func (s *slidingWindow) initialize() {
	// OK to not hold the mutex here since nothing else is running yet.
	s.nextInterval()

	// Start a goroutine which ticks every slideInterval. It runs until the
	// program exits or Shutdown is called.
	go func() {
		ticker := time.NewTicker(s.slideInterval)

		for {
			select {
			case <-ticker.C:
				s.slide()

			case <-s.stopTicker:
				ticker.Stop()
				return
			}
		}
	}()

	// Prefill permits so tokens can be granted immediately.
	for i := 0; i < s.capacity; i++ {
		s.permits <- token{}
	}
}

// nextInterval increments the current interval and slides the fixed windows
// if needed. Should be called with the mutex held.
func (s *slidingWindow) nextInterval() {
	// Advance to the next interval, wrapping around at numIntervals.
	s.currentInterval = (s.currentInterval + 1) % s.numIntervals

	// If we wrapped around, windowSize has elapsed: the current window
	// becomes the previous one and a fresh current window is started.
	if s.currentInterval == 0 {
		s.prev = s.curr
		s.curr = fixedWindow{
			count: make([]int, s.numIntervals),
		}
	}
}

// slide moves the window forward by one interval. It reclaims the tokens
// granted during the interval that just expired (one full windowSize ago)
// and returns them to the permits channel. The channel cannot overflow,
// since every reclaimed token corresponds to one previously taken out of it.
func (s *slidingWindow) slide() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.nextInterval()

	// After nextInterval, prev.count[s.currentInterval] holds the grant count
	// of the interval that just fell out of the sliding window.
	for i := 0; i < s.prev.count[s.currentInterval]; i++ {
		s.permits <- token{}
	}
}

func validate(
	windowSize, slideInterval time.Duration,
	capacity int,
) error {
	if windowSize <= 0 {
		return clues.New("invalid window size")
	}

	if slideInterval <= 0 {
		return clues.New("invalid slide interval")
	}

	// Allow capacity to be 0 for testing purposes.
	if capacity < 0 {
		return clues.New("invalid window capacity")
	}

	if windowSize < slideInterval {
		return clues.New("window too small to fit intervals")
	}

	if windowSize%slideInterval != 0 {
		return clues.New("window not divisible by slide interval")
	}

	return nil
}
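
Note: a worked example of the parameter math, with illustrative values not taken from this change. A 1 s window with a 100 ms slide interval yields numIntervals = 10, and the limiter admits at most capacity requests in any sliding 1 s span. The example function is a hypothetical caller.

func example(ctx context.Context) error {
	// 1s window sliced into 10 intervals of 100ms; at most 50 grants in any
	// sliding 1s span.
	lim, err := NewSlidingWindowLimiter(1*time.Second, 100*time.Millisecond, 50)
	if err != nil {
		return err
	}
	defer lim.Shutdown()

	// Blocks until a token is free or ctx is cancelled.
	return lim.Wait(ctx)
}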

View File

@@ -0,0 +1,260 @@
package limiters

import (
	"context"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"go.uber.org/goleak"

	"github.com/alcionai/corso/src/internal/tester"
)

type SlidingWindowUnitTestSuite struct {
	tester.Suite
}

func TestSlidingWindowLimiterSuite(t *testing.T) {
	suite.Run(t, &SlidingWindowUnitTestSuite{Suite: tester.NewUnitSuite(t)})
}

// TestWaitBasic tests the Wait() functionality of the limiter with multiple
// concurrent requests.
func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
	var (
		t          = suite.T()
		windowSize = 1 * time.Second
		// Assume the slide interval is equal to the window size for
		// simplicity.
		slideInterval   = 1 * time.Second
		capacity        = 100
		startTime       = time.Now()
		numRequests     = 3 * capacity
		wg              sync.WaitGroup
		mu              sync.Mutex
		intervalToCount = make(map[time.Duration]int)
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
	require.NoError(t, err)

	defer s.Shutdown()

	// Check that all tokens are available for use post initialization.
	require.Equal(t, capacity, len(s.(*slidingWindow).permits))

	// Make concurrent requests to the limiter.
	for i := 0; i < numRequests; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			err := s.Wait(ctx)
			require.NoError(t, err)

			// Bucket the request by the number of windowSize units elapsed
			// since startTime.
			bucket := time.Since(startTime).Truncate(windowSize)

			mu.Lock()
			intervalToCount[bucket]++
			mu.Unlock()
		}()
	}

	wg.Wait()

	// Verify that the number of requests allowed in each window is at most
	// the window capacity.
	for _, c := range intervalToCount {
		require.True(t, c <= capacity, "count: %d, capacity: %d", c, capacity)
	}
}

// TestWaitSliding tests the sliding window functionality of the limiter with
// time-distributed Wait() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
	var (
		t             = suite.T()
		windowSize    = 1 * time.Second
		slideInterval = 10 * time.Millisecond
		capacity      = 100
		// The test will run for a duration of 2 windowSize.
		numRequests = 2 * capacity
		wg          sync.WaitGroup
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
	require.NoError(t, err)

	defer s.Shutdown()

	// Make concurrent requests to the limiter.
	for i := 0; i < numRequests; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			// Sleep for a random duration to spread requests over multiple
			// slide intervals and windows, which exercises the sliding logic
			// better. Without this, the requests would bunch up in the very
			// first intervals of the 2 windows, leaving the rest empty.
			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)

			err := s.Wait(ctx)
			require.NoError(t, err)
		}()
	}

	wg.Wait()

	// Verify that the number of requests allowed in any sliding window is at
	// most the window capacity.
	sw := s.(*slidingWindow)
	data := append(sw.prev.count, sw.curr.count...)

	sums := slidingSums(data, sw.numIntervals)
	for _, sum := range sums {
		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
	}
}

func (suite *SlidingWindowUnitTestSuite) TestContextCancellation() {
	var (
		t             = suite.T()
		windowSize    = 100 * time.Millisecond
		slideInterval = 10 * time.Millisecond
		wg            sync.WaitGroup
	)

	defer goleak.VerifyNone(t)

	ctx, flush := tester.NewContext(t)
	defer flush()

	// Initialize the limiter with capacity = 0 so that Wait can never be
	// granted a token and must exit via context cancellation.
	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, 0)
	require.NoError(t, err)

	defer s.Shutdown()

	ctx, cancel := context.WithTimeout(ctx, 2*windowSize)
	defer cancel()

	wg.Add(1)

	go func() {
		defer wg.Done()

		err := s.Wait(ctx)
		require.ErrorIs(t, err, context.DeadlineExceeded)
	}()

	wg.Wait()
}

func (suite *SlidingWindowUnitTestSuite) TestNewSlidingWindowLimiter() {
	tests := []struct {
		name          string
		windowSize    time.Duration
		slideInterval time.Duration
		capacity      int
		expectErr     assert.ErrorAssertionFunc
	}{
		{
			name:          "Invalid window size",
			windowSize:    0,
			slideInterval: 10 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Invalid slide interval",
			windowSize:    100 * time.Millisecond,
			slideInterval: 0,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Slide interval > window size",
			windowSize:    10 * time.Millisecond,
			slideInterval: 100 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Invalid capacity",
			windowSize:    100 * time.Millisecond,
			slideInterval: 10 * time.Millisecond,
			capacity:      -1,
			expectErr:     assert.Error,
		},
		{
			name:          "Window not divisible by slide interval",
			windowSize:    100 * time.Millisecond,
			slideInterval: 11 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.Error,
		},
		{
			name:          "Valid parameters",
			windowSize:    100 * time.Millisecond,
			slideInterval: 10 * time.Millisecond,
			capacity:      100,
			expectErr:     assert.NoError,
		},
	}

	for _, test := range tests {
		suite.Run(test.name, func() {
			t := suite.T()

			defer goleak.VerifyNone(t)

			s, err := NewSlidingWindowLimiter(
				test.windowSize,
				test.slideInterval,
				test.capacity)
			if s != nil {
				s.Shutdown()
			}

			test.expectErr(t, err)
		})
	}
}

// slidingSums returns the sum of every length-w window over data, computed
// with a rolling sum.
func slidingSums(data []int, w int) []int {
	var (
		sum = 0
		res = make([]int, len(data)-w+1)
	)

	for i := 0; i < w; i++ {
		sum += data[i]
	}

	res[0] = sum

	for i := 1; i < len(data)-w+1; i++ {
		sum = sum - data[i-1] + data[i+w-1]
		res[i] = sum
	}

	return res
}
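
Note: for intuition, a worked example of slidingSums with illustrative inputs, expressed as a Go comment:

// slidingSums([]int{3, 1, 4, 1, 5, 9}, 3) computes the rolling sums
// 3+1+4=8, 1+4+1=6, 4+1+5=10, 1+5+9=15 and returns []int{8, 6, 10, 15}.
// In TestWaitSliding, data is the concatenated prev+curr interval counts and
// w is numIntervals, so each sum counts the grants in one window-sized span.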

View File

@@ -11,6 +11,7 @@ import (
	khttp "github.com/microsoft/kiota-http-go"
	"golang.org/x/time/rate"

+	limiters "github.com/alcionai/corso/src/internal/common/limiters"
	"github.com/alcionai/corso/src/pkg/count"
	"github.com/alcionai/corso/src/pkg/logger"
	"github.com/alcionai/corso/src/pkg/path"
@@ -102,6 +103,8 @@ var (
	driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
	// also used as the exchange service limiter
	defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
+	exchLim, _ = limiters.NewSlidingWindowLimiter(610*time.Second, 1*time.Second, 9800)
)

type LimiterCfg struct {
@@ -185,10 +188,14 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int {
// calls-per-minute rate. Otherwise, the call will wait in a queue until
// the next token set is available.
func QueueRequest(ctx context.Context) {
-	limiter := ctxLimiter(ctx)
-	consume := ctxLimiterConsumption(ctx, defaultLC)
-	if err := limiter.WaitN(ctx, consume); err != nil {
+	// limiter := ctxLimiter(ctx)
+	// consume := ctxLimiterConsumption(ctx, defaultLC)
+	// if err := limiter.WaitN(ctx, consume); err != nil {
+	// 	logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
+	// }
+	if err := exchLim.Wait(ctx); err != nil {
		logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter")
	}
}
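
Note on the hard-coded exchLim parameters, which the diff itself does not explain: a 610 s window with a 1 s slide interval gives 610 intervals, and a capacity of 9800 appears sized to stay under Microsoft Graph's documented Exchange throttling budget of 10,000 requests per 10-minute window, leaving roughly 2% of request headroom plus 10 s of window slack. Treat this rationale as an assumption, sketched below:

// Assumed rationale for the constants above (not stated in the change).
const (
	graphBudgetRequests = 10000            // assumed Graph Exchange budget per window
	graphBudgetWindow   = 10 * time.Minute // 600s; the limiter uses 610s for slack
	chosenCapacity      = 9800             // 200 requests (2%) of headroom
)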