Compare commits: main...lim_tuning (11 commits)

314fb5f847
ea9f6d97b0
f6b83f3e7e
4e117b5703
3b6d4bfe8b
5bab966f67
4b959098a8
42ae33c436
ac0fa41c9c
cd2181193b
910767c7d2
@@ -85,6 +85,7 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command {
 			flags.AddDisableConcurrencyLimiterFlag(c)
 			flags.AddDeltaPageSizeFlag(c)
 			flags.AddGenericBackupFlags(c)
+			flags.AddDisableSlidingWindowLimiterFlag(c)

 		case listCommand:
 			c, fs = utils.AddCommand(cmd, exchangeListCmd())
@@ -109,6 +109,7 @@ func (suite *ExchangeUnitSuite) TestBackupCreateFlags() {
 				"--" + flags.DisableDeltaFN,
 				"--" + flags.EnableImmutableIDFN,
 				"--" + flags.DisableConcurrencyLimiterFN,
+				"--" + flags.DisableSlidingWindowLimiterFN,
 			},
 			flagsTD.PreparedGenericBackupFlags(),
 			flagsTD.PreparedProviderFlags(),
@@ -126,6 +127,7 @@ func (suite *ExchangeUnitSuite) TestBackupCreateFlags() {
 	assert.True(t, co.ToggleFeatures.DisableDelta)
 	assert.True(t, co.ToggleFeatures.ExchangeImmutableIDs)
 	assert.True(t, co.ToggleFeatures.DisableConcurrencyLimiter)
+	assert.True(t, co.ToggleFeatures.DisableSlidingWindowLimiter)
 	flagsTD.AssertGenericBackupFlags(t, cmd)
 	flagsTD.AssertProviderFlags(t, cmd)
 	flagsTD.AssertStorageFlags(t, cmd)
@@ -5,22 +5,23 @@ import (
 )

 const (
 	AlertsFN                      = "alerts"
 	DeltaPageSizeFN               = "delta-page-size"
 	DisableConcurrencyLimiterFN   = "disable-concurrency-limiter"
 	DisableDeltaFN                = "disable-delta"
 	DisableIncrementalsFN         = "disable-incrementals"
 	ForceItemDataDownloadFN       = "force-item-data-download"
 	EnableImmutableIDFN           = "enable-immutable-id"
 	FailFastFN                    = "fail-fast"
 	FailedItemsFN                 = "failed-items"
 	FetchParallelismFN            = "fetch-parallelism"
 	NoStatsFN                     = "no-stats"
 	RecoveredErrorsFN             = "recovered-errors"
 	NoPermissionsFN               = "no-permissions"
 	RunModeFN                     = "run-mode"
 	SkippedItemsFN                = "skipped-items"
 	SkipReduceFN                  = "skip-reduce"
+	DisableSlidingWindowLimiterFN = "disable-sliding-limiter"
 )

 var (
@@ -39,9 +40,10 @@ var (
 	NoStatsFV bool
 	// RunMode describes the type of run, such as:
 	// flagtest, dry, run. Should default to 'run'.
 	RunModeFV                     string
 	NoPermissionsFV               bool
 	SkipReduceFV                  bool
+	DisableSlidingWindowLimiterFV bool
 )

 // well-known flag values
@@ -174,3 +176,18 @@ func AddRunModeFlag(cmd *cobra.Command, persistent bool) {
 	fs.StringVar(&RunModeFV, RunModeFN, "run", "What mode to run: dry, test, run. Defaults to run.")
 	cobra.CheckErr(fs.MarkHidden(RunModeFN))
 }

+// AddDisableSlidingWindowLimiterFlag adds a hidden flag that disables the
+// experimental sliding window rate limiter for graph API requests. This is
+// only relevant for exchange backups; exchange restores continue to use the
+// default token bucket rate limiter. Setting the flag switches exchange
+// backups back to the default token bucket rate limiter as well.
+func AddDisableSlidingWindowLimiterFlag(cmd *cobra.Command) {
+	fs := cmd.Flags()
+	fs.BoolVar(
+		&DisableSlidingWindowLimiterFV,
+		DisableSlidingWindowLimiterFN,
+		false,
+		"Disable sliding window rate limiter. Default: false")
+	cobra.CheckErr(fs.MarkHidden(DisableSlidingWindowLimiterFN))
+}
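For orientation, a minimal sketch of how the hidden flag round-trips through cobra, mirroring what the option tests later in this diff do. The import path for the flags package and the throwaway command are assumptions, not taken from this diff.

package example

import (
	"fmt"

	"github.com/spf13/cobra"

	// Assumed location of the flags package shown above.
	"github.com/alcionai/corso/src/cli/flags"
)

func parseHiddenLimiterFlag() error {
	// Hypothetical throwaway command: register the hidden flag, then parse an
	// argument list that sets it. MarkHidden only hides the flag from help
	// output; it can still be set on the command line.
	cmd := &cobra.Command{
		Use:  "backup",
		RunE: func(*cobra.Command, []string) error { return nil },
	}

	flags.AddDisableSlidingWindowLimiterFlag(cmd)
	cmd.SetArgs([]string{"--" + flags.DisableSlidingWindowLimiterFN})

	if err := cmd.Execute(); err != nil {
		return err
	}

	fmt.Println("disable sliding window limiter:", flags.DisableSlidingWindowLimiterFV)

	return nil
}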
@@ -27,6 +27,7 @@ func Control() control.Options {
 	opt.ToggleFeatures.DisableDelta = flags.DisableDeltaFV
 	opt.ToggleFeatures.ExchangeImmutableIDs = flags.EnableImmutableIDFV
 	opt.ToggleFeatures.DisableConcurrencyLimiter = flags.DisableConcurrencyLimiterFV
+	opt.ToggleFeatures.DisableSlidingWindowLimiter = flags.DisableSlidingWindowLimiterFV
 	opt.Parallelism.ItemFetch = flags.FetchParallelismFV

 	return opt
@@ -37,6 +37,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
 			assert.Equal(t, 2, flags.FetchParallelismFV, flags.FetchParallelismFN)
 			assert.True(t, flags.DisableConcurrencyLimiterFV, flags.DisableConcurrencyLimiterFN)
 			assert.Equal(t, 499, flags.DeltaPageSizeFV, flags.DeltaPageSizeFN)
+			assert.True(t, flags.DisableSlidingWindowLimiterFV, flags.DisableSlidingWindowLimiterFN)
 		},
 	}

@@ -52,6 +53,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
 	flags.AddFetchParallelismFlag(cmd)
 	flags.AddDisableConcurrencyLimiterFlag(cmd)
 	flags.AddDeltaPageSizeFlag(cmd)
+	flags.AddDisableSlidingWindowLimiterFlag(cmd)

 	// Test arg parsing for few args
 	cmd.SetArgs([]string{
@@ -66,6 +68,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
 		"--" + flags.FetchParallelismFN, "2",
 		"--" + flags.DisableConcurrencyLimiterFN,
 		"--" + flags.DeltaPageSizeFN, "499",
+		"--" + flags.DisableSlidingWindowLimiterFN,
 	})

 	err := cmd.Execute()
@@ -4,5 +4,7 @@ import "context"

 type Limiter interface {
 	Wait(ctx context.Context) error
+	WaitN(ctx context.Context, n int) error
 	Shutdown()
+	Reset()
 }
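A minimal sketch of how a caller drives this interface, using the sliding window constructor that appears later in this diff; the window, slide interval, and capacity values are illustrative.

package main

import (
	"context"
	"time"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

func main() {
	// Illustrative parameters: a 1s window that slides every 100ms, 10 tokens per window.
	lim, err := limiters.NewSlidingWindowLimiter(time.Second, 100*time.Millisecond, 10)
	if err != nil {
		panic(err)
	}
	// Stop the internal slide ticker once we're done.
	defer lim.Shutdown()

	ctx := context.Background()

	for i := 0; i < 25; i++ {
		// Block until a token is available, or bail out if ctx is cancelled.
		if err := lim.Wait(ctx); err != nil {
			panic(err)
		}
		// ... issue one rate-limited request here ...
	}
}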
@@ -47,6 +47,11 @@ type slidingWindow struct {

 	// mu synchronizes access to the curr and prev windows
 	mu sync.Mutex
+	// requestMu synchronizes access between requests. This is especially needed
+	// for WaitN to ensure that all N tokens are granted to the same request.
+	// It also allows us to reset the limiter transparently without affecting
+	// any pending requests.
+	requestMu sync.Mutex
 	// stopTicker stops the recurring slide ticker
 	stopTicker chan struct{}
 	closeOnce  sync.Once
@@ -84,16 +89,31 @@ func NewSlidingWindowLimiter(
 }

 // Wait blocks a request until a token is available or the context is cancelled.
-// TODO(pandeyabs): Implement WaitN.
 func (s *slidingWindow) Wait(ctx context.Context) error {
-	select {
-	case <-ctx.Done():
-		return clues.Stack(ctx.Err())
-	case <-s.permits:
-		s.mu.Lock()
-		defer s.mu.Unlock()
-
-		s.curr.count[s.currentInterval]++
-	}
-
-	return nil
+	return s.WaitN(ctx, 1)
 }

+// WaitN blocks a request until n tokens are available or the context gets
+// cancelled. WaitN should be called with n <= capacity otherwise it will block
+// forever.
+//
+// TODO(pandeyabs): Enforce n <= capacity check. Not adding it right now because
+// we are relying on capacity = 0 for ctx cancellation test, which would need
+// some refactoring.
+func (s *slidingWindow) WaitN(ctx context.Context, n int) error {
+	// Acquire request mutex and slide mutex in order.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
+	for i := 0; i < n; i++ {
+		select {
+		case <-ctx.Done():
+			return clues.Stack(ctx.Err())
+		case <-s.permits:
+			s.mu.Lock()
+			s.curr.count[s.currentInterval]++
+			s.mu.Unlock()
+		}
+	}
+
+	return nil
+}
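Because WaitN pulls its n tokens from the permits channel one at a time, requestMu is what keeps two concurrent callers from interleaving their grabs. A hypothetical caller-side sketch of the intended pattern, reserving tokens for a whole batch up front before sending it; the helper and its parameters are illustrative, not part of this diff.

package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

// sendBatch is a hypothetical helper: it reserves one token per item before
// issuing a batched request, so the whole batch is accounted against the
// current window instead of trickling across several of them.
func sendBatch(ctx context.Context, lim limiters.Limiter, items []string) error {
	if err := lim.WaitN(ctx, len(items)); err != nil {
		return err
	}

	// ... issue the batched request for items here ...
	_ = items

	return nil
}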
@@ -107,8 +127,42 @@ func (s *slidingWindow) Shutdown() {
 	})
 }

+// Reset resets the limiter to its initial state. Pending requests are not
+// cancelled: Reset waits on the request mutex, so it should only be called
+// when the limiter is not otherwise in use. There is no need to shut down the
+// slide goroutine; prev and curr are simply cleared in place.
+func (s *slidingWindow) Reset() {
+	// Acquire request mutex and slide mutex in order like Wait does.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Clear all existing counts
+	s.prev = fixedWindow{
+		count: make([]int, s.numIntervals),
+	}
+	s.curr = fixedWindow{
+		count: make([]int, s.numIntervals),
+	}
+
+	// Reset permits
+	close(s.permits)
+	s.permits = make(chan token, s.capacity)
+
+	// Prefill permits to allow tokens to be granted immediately
+	for i := 0; i < s.capacity; i++ {
+		s.permits <- token{}
+	}
+}
+
 // initialize starts the slide goroutine and prefills tokens to full capacity.
 func (s *slidingWindow) initialize() {
+	// Hold request mutex to ensure that no requests get processed until
+	// we are done initializing.
+	s.requestMu.Lock()
+	defer s.requestMu.Unlock()
+
 	// Ok to not hold the mutex here since nothing else is running yet.
 	s.nextInterval()
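The permits channel is a counting semaphore, which is why Reset can rebuild it with a close-and-refill. A stripped-down illustration of the same buffered-channel pattern, independent of the limiter above; note that here callers return tokens themselves, whereas the limiter refills them from its slide goroutine.

package example

// token and semaphore mirror the pattern used by the permits channel:
// prefill to capacity, receive to take a token, send to put one back.
type token struct{}

type semaphore chan token

func newSemaphore(capacity int) semaphore {
	s := make(semaphore, capacity)
	for i := 0; i < capacity; i++ {
		s <- token{}
	}

	return s
}

func (s semaphore) acquire() { <-s }
func (s semaphore) release() { s <- token{} }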
@@ -81,57 +81,83 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
 }

 // TestWaitSliding tests the sliding window functionality of the limiter with
-// time distributed Wait() calls.
-func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
-	var (
-		t             = suite.T()
-		windowSize    = 1 * time.Second
-		slideInterval = 10 * time.Millisecond
-		capacity      = 100
-		// Test will run for duration of 2 windowSize.
-		numRequests = 2 * capacity
-		wg          sync.WaitGroup
-	)
-
-	defer goleak.VerifyNone(t)
-
-	ctx, flush := tester.NewContext(t)
-	defer flush()
-
-	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
-	require.NoError(t, err)
-
-	// Make concurrent requests to the limiter
-	for i := 0; i < numRequests; i++ {
-		wg.Add(1)
-
-		go func() {
-			defer wg.Done()
-
-			// Sleep for a random duration to spread out requests over multiple slide
-			// intervals & windows, so that we can test the sliding window logic better.
-			// Without this, the requests will be bunched up in the very first intervals
-			// of the 2 windows. Rest of the intervals will be empty.
-			time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
-
-			err := s.Wait(ctx)
-			require.NoError(t, err)
-		}()
-	}
-	wg.Wait()
-
-	// Shutdown the ticker before accessing the internal limiter state.
-	s.Shutdown()
-
-	// Verify that number of requests allowed in each window is less than or equal
-	// to window capacity
-	sw := s.(*slidingWindow)
-	data := append(sw.prev.count, sw.curr.count...)
-
-	sums := slidingSums(data, sw.numIntervals)
-
-	for _, sum := range sums {
-		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
-	}
-}
+// time distributed WaitN() calls.
+func (suite *SlidingWindowUnitTestSuite) TestWaitNSliding() {
+	tests := []struct {
+		Name          string
+		windowSize    time.Duration
+		slideInterval time.Duration
+		capacity      int
+		numRequests   int
+		n             int
+	}{
+		{
+			Name:          "Request 1 token each",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      100,
+			numRequests:   200,
+			n:             1,
+		},
+		{
+			Name:          "Request N tokens each",
+			windowSize:    100 * time.Millisecond,
+			slideInterval: 10 * time.Millisecond,
+			capacity:      1000,
+			numRequests:   200,
+			n:             10,
+		},
+	}
+
+	for _, test := range tests {
+		suite.Run(test.Name, func() {
+			t := suite.T()
+
+			defer goleak.VerifyNone(t)
+
+			ctx, flush := tester.NewContext(t)
+			defer flush()
+
+			s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity)
+			require.NoError(t, err)
+
+			var wg sync.WaitGroup
+
+			// Make concurrent requests to the limiter
+			for i := 0; i < test.numRequests; i++ {
+				wg.Add(1)
+
+				go func() {
+					defer wg.Done()

+					// Sleep for a random duration to spread out requests over
+					// multiple slide intervals & windows, so that we can test
+					// the sliding window logic better.
+					// Without this, the requests will be bunched up in the very
+					// first interval of the 2 windows. Rest of the intervals
+					// will be empty.
+					time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
+
+					err := s.WaitN(ctx, test.n)
+					require.NoError(t, err)
+				}()
+			}
+			wg.Wait()
+
+			// Shutdown the ticker before accessing the internal limiter state.
+			s.Shutdown()
+
+			// Verify that number of requests allowed in each window is less than or equal
+			// to window capacity
+			sw := s.(*slidingWindow)
+			data := append(sw.prev.count, sw.curr.count...)
+
+			sums := slidingSums(data, sw.numIntervals)
+
+			for _, sum := range sums {
+				require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity)
+			}
+		})
+	}
+}
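slidingSums itself is not shown in this diff. For readers following the assertion, a plausible reconstruction, offered purely as an assumption rather than the repository's actual helper, sums every run of numIntervals consecutive buckets across the concatenated prev+curr counts:

package example

// slidingSums is a hypothetical stand-in for the test helper referenced above:
// given per-interval counts, it returns the total of every window of `window`
// consecutive intervals.
func slidingSums(counts []int, window int) []int {
	if window <= 0 || window > len(counts) {
		return nil
	}

	sums := make([]int, 0, len(counts)-window+1)
	sum := 0

	for i, c := range counts {
		sum += c
		if i >= window {
			sum -= counts[i-window]
		}

		if i >= window-1 {
			sums = append(sums, sum)
		}
	}

	return sums
}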
@@ -283,3 +309,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
 	// Second call to Shutdown() should be a no-op.
 	s.Shutdown()
 }
+
+// TestReset tests if limiter state is cleared and all tokens are available for
+// use post reset.
+func (suite *SlidingWindowUnitTestSuite) TestReset() {
+	var (
+		t             = suite.T()
+		windowSize    = 100 * time.Millisecond
+		slideInterval = 10 * time.Millisecond
+		capacity      = 10
+		numRequests   = capacity
+		wg            sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Make some requests to the limiter.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			require.NoError(t, err)
+		}()
+	}
+	wg.Wait()
+
+	// Reset the limiter.
+	s.Reset()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	sw := s.(*slidingWindow)
+
+	// Check if state is cleared, and all tokens are available for use post reset.
+	require.Equal(t, capacity, len(sw.permits))
+
+	for i := 0; i < sw.numIntervals; i++ {
+		require.Equal(t, 0, sw.prev.count[i])
+		require.Equal(t, 0, sw.curr.count[i])
+	}
+}
+
+// TestResetDuringActiveRequests tests if reset is transparent to any active
+// requests and they are not affected. It also checks that limiter stays
+// within capacity limits post reset.
+func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() {
+	var (
+		t             = suite.T()
+		windowSize    = 100 * time.Millisecond
+		slideInterval = 10 * time.Millisecond
+		capacity      = 10
+		numRequests   = 10 * capacity
+		wg            sync.WaitGroup
+	)
+
+	defer goleak.VerifyNone(t)
+
+	ctx, flush := tester.NewContext(t)
+	defer flush()
+
+	s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
+	require.NoError(t, err)
+
+	// Make some requests to the limiter as well as reset it concurrently
+	// in 10:1 request to reset ratio.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+
+		go func() {
+			defer wg.Done()
+
+			err := s.Wait(ctx)
+			require.NoError(t, err)
+		}()
+
+		// Launch a reset every 10th iteration.
+		if i%10 == 0 {
+			wg.Add(1)
+
+			go func() {
+				defer wg.Done()
+
+				s.Reset()
+			}()
+		}
+	}
+	wg.Wait()
+
+	// Shutdown the ticker before accessing the internal limiter state.
+	s.Shutdown()
+
+	// Verify that number of requests allowed in each window is less than or equal
+	// to window capacity
+	sw := s.(*slidingWindow)
+	data := append(sw.prev.count, sw.curr.count...)
+
+	sums := slidingSums(data, sw.numIntervals)
+
+	for _, sum := range sums {
+		require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
+	}
+}
src/internal/common/limiters/token_bucket.go (new file, 32 lines)
@@ -0,0 +1,32 @@
+package limiters
+
+import (
+	"context"
+
+	"golang.org/x/time/rate"
+)
+
+var _ Limiter = &TokenBucket{}
+
+// Thin wrapper around the golang.org/x/time/rate token bucket rate limiter.
+type TokenBucket struct {
+	*rate.Limiter
+}
+
+func NewTokenBucketLimiter(r int, burst int) Limiter {
+	return &TokenBucket{
+		Limiter: rate.NewLimiter(rate.Limit(r), burst),
+	}
+}
+
+func (tb *TokenBucket) Wait(ctx context.Context) error {
+	return tb.Limiter.Wait(ctx)
+}
+
+func (tb *TokenBucket) WaitN(ctx context.Context, n int) error {
+	return tb.Limiter.WaitN(ctx, n)
+}
+
+// Reset and shutdown are no-ops for the token bucket limiter.
+func (tb *TokenBucket) Reset()    {}
+func (tb *TokenBucket) Shutdown() {}
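A small sketch of how this wrapper is meant to be driven. The rate and burst values mirror the default limiter settings that appear later in this diff and should be treated as illustrative; the helper itself is hypothetical.

package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

// fetchAll drains a list of work items through a token bucket limiter:
// roughly 16 requests per second with bursts of up to 200.
func fetchAll(ctx context.Context, urls []string) error {
	lim := limiters.NewTokenBucketLimiter(16, 200)
	// No-op for the token bucket, kept for interface symmetry.
	defer lim.Shutdown()

	for range urls {
		if err := lim.Wait(ctx); err != nil {
			return err
		}
		// ... issue one request here ...
	}

	return nil
}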
@@ -15,6 +15,7 @@ import (
 	"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
 	"github.com/alcionai/corso/src/internal/operations/inject"
 	bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/filters"
@@ -46,8 +47,6 @@ func (ctrl *Controller) ProduceBackupCollections(
 		diagnostics.Index("service", bpc.Selector.PathService().String()))
 	defer end()

-	ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: service})
-
 	// Limit the max number of active requests to graph from this collection.
 	bpc.Options.Parallelism.ItemFetch = graph.Parallelism(service).
 		ItemOverride(ctx, bpc.Options.Parallelism.ItemFetch)
@@ -226,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths(

 	return paths, nil
 }
+
+func (ctrl *Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	// Use sliding window limiter for Exchange if the feature is not explicitly
+	// disabled. For other services we always use token bucket limiter.
+	enableSlidingLim := false
+	if service == path.ExchangeService &&
+		!options.ToggleFeatures.DisableSlidingWindowLimiter {
+		enableSlidingLim = true
+	}
+
+	ctx = graph.BindRateLimiterConfig(
+		ctx,
+		graph.LimiterCfg{
+			Service:              service,
+			EnableSlidingLimiter: enableSlidingLim,
+		})
+
+	return ctx
+}
@@ -589,14 +589,6 @@ func (oc *Collection) streamDriveItem(
 		parentPath)

 	ctx = clues.Add(ctx, "item_info", itemInfo)
-	// Drive content download requests are also rate limited by graph api.
-	// Ensure that this request goes through the drive limiter & not the default
-	// limiter.
-	ctx = graph.BindRateLimiterConfig(
-		ctx,
-		graph.LimiterCfg{
-			Service: path.OneDriveService,
-		})

 	if isFile {
 		dataSuffix := metadata.DataFileSuffix
@@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName(
 	return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName),
 		ctrl.ProtectedResourceErr
 }
+
+func (ctrl Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}
@@ -205,6 +205,9 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
 	defer flushMetrics()

+	// Select an appropriate rate limiter for the service.
+	ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options)
+
 	// Check if the protected resource has the service enabled in order for us
 	// to run a backup.
 	enabled, err := op.bp.IsServiceEnabled(
@@ -319,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
 	}

+	// For exchange, rate limits are enforced on a mailbox level. Reset the
+	// rate limiter so that it doesn't accidentally throttle following mailboxes.
+	// This is a no-op if we are using token bucket limiter since it refreshes
+	// tokens on a fixed per second basis.
+	graph.ResetLimiter(ctx)
+
 	return op.Errors.Failure()
 }
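The reset-between-mailboxes idea in the comment above, reduced to a sketch against the Limiter interface directly. runOneMailbox is a hypothetical stand-in for the per-mailbox backup work; in the real flow the limiter is process-global and reached through graph.ResetLimiter(ctx) rather than passed in.

package example

import (
	"context"

	"github.com/alcionai/corso/src/internal/common/limiters"
)

// backupMailboxes backs up mailboxes one after another. Since exchange rate
// limits are tracked per mailbox, the shared sliding window is cleared after
// each one so usage from the previous mailbox doesn't throttle the next.
// Reset is a no-op for the token bucket limiter.
func backupMailboxes(
	ctx context.Context,
	lim limiters.Limiter,
	mailboxes []string,
	runOneMailbox func(context.Context, string) error,
) error {
	for _, m := range mailboxes {
		if err := runOneMailbox(ctx, m); err != nil {
			return err
		}

		lim.Reset()
	}

	return nil
}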
@@ -44,6 +44,14 @@ type (
 		) ([]path.RestorePaths, error)

 		Wait() *data.CollectionStats
+
+		// SetRateLimiter selects a rate limiter type for the service being
+		// backed up and binds it to the context.
+		SetRateLimiter(
+			ctx context.Context,
+			service path.ServiceType,
+			options control.Options,
+		) context.Context
 	}

 	RestoreConsumer interface {
@@ -10,6 +10,7 @@ import (
 	kinject "github.com/alcionai/corso/src/internal/kopia/inject"
 	"github.com/alcionai/corso/src/internal/m365"
 	"github.com/alcionai/corso/src/internal/operations/inject"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"
@@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths(
 	ctrl := m365.Controller{}

 	return ctrl.GetMetadataPaths(ctx, r, base, errs)
 }
+
+func (mbp mockBackupProducer) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}
@@ -91,4 +91,11 @@ type Toggles struct {
 	// the protected resource. PreviewBackups are used to demonstrate value by
 	// being quick to create.
 	PreviewBackup bool `json:"previewBackup"`
+
+	// DisableSlidingWindowLimiter disables the experimental sliding window rate
+	// limiter for graph API requests. This is only relevant for exchange backups.
+	// Exchange restores continue to use the default token bucket rate limiter.
+	// Setting this flag switches exchange backups to use the default token bucket
+	// rate limiter.
+	DisableSlidingWindowLimiter bool `json:"disableSlidingWindowLimiter"`
 }
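For library callers that build control.Options directly rather than through the CLI flags, the toggle is just a field. A small illustrative sketch; a zero-value Options is used here purely for brevity, and a real caller would start from its usual defaults.

package example

import (
	"github.com/alcionai/corso/src/pkg/control"
)

// optionsWithoutSlidingLimiter opts an exchange backup out of the experimental
// sliding window limiter. Only the toggle is shown.
func optionsWithoutSlidingLimiter() control.Options {
	var opts control.Options
	opts.ToggleFeatures.DisableSlidingWindowLimiter = true

	return opts
}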
@@ -9,8 +9,8 @@ import (

 	"github.com/alcionai/clues"
 	khttp "github.com/microsoft/kiota-http-go"
-	"golang.org/x/time/rate"

+	"github.com/alcionai/corso/src/internal/common/limiters"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"
@@ -88,6 +88,14 @@ const (
 	// but doing so risks timeouts. It's better to give the limits breathing room.
 	defaultPerSecond = 16  // 16 * 60 * 10 = 9600
 	defaultMaxCap    = 200 // real cap is 10k-per-10-minutes
+
+	// Sliding window limiter for exchange service. We are restricting it to 9600 per
+	// 10 mins to give the limits some breathing room. It can be slowly increased
+	// over time to get closer to the 10k limit.
+	exchWindow        = 610 * time.Second
+	exchSlideInterval = 1 * time.Second
+	exchCapacity      = 9600
+
 	// since drive runs on a per-minute, rather than per-10-minute bucket, we have
 	// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
 	// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
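Spelled out, these numbers keep the new limiter close to the token bucket it replaces: 9600 tokens over a 610-second window is about 15.7 requests per second sustained, just under defaultPerSecond = 16 and comfortably below the 10k-per-10-minute graph budget, while the 1-second slide interval means the window is presumably tracked in 610 one-second buckets.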
@@ -99,13 +107,27 @@ const (
 )

 var (
-	driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
-	// also used as the exchange service limiter
-	defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
+	driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
+	// Exchange service token bucket rate limiter
+	defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
+
+	// Exchange service sliding window rate limiter.
+	//
+	// TODO(pandeyabs): We are swallowing the error here. It's a limitation of
+	// using global limiters. For now, we'll catch any errors in the regression
+	// test until we look into managing limiter life cycles.
+	exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter(
+		exchWindow,
+		exchSlideInterval,
+		exchCapacity)
 )

 type LimiterCfg struct {
 	Service path.ServiceType
+	// Experimental flag to enable sliding window rate limiter. It should only be
+	// enabled for Exchange backups. It's set to false by default to prevent accidental
+	// enablement for non backup operations and other services.
+	EnableSlidingLimiter bool
 }

 type limiterCfgKey string
@@ -116,19 +138,27 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context {
 	return context.WithValue(ctx, limiterCfgCtxKey, lc)
 }

-func ctxLimiter(ctx context.Context) *rate.Limiter {
+func ctxLimiter(ctx context.Context) limiters.Limiter {
 	lc, ok := extractRateLimiterConfig(ctx)
 	if !ok {
 		return defaultLimiter
 	}

+	lim := defaultLimiter
+
 	switch lc.Service {
 	// FIXME: Handle based on category once we add chat backup
 	case path.OneDriveService, path.SharePointService, path.GroupsService:
-		return driveLimiter
-	default:
-		return defaultLimiter
+		lim = driveLimiter
+	case path.ExchangeService:
+		if lc.EnableSlidingLimiter {
+			// Return sliding window limiter for Exchange if enabled. Otherwise,
+			// return the default token bucket limiter.
+			lim = exchSlidingLimiter
+		}
 	}

+	return lim
 }

 func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {
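Putting BindRateLimiterConfig and ctxLimiter together, a small sketch of the resolution flow, written as if it lived in the same graph package since ctxLimiter is unexported; it mirrors what the unit test below asserts and is illustrative only.

package graph

import (
	"context"

	"github.com/alcionai/corso/src/pkg/path"
)

// resolveExchangeLimiter binds an exchange config with the sliding limiter
// enabled, then reports whether the limiter resolved from the context is the
// sliding window instance rather than the default token bucket.
func resolveExchangeLimiter(ctx context.Context) bool {
	ctx = BindRateLimiterConfig(ctx, LimiterCfg{
		Service:              path.ExchangeService,
		EnableSlidingLimiter: true,
	})

	return ctxLimiter(ctx) == exchSlidingLimiter
}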
@@ -193,6 +223,11 @@ func QueueRequest(ctx context.Context) {
 	}
 }

+func ResetLimiter(ctx context.Context) {
+	limiter := ctxLimiter(ctx)
+	limiter.Reset()
+}
+
 // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
 type RateLimiterMiddleware struct{}
@@ -19,8 +19,8 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
-	"golang.org/x/time/rate"

+	"github.com/alcionai/corso/src/internal/common/limiters"
 	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/internal/tester/tconfig"
@@ -353,10 +353,10 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 	assert.Equal(t, defaultLimiter, ctxLimiter(ctx))

 	table := []struct {
-		name          string
-		service       path.ServiceType
-		expectOK      require.BoolAssertionFunc
-		expectLimiter *rate.Limiter
+		name             string
+		service          path.ServiceType
+		enableSlidingLim bool
+		expectLimiter    limiters.Limiter
 	}{
 		{
 			name:          "exchange",
@@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 			service:       path.ServiceType(-1),
 			expectLimiter: defaultLimiter,
 		},
+		{
+			name:             "exchange sliding limiter",
+			service:          path.ExchangeService,
+			enableSlidingLim: true,
+			expectLimiter:    exchSlidingLimiter,
+		},
+		// Sliding limiter flag is ignored for non-exchange services
+		{
+			name:             "onedrive with sliding limiter flag set",
+			service:          path.OneDriveService,
+			enableSlidingLim: true,
+			expectLimiter:    driveLimiter,
+		},
 	}
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			t := suite.T()

-			tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service})
+			tctx := BindRateLimiterConfig(
+				ctx,
+				LimiterCfg{
+					Service:              test.service,
+					EnableSlidingLimiter: test.enableSlidingLim,
+				})
 			lc, ok := extractRateLimiterConfig(tctx)
 			require.True(t, ok, "found rate limiter in ctx")
 			assert.Equal(t, test.service, lc.Service)