Compare commits: main...lim_tuning (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 314fb5f847 | |
| | ea9f6d97b0 | |
| | f6b83f3e7e | |
| | 4e117b5703 | |
| | 3b6d4bfe8b | |
| | 5bab966f67 | |
| | 4b959098a8 | |
| | 42ae33c436 | |
| | ac0fa41c9c | |
| | cd2181193b | |
| | 910767c7d2 | |
@@ -85,6 +85,7 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command {
 		flags.AddDisableConcurrencyLimiterFlag(c)
 		flags.AddDeltaPageSizeFlag(c)
 		flags.AddGenericBackupFlags(c)
+		flags.AddDisableSlidingWindowLimiterFlag(c)
 
 	case listCommand:
 		c, fs = utils.AddCommand(cmd, exchangeListCmd())

@@ -109,6 +109,7 @@ func (suite *ExchangeUnitSuite) TestBackupCreateFlags() {
 			"--" + flags.DisableDeltaFN,
 			"--" + flags.EnableImmutableIDFN,
 			"--" + flags.DisableConcurrencyLimiterFN,
+			"--" + flags.DisableSlidingWindowLimiterFN,
 		},
 		flagsTD.PreparedGenericBackupFlags(),
 		flagsTD.PreparedProviderFlags(),

@@ -126,6 +127,7 @@ func (suite *ExchangeUnitSuite) TestBackupCreateFlags() {
 		assert.True(t, co.ToggleFeatures.DisableDelta)
 		assert.True(t, co.ToggleFeatures.ExchangeImmutableIDs)
 		assert.True(t, co.ToggleFeatures.DisableConcurrencyLimiter)
+		assert.True(t, co.ToggleFeatures.DisableSlidingWindowLimiter)
 		flagsTD.AssertGenericBackupFlags(t, cmd)
 		flagsTD.AssertProviderFlags(t, cmd)
 		flagsTD.AssertStorageFlags(t, cmd)
@@ -21,6 +21,7 @@ const (
 	RunModeFN      = "run-mode"
 	SkippedItemsFN = "skipped-items"
 	SkipReduceFN   = "skip-reduce"
+	DisableSlidingWindowLimiterFN = "disable-sliding-limiter"
 )
 
 var (

@@ -42,6 +43,7 @@ var (
 	RunModeFV       string
 	NoPermissionsFV bool
 	SkipReduceFV    bool
+	DisableSlidingWindowLimiterFV bool
 )
 
 // well-known flag values

@@ -174,3 +176,18 @@ func AddRunModeFlag(cmd *cobra.Command, persistent bool) {
 	fs.StringVar(&RunModeFV, RunModeFN, "run", "What mode to run: dry, test, run. Defaults to run.")
 	cobra.CheckErr(fs.MarkHidden(RunModeFN))
 }
+
+// AddDisableSlidingWindowLimiterFN disables the experimental sliding window rate
+// limiter for graph API requests. This is only relevant for exchange backups.
+// Exchange restores continue to use the default token bucket rate limiter.
+// Setting this flag switches exchange backups to use the default token bucket
+// rate limiter.
+func AddDisableSlidingWindowLimiterFlag(cmd *cobra.Command) {
+	fs := cmd.Flags()
+	fs.BoolVar(
+		&DisableSlidingWindowLimiterFV,
+		DisableSlidingWindowLimiterFN,
+		false,
+		"Disable sliding window rate limiter. Default: false")
+	cobra.CheckErr(fs.MarkHidden(DisableSlidingWindowLimiterFN))
+}
@@ -27,6 +27,7 @@ func Control() control.Options {
 	opt.ToggleFeatures.DisableDelta = flags.DisableDeltaFV
 	opt.ToggleFeatures.ExchangeImmutableIDs = flags.EnableImmutableIDFV
 	opt.ToggleFeatures.DisableConcurrencyLimiter = flags.DisableConcurrencyLimiterFV
+	opt.ToggleFeatures.DisableSlidingWindowLimiter = flags.DisableSlidingWindowLimiterFV
 	opt.Parallelism.ItemFetch = flags.FetchParallelismFV
 
 	return opt
@@ -37,6 +37,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
 			assert.Equal(t, 2, flags.FetchParallelismFV, flags.FetchParallelismFN)
 			assert.True(t, flags.DisableConcurrencyLimiterFV, flags.DisableConcurrencyLimiterFN)
 			assert.Equal(t, 499, flags.DeltaPageSizeFV, flags.DeltaPageSizeFN)
+			assert.True(t, flags.DisableSlidingWindowLimiterFV, flags.DisableSlidingWindowLimiterFN)
 		},
 	}
 

@@ -52,6 +53,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
 	flags.AddFetchParallelismFlag(cmd)
 	flags.AddDisableConcurrencyLimiterFlag(cmd)
 	flags.AddDeltaPageSizeFlag(cmd)
+	flags.AddDisableSlidingWindowLimiterFlag(cmd)
 
 	// Test arg parsing for few args
 	cmd.SetArgs([]string{

@@ -66,6 +68,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
 		"--" + flags.FetchParallelismFN, "2",
 		"--" + flags.DisableConcurrencyLimiterFN,
 		"--" + flags.DeltaPageSizeFN, "499",
+		"--" + flags.DisableSlidingWindowLimiterFN,
 	})
 
 	err := cmd.Execute()
@@ -4,5 +4,7 @@ import "context"
 
 type Limiter interface {
 	Wait(ctx context.Context) error
+	WaitN(ctx context.Context, n int) error
 	Shutdown()
+	Reset()
 }
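The interface gains WaitN for multi-token reservations and Reset for clearing limiter state between protected resources. A minimal caller sketch under those semantics; the `fetchBatch` and `doRequest` helpers are illustrative and not part of this change, and the `limiters` import path is internal to the corso module:

```go
package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

// fetchBatch reserves n tokens with a single WaitN call, so the whole batch is
// charged against the same window before any request is issued.
func fetchBatch(ctx context.Context, lim limiters.Limiter, ids []string) error {
	if err := lim.WaitN(ctx, len(ids)); err != nil {
		return err
	}

	for _, id := range ids {
		doRequest(id)
	}

	return nil
}

// doRequest is a placeholder for the actual graph call.
func doRequest(id string) {
	_ = id
}
```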
@@ -47,6 +47,11 @@ type slidingWindow struct {
 
 	// mu synchronizes access to the curr and prev windows
 	mu sync.Mutex
+	// requestMu synchronizes access between requests. This is especially needed
+	// for WaitN to ensure that all N tokens are granted to the same request.
+	// It also allows us to reset the limiter transparently without affecting
+	// any pending requests.
+	requestMu sync.Mutex
 	// stopTicker stops the recurring slide ticker
 	stopTicker chan struct{}
 	closeOnce  sync.Once
@@ -84,16 +89,31 @@ func NewSlidingWindowLimiter(
 }
 
 // Wait blocks a request until a token is available or the context is cancelled.
-// TODO(pandeyabs): Implement WaitN.
 func (s *slidingWindow) Wait(ctx context.Context) error {
+	return s.WaitN(ctx, 1)
+}
+
+// WaitN blocks a request until n tokens are available or the context gets
+// cancelled. WaitN should be called with n <= capacity otherwise it will block
+// forever.
+//
+// TODO(pandeyabs): Enforce n <= capacity check. Not adding it right now because
+// we are relying on capacity = 0 for ctx cancellation test, which would need
+// some refactoring.
+func (s *slidingWindow) WaitN(ctx context.Context, n int) error {
+	// Acquire request mutex and slide mutex in order.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
+	for i := 0; i < n; i++ {
 		select {
 		case <-ctx.Done():
 			return clues.Stack(ctx.Err())
 		case <-s.permits:
 			s.mu.Lock()
-			defer s.mu.Unlock()
-
 			s.curr.count[s.currentInterval]++
+			s.mu.Unlock()
+		}
 	}
 
 	return nil
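Each granted token increments the counter for the current slide interval, so over any run of numIntervals consecutive intervals (one full window) the number of grants can never exceed capacity. The tests later in this diff verify that invariant with a `slidingSums` helper whose body is not shown here; a plausible sketch of such a helper:

```go
// slidingSums is a sketch of the test helper referenced by this diff (its real
// body is not included): it returns the total count for every run of `window`
// consecutive intervals, which the tests then compare against capacity.
func slidingSums(counts []int, window int) []int {
	sums := make([]int, 0, len(counts))

	for i := 0; i+window <= len(counts); i++ {
		sum := 0
		for _, c := range counts[i : i+window] {
			sum += c
		}

		sums = append(sums, sum)
	}

	return sums
}
```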
@@ -107,8 +127,42 @@ func (s *slidingWindow) Shutdown() {
 	})
 }
 
+// Reset resets the limiter to its initial state. Any pending requests will be
+// cancelled. Reset should only be called when the limiter is not in use.
+// No need to shutdown slide goroutine. prev and curr
+func (s *slidingWindow) Reset() {
+	// Acquire request mutex and slide mutex in order like Wait does.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Clear all existing counts
+	s.prev = fixedWindow{
+		count: make([]int, s.numIntervals),
+	}
+	s.curr = fixedWindow{
+		count: make([]int, s.numIntervals),
+	}
+
+	// Reset permits
+	close(s.permits)
+	s.permits = make(chan token, s.capacity)
+
+	// Prefill permits to allow tokens to be granted immediately
+	for i := 0; i < s.capacity; i++ {
+		s.permits <- token{}
+	}
+}
+
 // initialize starts the slide goroutine and prefills tokens to full capacity.
 func (s *slidingWindow) initialize() {
+	// Hold request mutex to ensure that no requests get processed until
+	// we are done initializing.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
 	// Ok to not hold the mutex here since nothing else is running yet.
 	s.nextInterval()
 
@@ -81,40 +81,64 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
 }
 
 // TestWaitSliding tests the sliding window functionality of the limiter with
-// time distributed Wait() calls.
-func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
-	var (
-		t             = suite.T()
-		windowSize    = 1 * time.Second
-		slideInterval = 10 * time.Millisecond
-		capacity      = 100
-		// Test will run for duration of 2 windowSize.
-		numRequests = 2 * capacity
-		wg          sync.WaitGroup
-	)
+// time distributed WaitN() calls.
+func (suite *SlidingWindowUnitTestSuite) TestWaitNSliding() {
+	tests := []struct {
+		Name          string
+		windowSize    time.Duration
+		slideInterval time.Duration
+		capacity      int
+		numRequests   int
+		n             int
+	}{
+		{
+			Name:          "Request 1 token each",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      100,
+			numRequests:   200,
+			n:             1,
+		},
+		{
+			Name:          "Request N tokens each",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      1000,
+			numRequests:   200,
+			n:             10,
+		},
+	}
+
+	for _, test := range tests {
+		suite.Run(test.Name, func() {
+			t := suite.T()
 
 	defer goleak.VerifyNone(t)
 
 	ctx, flush := tester.NewContext(t)
 	defer flush()
 
-	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity)
 	require.NoError(t, err)
 
+	var wg sync.WaitGroup
+
 	// Make concurrent requests to the limiter
-	for i := 0; i < numRequests; i++ {
+	for i := 0; i < test.numRequests; i++ {
 		wg.Add(1)
 
 		go func() {
 			defer wg.Done()
 
-			// Sleep for a random duration to spread out requests over multiple slide
-			// intervals & windows, so that we can test the sliding window logic better.
-			// Without this, the requests will be bunched up in the very first intervals
-			// of the 2 windows. Rest of the intervals will be empty.
+			// Sleep for a random duration to spread out requests over
+			// multiple slide intervals & windows, so that we can test
+			// the sliding window logic better.
+			// Without this, the requests will be bunched up in the very
+			// first interval of the 2 windows. Rest of the intervals
+			// will be empty.
 			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
 
-			err := s.Wait(ctx)
+			err := s.WaitN(ctx, test.n)
 			require.NoError(t, err)
 		}()
 	}

@@ -131,7 +155,9 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
 	sums := slidingSums(data, sw.numIntervals)
 
 	for _, sum := range sums {
-		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
+		require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity)
+			}
+		})
 	}
 }
 
@@ -283,3 +309,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
 	// Second call to Shutdown() should be a no-op.
 	s.Shutdown()
 }
+
+// TestReset tests if limiter state is cleared and all tokens are available for
+// use post reset.
+func (suite *SlidingWindowUnitTestSuite) TestReset() {
+	var (
+		t             = suite.T()
+		windowSize    = 100 * time.Millisecond
+		slideInterval = 10 * time.Millisecond
+		capacity      = 10
+		numRequests   = capacity
+		wg            sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Make some requests to the limiter.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			require.NoError(t, err)
+		}()
+	}
+	wg.Wait()
+
+	// Reset the limiter.
+	s.Reset()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	sw := s.(*slidingWindow)
+
+	// Check if state is cleared, and all tokens are available for use post reset.
+	require.Equal(t, capacity, len(sw.permits))
+
+	for i := 0; i < sw.numIntervals; i++ {
+		require.Equal(t, 0, sw.prev.count[i])
+		require.Equal(t, 0, sw.curr.count[i])
+	}
+}
+
+// TestResetDuringActiveRequests tests if reset is transparent to any active
+// requests and they are not affected. It also checks that limiter stays
+// within capacity limits post reset.
+func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() {
+	var (
+		t             = suite.T()
+		windowSize    = 100 * time.Millisecond
+		slideInterval = 10 * time.Millisecond
+		capacity      = 10
+		numRequests   = 10 * capacity
+		wg            sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Make some requests to the limiter as well as reset it concurrently
+	// in 10:1 request to reset ratio.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			require.NoError(t, err)
+		}()
+
+		// Launch a reset every 10th iteration.
+		if i%10 == 0 {
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+
+				s.Reset()
+			}()
+		}
+	}
+	wg.Wait()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	// Verify that number of requests allowed in each window is less than or equal
+	// to window capacity
+	sw := s.(*slidingWindow)
+	data := append(sw.prev.count, sw.curr.count...)
+
+	sums := slidingSums(data, sw.numIntervals)
+
+	for _, sum := range sums {
+		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
+	}
+}
src/internal/common/limiters/token_bucket.go (new file, 32 lines)

@@ -0,0 +1,32 @@
+package limiters
+
+import (
+	"context"
+
+	"golang.org/x/time/rate"
+)
+
+var _ Limiter = &TokenBucket{}
+
+// Thin wrapper around the golang.org/x/time/rate token bucket rate limiter.
+type TokenBucket struct {
+	*rate.Limiter
+}
+
+func NewTokenBucketLimiter(r int, burst int) Limiter {
+	return &TokenBucket{
+		Limiter: rate.NewLimiter(rate.Limit(r), burst),
+	}
+}
+
+func (tb *TokenBucket) Wait(ctx context.Context) error {
+	return tb.Limiter.Wait(ctx)
+}
+
+func (tb *TokenBucket) WaitN(ctx context.Context, n int) error {
+	return tb.Limiter.WaitN(ctx, n)
+}
+
+// Reset and shutdown are no-ops for the token bucket limiter.
+func (tb *TokenBucket) Reset()    {}
+func (tb *TokenBucket) Shutdown() {}
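Because the wrapper embeds *rate.Limiter and only adds no-op Reset/Shutdown, NewTokenBucketLimiter(16, 200) behaves like rate.NewLimiter(rate.Limit(16), 200). A small usage sketch under that reading; the import path is internal to the corso module:

```go
package main

import (
	"context"
	"fmt"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

func main() {
	// 16 tokens per second with a burst of 200, matching the default graph
	// limiter constants used elsewhere in this change.
	lim := limiters.NewTokenBucketLimiter(16, 200)

	ctx := context.Background()

	// Wait blocks until a single token is available; WaitN(ctx, n) reserves n.
	if err := lim.Wait(ctx); err != nil {
		fmt.Println("wait failed:", err)
		return
	}

	fmt.Println("token granted")

	// Reset and Shutdown are no-ops for the token bucket implementation.
	lim.Reset()
	lim.Shutdown()
}
```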
@@ -15,6 +15,7 @@ import (
 	"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
 	"github.com/alcionai/corso/src/internal/operations/inject"
 	bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/filters"

@@ -46,8 +47,6 @@ func (ctrl *Controller) ProduceBackupCollections(
 		diagnostics.Index("service", bpc.Selector.PathService().String()))
 	defer end()
 
-	ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: service})
-
 	// Limit the max number of active requests to graph from this collection.
 	bpc.Options.Parallelism.ItemFetch = graph.Parallelism(service).
 		ItemOverride(ctx, bpc.Options.Parallelism.ItemFetch)

@@ -226,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths(
 
 	return paths, nil
 }
+
+func (ctrl *Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	// Use sliding window limiter for Exchange if the feature is not explicitly
+	// disabled. For other services we always use token bucket limiter.
+	enableSlidingLim := false
+	if service == path.ExchangeService &&
+		!options.ToggleFeatures.DisableSlidingWindowLimiter {
+		enableSlidingLim = true
+	}
+
+	ctx = graph.BindRateLimiterConfig(
+		ctx,
+		graph.LimiterCfg{
+			Service:              service,
+			EnableSlidingLimiter: enableSlidingLim,
+		})
+
+	return ctx
+}
@@ -589,14 +589,6 @@ func (oc *Collection) streamDriveItem(
 		parentPath)
 
 	ctx = clues.Add(ctx, "item_info", itemInfo)
-	// Drive content download requests are also rate limited by graph api.
-	// Ensure that this request goes through the drive limiter & not the default
-	// limiter.
-	ctx = graph.BindRateLimiterConfig(
-		ctx,
-		graph.LimiterCfg{
-			Service: path.OneDriveService,
-		})
 
 	if isFile {
 		dataSuffix := metadata.DataFileSuffix
@@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName(
 	return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName),
 		ctrl.ProtectedResourceErr
 }
+
+func (ctrl Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}
@@ -205,6 +205,9 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
 	defer flushMetrics()
 
+	// Select an appropriate rate limiter for the service.
+	ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options)
+
 	// Check if the protected resource has the service enabled in order for us
 	// to run a backup.
 	enabled, err := op.bp.IsServiceEnabled(

@@ -319,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
 	}
 
+	// For exchange, rate limits are enforced on a mailbox level. Reset the
+	// rate limiter so that it doesn't accidentally throttle following mailboxes.
+	// This is a no-op if we are using token bucket limiter since it refreshes
+	// tokens on a fixed per second basis.
+	graph.ResetLimiter(ctx)
+
 	return op.Errors.Failure()
 }
 
@@ -44,6 +44,14 @@ type (
 		) ([]path.RestorePaths, error)
 
 		Wait() *data.CollectionStats
+
+		// SetRateLimiter selects a rate limiter type for the service being
+		// backed up and binds it to the context.
+		SetRateLimiter(
+			ctx context.Context,
+			service path.ServiceType,
+			options control.Options,
+		) context.Context
 	}
 
 	RestoreConsumer interface {
@@ -10,6 +10,7 @@ import (
 	kinject "github.com/alcionai/corso/src/internal/kopia/inject"
 	"github.com/alcionai/corso/src/internal/m365"
 	"github.com/alcionai/corso/src/internal/operations/inject"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"

@@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths(
 	ctrl := m365.Controller{}
 	return ctrl.GetMetadataPaths(ctx, r, base, errs)
 }
+
+func (mbp mockBackupProducer) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}
@@ -91,4 +91,11 @@ type Toggles struct {
 	// the protected resource. PreviewBackups are used to demonstrate value by
 	// being quick to create.
 	PreviewBackup bool `json:"previewBackup"`
+
+	// DisableSlidingWindowLimiter disables the experimental sliding window rate
+	// limiter for graph API requests. This is only relevant for exchange backups.
+	// Exchange restores continue to use the default token bucket rate limiter.
+	// Setting this flag switches exchange backups to use the default token bucket
+	// rate limiter.
+	DisableSlidingWindowLimiter bool `json:"disableSlidingWindowLimiter"`
 }
@@ -9,8 +9,8 @@ import (
 
 	"github.com/alcionai/clues"
 	khttp "github.com/microsoft/kiota-http-go"
-	"golang.org/x/time/rate"
 
+	"github.com/alcionai/corso/src/internal/common/limiters"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"

@@ -88,6 +88,14 @@ const (
 	// but doing so risks timeouts. It's better to give the limits breathing room.
 	defaultPerSecond = 16  // 16 * 60 * 10 = 9600
 	defaultMaxCap    = 200 // real cap is 10k-per-10-minutes
+
+	// Sliding window limiter for exchange service. We are restricting it to 9600 per
+	// 10 mins to give the limits some breathing room. It can be slowly increased
+	// over time to get closer to the 10k limit.
+	exchWindow        = 610 * time.Second
+	exchSlideInterval = 1 * time.Second
+	exchCapacity      = 9600
+
 	// since drive runs on a per-minute, rather than per-10-minute bucket, we have
 	// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
 	// 1200, similar to the per-minute cap) would allow us to make a flood of 2400

@@ -99,13 +107,27 @@ const (
 )
 
 var (
-	driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
-	// also used as the exchange service limiter
-	defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
+	driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
+	// Exchange service token bucket rate limiter
+	defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
+
+	// Exchange service sliding window rate limiter.
+	//
+	// TODO(pandeyabs): We are swallowing the error here. It's a limitation of
+	// using global limiters. For now, we'll catch any errors in the regression
+	// test until we look into managing limiter life cycles.
+	exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter(
+		exchWindow,
+		exchSlideInterval,
+		exchCapacity)
 )
 
 type LimiterCfg struct {
 	Service path.ServiceType
+	// Experimental flag to enable sliding window rate limiter. It should only be
+	// enabled for Exchange backups. It's set to false by default to prevent accidental
+	// enablement for non backup operations and other services.
+	EnableSlidingLimiter bool
 }
 
 type limiterCfgKey string

@@ -116,19 +138,27 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context {
 	return context.WithValue(ctx, limiterCfgCtxKey, lc)
 }
 
-func ctxLimiter(ctx context.Context) *rate.Limiter {
+func ctxLimiter(ctx context.Context) limiters.Limiter {
 	lc, ok := extractRateLimiterConfig(ctx)
 	if !ok {
 		return defaultLimiter
 	}
 
+	lim := defaultLimiter
+
 	switch lc.Service {
 	// FIXME: Handle based on category once we add chat backup
 	case path.OneDriveService, path.SharePointService, path.GroupsService:
-		return driveLimiter
-	default:
-		return defaultLimiter
+		lim = driveLimiter
+	case path.ExchangeService:
+		if lc.EnableSlidingLimiter {
+			// Return sliding window limiter for Exchange if enabled. Otherwise,
+			// return the default token bucket limiter.
+			lim = exchSlidingLimiter
+		}
 	}
+
+	return lim
 }
 
 func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {
@@ -193,6 +223,11 @@ func QueueRequest(ctx context.Context) {
 	}
 }
 
+func ResetLimiter(ctx context.Context) {
+	limiter := ctxLimiter(ctx)
+	limiter.Reset()
+}
+
 // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
 type RateLimiterMiddleware struct{}
 
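Putting the middleware pieces together: the backup path binds the limiter choice onto the context once, then resets it between protected resources. A condensed sketch of that flow using the exported helpers shown in this diff (BindRateLimiterConfig, LimiterCfg, ResetLimiter); the import paths and the per-mailbox backup function are illustrative assumptions:

```go
package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/m365/graph"
	"github.com/alcionai/corso/src/pkg/path"
)

// backupMailbox stands in for the real per-mailbox backup work.
func backupMailbox(ctx context.Context, mbox string) error { return nil }

func backupMailboxes(ctx context.Context, mailboxes []string) error {
	// Route exchange backup traffic through the sliding window limiter.
	ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{
		Service:              path.ExchangeService,
		EnableSlidingLimiter: true,
	})

	for _, mbox := range mailboxes {
		if err := backupMailbox(ctx, mbox); err != nil {
			return err
		}

		// Exchange limits are enforced per mailbox, so clear the window before
		// moving on; this is a no-op when the token bucket limiter is in use.
		graph.ResetLimiter(ctx)
	}

	return nil
}
```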
@@ -19,8 +19,8 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
-	"golang.org/x/time/rate"
 
+	"github.com/alcionai/corso/src/internal/common/limiters"
 	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/internal/tester/tconfig"

@@ -355,8 +355,8 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 	table := []struct {
 		name             string
 		service          path.ServiceType
-		expectOK         require.BoolAssertionFunc
-		expectLimiter    *rate.Limiter
+		enableSlidingLim bool
+		expectLimiter    limiters.Limiter
 	}{
 		{
 			name: "exchange",

@@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 			service:       path.ServiceType(-1),
 			expectLimiter: defaultLimiter,
 		},
+		{
+			name:             "exchange sliding limiter",
+			service:          path.ExchangeService,
+			enableSlidingLim: true,
+			expectLimiter:    exchSlidingLimiter,
+		},
+		// Sliding limiter flag is ignored for non-exchange services
+		{
+			name:             "onedrive with sliding limiter flag set",
+			service:          path.OneDriveService,
+			enableSlidingLim: true,
+			expectLimiter:    driveLimiter,
+		},
 	}
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
-			tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service})
+			tctx := BindRateLimiterConfig(
+				ctx,
+				LimiterCfg{
+					Service:              test.service,
+					EnableSlidingLimiter: test.enableSlidingLim,
+				})
 			lc, ok := extractRateLimiterConfig(tctx)
 			require.True(t, ok, "found rate limiter in ctx")
 			assert.Equal(t, test.service, lc.Service)