Compare commits

...

11 Commits

Author SHA1 Message Date
Abhishek Pandey
314fb5f847 610 2023-11-14 18:40:16 -08:00
Abhishek Pandey
ea9f6d97b0 Address feedback 2023-11-14 18:32:14 -08:00
Abhishek Pandey
f6b83f3e7e Integrate w rebase 2023-11-14 17:45:54 -08:00
Abhishek Pandey
4e117b5703 Integrate new sliding window limiter into exchange backup flows 2023-11-14 17:45:02 -08:00
Abhishek Pandey
3b6d4bfe8b Address feedback 2023-11-14 17:39:06 -08:00
Abhishek Pandey
5bab966f67 Move rate limiter ctx binding to backup operation scope 2023-11-14 03:10:10 -08:00
Abhishek Pandey
4b959098a8 Move various limiter types behind a common interface 2023-11-14 01:36:46 -08:00
Abhishek Pandey
42ae33c436 Add a control option to disable sliding window limiter 2023-11-14 00:43:01 -08:00
Abhishek Pandey
ac0fa41c9c Slight test fix 2023-11-14 00:10:31 -08:00
Abhishek Pandey
cd2181193b Add WaitN support 2023-11-13 23:48:31 -08:00
Abhishek Pandey
910767c7d2 Add limiter reset functionality 2023-11-13 23:32:00 -08:00
18 changed files with 454 additions and 97 deletions

View File

@ -85,6 +85,7 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command {
flags.AddDisableConcurrencyLimiterFlag(c)
flags.AddDeltaPageSizeFlag(c)
flags.AddGenericBackupFlags(c)
flags.AddDisableSlidingWindowLimiterFlag(c)
case listCommand:
c, fs = utils.AddCommand(cmd, exchangeListCmd())

View File

@ -109,6 +109,7 @@ func (suite *ExchangeUnitSuite) TestBackupCreateFlags() {
"--" + flags.DisableDeltaFN,
"--" + flags.EnableImmutableIDFN,
"--" + flags.DisableConcurrencyLimiterFN,
"--" + flags.DisableSlidingWindowLimiterFN,
},
flagsTD.PreparedGenericBackupFlags(),
flagsTD.PreparedProviderFlags(),
@ -126,6 +127,7 @@ func (suite *ExchangeUnitSuite) TestBackupCreateFlags() {
assert.True(t, co.ToggleFeatures.DisableDelta)
assert.True(t, co.ToggleFeatures.ExchangeImmutableIDs)
assert.True(t, co.ToggleFeatures.DisableConcurrencyLimiter)
assert.True(t, co.ToggleFeatures.DisableSlidingWindowLimiter)
flagsTD.AssertGenericBackupFlags(t, cmd)
flagsTD.AssertProviderFlags(t, cmd)
flagsTD.AssertStorageFlags(t, cmd)

View File

@ -5,22 +5,23 @@ import (
)
const (
AlertsFN = "alerts"
DeltaPageSizeFN = "delta-page-size"
DisableConcurrencyLimiterFN = "disable-concurrency-limiter"
DisableDeltaFN = "disable-delta"
DisableIncrementalsFN = "disable-incrementals"
ForceItemDataDownloadFN = "force-item-data-download"
EnableImmutableIDFN = "enable-immutable-id"
FailFastFN = "fail-fast"
FailedItemsFN = "failed-items"
FetchParallelismFN = "fetch-parallelism"
NoStatsFN = "no-stats"
RecoveredErrorsFN = "recovered-errors"
NoPermissionsFN = "no-permissions"
RunModeFN = "run-mode"
SkippedItemsFN = "skipped-items"
SkipReduceFN = "skip-reduce"
AlertsFN = "alerts"
DeltaPageSizeFN = "delta-page-size"
DisableConcurrencyLimiterFN = "disable-concurrency-limiter"
DisableDeltaFN = "disable-delta"
DisableIncrementalsFN = "disable-incrementals"
ForceItemDataDownloadFN = "force-item-data-download"
EnableImmutableIDFN = "enable-immutable-id"
FailFastFN = "fail-fast"
FailedItemsFN = "failed-items"
FetchParallelismFN = "fetch-parallelism"
NoStatsFN = "no-stats"
RecoveredErrorsFN = "recovered-errors"
NoPermissionsFN = "no-permissions"
RunModeFN = "run-mode"
SkippedItemsFN = "skipped-items"
SkipReduceFN = "skip-reduce"
DisableSlidingWindowLimiterFN = "disable-sliding-limiter"
)
var (
@ -39,9 +40,10 @@ var (
NoStatsFV bool
// RunMode describes the type of run, such as:
// flagtest, dry, run. Should default to 'run'.
RunModeFV string
NoPermissionsFV bool
SkipReduceFV bool
RunModeFV string
NoPermissionsFV bool
SkipReduceFV bool
DisableSlidingWindowLimiterFV bool
)
// well-known flag values
@ -174,3 +176,18 @@ func AddRunModeFlag(cmd *cobra.Command, persistent bool) {
fs.StringVar(&RunModeFV, RunModeFN, "run", "What mode to run: dry, test, run. Defaults to run.")
cobra.CheckErr(fs.MarkHidden(RunModeFN))
}
// AddDisableSlidingWindowLimiterFN disables the experimental sliding window rate
// limiter for graph API requests. This is only relevant for exchange backups.
// Exchange restores continue to use the default token bucket rate limiter.
// Setting this flag switches exchange backups to use the default token bucket
// rate limiter.
func AddDisableSlidingWindowLimiterFlag(cmd *cobra.Command) {
fs := cmd.Flags()
fs.BoolVar(
&DisableSlidingWindowLimiterFV,
DisableSlidingWindowLimiterFN,
false,
"Disable sliding window rate limiter. Default: false")
cobra.CheckErr(fs.MarkHidden(DisableSlidingWindowLimiterFN))
}

View File

@ -27,6 +27,7 @@ func Control() control.Options {
opt.ToggleFeatures.DisableDelta = flags.DisableDeltaFV
opt.ToggleFeatures.ExchangeImmutableIDs = flags.EnableImmutableIDFV
opt.ToggleFeatures.DisableConcurrencyLimiter = flags.DisableConcurrencyLimiterFV
opt.ToggleFeatures.DisableSlidingWindowLimiter = flags.DisableSlidingWindowLimiterFV
opt.Parallelism.ItemFetch = flags.FetchParallelismFV
return opt

View File

@ -37,6 +37,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
assert.Equal(t, 2, flags.FetchParallelismFV, flags.FetchParallelismFN)
assert.True(t, flags.DisableConcurrencyLimiterFV, flags.DisableConcurrencyLimiterFN)
assert.Equal(t, 499, flags.DeltaPageSizeFV, flags.DeltaPageSizeFN)
assert.True(t, flags.DisableSlidingWindowLimiterFV, flags.DisableSlidingWindowLimiterFN)
},
}
@ -52,6 +53,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
flags.AddFetchParallelismFlag(cmd)
flags.AddDisableConcurrencyLimiterFlag(cmd)
flags.AddDeltaPageSizeFlag(cmd)
flags.AddDisableSlidingWindowLimiterFlag(cmd)
// Test arg parsing for few args
cmd.SetArgs([]string{
@ -66,6 +68,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
"--" + flags.FetchParallelismFN, "2",
"--" + flags.DisableConcurrencyLimiterFN,
"--" + flags.DeltaPageSizeFN, "499",
"--" + flags.DisableSlidingWindowLimiterFN,
})
err := cmd.Execute()

View File

@ -4,5 +4,7 @@ import "context"
type Limiter interface {
Wait(ctx context.Context) error
WaitN(ctx context.Context, n int) error
Shutdown()
Reset()
}

View File

@ -47,6 +47,11 @@ type slidingWindow struct {
// mu synchronizes access to the curr and prev windows
mu sync.Mutex
// requestMu synchronizes access between requests. This is especially needed
// for WaitN to ensure that all N tokens are granted to the same request.
// It also allows us to reset the limiter transparently without affecting
// any pending requests.
requestMu sync.Mutex
// stopTicker stops the recurring slide ticker
stopTicker chan struct{}
closeOnce sync.Once
@ -84,16 +89,31 @@ func NewSlidingWindowLimiter(
}
// Wait blocks a request until a token is available or the context is cancelled.
// TODO(pandeyabs): Implement WaitN.
func (s *slidingWindow) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return clues.Stack(ctx.Err())
case <-s.permits:
s.mu.Lock()
defer s.mu.Unlock()
return s.WaitN(ctx, 1)
}
s.curr.count[s.currentInterval]++
// WaitN blocks a request until n tokens are available or the context gets
// cancelled. WaitN should be called with n <= capacity otherwise it will block
// forever.
//
// TODO(pandeyabs): Enforce n <= capacity check. Not adding it right now because
// we are relying on capacity = 0 for ctx cancellation test, which would need
// some refactoring.
func (s *slidingWindow) WaitN(ctx context.Context, n int) error {
// Acquire request mutex and slide mutex in order.
s.requestMu.Lock()
defer s.requestMu.Unlock()
for i := 0; i < n; i++ {
select {
case <-ctx.Done():
return clues.Stack(ctx.Err())
case <-s.permits:
s.mu.Lock()
s.curr.count[s.currentInterval]++
s.mu.Unlock()
}
}
return nil
@ -107,8 +127,42 @@ func (s *slidingWindow) Shutdown() {
})
}
// Reset resets the limiter to its initial state. Any pending requests will be
// cancelled. Reset should only be called when the limiter is not in use.
// No need to shutdown slide goroutine. prev and curr
func (s *slidingWindow) Reset() {
// Acquire request mutex and slide mutex in order like Wait does.
s.requestMu.Lock()
defer s.requestMu.Unlock()
s.mu.Lock()
defer s.mu.Unlock()
// Clear all existing counts
s.prev = fixedWindow{
count: make([]int, s.numIntervals),
}
s.curr = fixedWindow{
count: make([]int, s.numIntervals),
}
// Reset permits
close(s.permits)
s.permits = make(chan token, s.capacity)
// Prefill permits to allow tokens to be granted immediately
for i := 0; i < s.capacity; i++ {
s.permits <- token{}
}
}
// initialize starts the slide goroutine and prefills tokens to full capacity.
func (s *slidingWindow) initialize() {
// Hold request mutex to ensure that no requests get processed until
// we are done initializing.
s.requestMu.Lock()
defer s.requestMu.Unlock()
// Ok to not hold the mutex here since nothing else is running yet.
s.nextInterval()

View File

@ -81,57 +81,83 @@ func (suite *SlidingWindowUnitTestSuite) TestWaitBasic() {
}
// TestWaitSliding tests the sliding window functionality of the limiter with
// time distributed Wait() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitSliding() {
var (
t = suite.T()
windowSize = 1 * time.Second
slideInterval = 10 * time.Millisecond
capacity = 100
// Test will run for duration of 2 windowSize.
numRequests = 2 * capacity
wg sync.WaitGroup
)
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t)
defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
require.NoError(t, err)
// Make concurrent requests to the limiter
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Sleep for a random duration to spread out requests over multiple slide
// intervals & windows, so that we can test the sliding window logic better.
// Without this, the requests will be bunched up in the very first intervals
// of the 2 windows. Rest of the intervals will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.Wait(ctx)
require.NoError(t, err)
}()
// time distributed WaitN() calls.
func (suite *SlidingWindowUnitTestSuite) TestWaitNSliding() {
tests := []struct {
Name string
windowSize time.Duration
slideInterval time.Duration
capacity int
numRequests int
n int
}{
{
Name: "Request 1 token each",
windowSize: 100 * time.Millisecond,
slideInterval: 10 * time.Millisecond,
capacity: 100,
numRequests: 200,
n: 1,
},
{
Name: "Request N tokens each",
windowSize: 100 * time.Millisecond,
slideInterval: 10 * time.Millisecond,
capacity: 1000,
numRequests: 200,
n: 10,
},
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
for _, test := range tests {
suite.Run(test.Name, func() {
t := suite.T()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
defer goleak.VerifyNone(t)
sums := slidingSums(data, sw.numIntervals)
ctx, flush := tester.NewContext(t)
defer flush()
for _, sum := range sums {
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
s, err := NewSlidingWindowLimiter(test.windowSize, test.slideInterval, test.capacity)
require.NoError(t, err)
var wg sync.WaitGroup
// Make concurrent requests to the limiter
for i := 0; i < test.numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Sleep for a random duration to spread out requests over
// multiple slide intervals & windows, so that we can test
// the sliding window logic better.
// Without this, the requests will be bunched up in the very
// first interval of the 2 windows. Rest of the intervals
// will be empty.
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
err := s.WaitN(ctx, test.n)
require.NoError(t, err)
}()
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums {
require.True(t, sum <= test.capacity, "sum: %d, capacity: %d", sum, test.capacity)
}
})
}
}
@ -283,3 +309,114 @@ func (suite *SlidingWindowUnitTestSuite) TestShutdown() {
// Second call to Shutdown() should be a no-op.
s.Shutdown()
}
// TestReset tests if limiter state is cleared and all tokens are available for
// use post reset.
func (suite *SlidingWindowUnitTestSuite) TestReset() {
var (
t = suite.T()
windowSize = 100 * time.Millisecond
slideInterval = 10 * time.Millisecond
capacity = 10
numRequests = capacity
wg sync.WaitGroup
)
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t)
defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
require.NoError(t, err)
// Make some requests to the limiter.
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := s.Wait(ctx)
require.NoError(t, err)
}()
}
wg.Wait()
// Reset the limiter.
s.Reset()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
sw := s.(*slidingWindow)
// Check if state is cleared, and all tokens are available for use post reset.
require.Equal(t, capacity, len(sw.permits))
for i := 0; i < sw.numIntervals; i++ {
require.Equal(t, 0, sw.prev.count[i])
require.Equal(t, 0, sw.curr.count[i])
}
}
// TestResetDuringActiveRequests tests if reset is transparent to any active
// requests and they are not affected. It also checks that limiter stays
// within capacity limits post reset.
func (suite *SlidingWindowUnitTestSuite) TestResetDuringActiveRequests() {
var (
t = suite.T()
windowSize = 100 * time.Millisecond
slideInterval = 10 * time.Millisecond
capacity = 10
numRequests = 10 * capacity
wg sync.WaitGroup
)
defer goleak.VerifyNone(t)
ctx, flush := tester.NewContext(t)
defer flush()
s, err := NewSlidingWindowLimiter(windowSize, slideInterval, capacity)
require.NoError(t, err)
// Make some requests to the limiter as well as reset it concurrently
// in 10:1 request to reset ratio.
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := s.Wait(ctx)
require.NoError(t, err)
}()
// Launch a reset every 10th iteration.
if i%10 == 0 {
wg.Add(1)
go func() {
defer wg.Done()
s.Reset()
}()
}
}
wg.Wait()
// Shutdown the ticker before accessing the internal limiter state.
s.Shutdown()
// Verify that number of requests allowed in each window is less than or equal
// to window capacity
sw := s.(*slidingWindow)
data := append(sw.prev.count, sw.curr.count...)
sums := slidingSums(data, sw.numIntervals)
for _, sum := range sums {
require.True(t, sum <= capacity, "sum: %d, capacity: %d", sum, capacity)
}
}

View File

@ -0,0 +1,32 @@
package limiters
import (
"context"
"golang.org/x/time/rate"
)
var _ Limiter = &TokenBucket{}
// Thin wrapper around the golang.org/x/time/rate token bucket rate limiter.
type TokenBucket struct {
*rate.Limiter
}
func NewTokenBucketLimiter(r int, burst int) Limiter {
return &TokenBucket{
Limiter: rate.NewLimiter(rate.Limit(r), burst),
}
}
func (tb *TokenBucket) Wait(ctx context.Context) error {
return tb.Limiter.Wait(ctx)
}
func (tb *TokenBucket) WaitN(ctx context.Context, n int) error {
return tb.Limiter.WaitN(ctx, n)
}
// Reset and shutdown are no-ops for the token bucket limiter.
func (tb *TokenBucket) Reset() {}
func (tb *TokenBucket) Shutdown() {}

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
"github.com/alcionai/corso/src/internal/operations/inject"
bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters"
@ -46,8 +47,6 @@ func (ctrl *Controller) ProduceBackupCollections(
diagnostics.Index("service", bpc.Selector.PathService().String()))
defer end()
ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: service})
// Limit the max number of active requests to graph from this collection.
bpc.Options.Parallelism.ItemFetch = graph.Parallelism(service).
ItemOverride(ctx, bpc.Options.Parallelism.ItemFetch)
@ -226,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths(
return paths, nil
}
func (ctrl *Controller) SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context {
// Use sliding window limiter for Exchange if the feature is not explicitly
// disabled. For other services we always use token bucket limiter.
enableSlidingLim := false
if service == path.ExchangeService &&
!options.ToggleFeatures.DisableSlidingWindowLimiter {
enableSlidingLim = true
}
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{
Service: service,
EnableSlidingLimiter: enableSlidingLim,
})
return ctx
}

View File

@ -589,14 +589,6 @@ func (oc *Collection) streamDriveItem(
parentPath)
ctx = clues.Add(ctx, "item_info", itemInfo)
// Drive content download requests are also rate limited by graph api.
// Ensure that this request goes through the drive limiter & not the default
// limiter.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{
Service: path.OneDriveService,
})
if isFile {
dataSuffix := metadata.DataFileSuffix

View File

@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName(
return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName),
ctrl.ProtectedResourceErr
}
func (ctrl Controller) SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context {
return ctx
}

View File

@ -205,6 +205,9 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
defer flushMetrics()
// Select an appropriate rate limiter for the service.
ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options)
// Check if the protected resource has the service enabled in order for us
// to run a backup.
enabled, err := op.bp.IsServiceEnabled(
@ -319,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
}
// For exchange, rate limits are enforced on a mailbox level. Reset the
// rate limiter so that it doesn't accidentally throttle following mailboxes.
// This is a no-op if we are using token bucket limiter since it refreshes
// tokens on a fixed per second basis.
graph.ResetLimiter(ctx)
return op.Errors.Failure()
}

View File

@ -44,6 +44,14 @@ type (
) ([]path.RestorePaths, error)
Wait() *data.CollectionStats
// SetRateLimiter selects a rate limiter type for the service being
// backed up and binds it to the context.
SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context
}
RestoreConsumer interface {

View File

@ -10,6 +10,7 @@ import (
kinject "github.com/alcionai/corso/src/internal/kopia/inject"
"github.com/alcionai/corso/src/internal/m365"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths(
ctrl := m365.Controller{}
return ctrl.GetMetadataPaths(ctx, r, base, errs)
}
func (mbp mockBackupProducer) SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context {
return ctx
}

View File

@ -91,4 +91,11 @@ type Toggles struct {
// the protected resource. PreviewBackups are used to demonstrate value by
// being quick to create.
PreviewBackup bool `json:"previewBackup"`
// DisableSlidingWindowLimiter disables the experimental sliding window rate
// limiter for graph API requests. This is only relevant for exchange backups.
// Exchange restores continue to use the default token bucket rate limiter.
// Setting this flag switches exchange backups to use the default token bucket
// rate limiter.
DisableSlidingWindowLimiter bool `json:"disableSlidingWindowLimiter"`
}

View File

@ -9,8 +9,8 @@ import (
"github.com/alcionai/clues"
khttp "github.com/microsoft/kiota-http-go"
"golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -88,6 +88,14 @@ const (
// but doing so risks timeouts. It's better to give the limits breathing room.
defaultPerSecond = 16 // 16 * 60 * 10 = 9600
defaultMaxCap = 200 // real cap is 10k-per-10-minutes
// Sliding window limiter for exchange service. We are restricting it to 9600 per
// 10 mins to give the limits some breathing room. It can be slowly increased
// over time to get closer to the 10k limit.
exchWindow = 610 * time.Second
exchSlideInterval = 1 * time.Second
exchCapacity = 9600
// since drive runs on a per-minute, rather than per-10-minute bucket, we have
// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
@ -99,13 +107,27 @@ const (
)
var (
driveLimiter = rate.NewLimiter(drivePerSecond, driveMaxCap)
// also used as the exchange service limiter
defaultLimiter = rate.NewLimiter(defaultPerSecond, defaultMaxCap)
driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
// Exchange service token bucket rate limiter
defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
// Exchange service sliding window rate limiter.
//
// TODO(pandeyabs): We are swallowing the error here. It's a limitation of
// using global limiters. For now, we'll catch any errors in the regression
// test until we look into managing limiter life cycles.
exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter(
exchWindow,
exchSlideInterval,
exchCapacity)
)
type LimiterCfg struct {
Service path.ServiceType
// Experimental flag to enable sliding window rate limiter. It should only be
// enabled for Exchange backups. It's set to false by default to prevent accidental
// enablement for non backup operations and other services.
EnableSlidingLimiter bool
}
type limiterCfgKey string
@ -116,19 +138,27 @@ func BindRateLimiterConfig(ctx context.Context, lc LimiterCfg) context.Context {
return context.WithValue(ctx, limiterCfgCtxKey, lc)
}
func ctxLimiter(ctx context.Context) *rate.Limiter {
func ctxLimiter(ctx context.Context) limiters.Limiter {
lc, ok := extractRateLimiterConfig(ctx)
if !ok {
return defaultLimiter
}
lim := defaultLimiter
switch lc.Service {
// FIXME: Handle based on category once we add chat backup
case path.OneDriveService, path.SharePointService, path.GroupsService:
return driveLimiter
default:
return defaultLimiter
lim = driveLimiter
case path.ExchangeService:
if lc.EnableSlidingLimiter {
// Return sliding window limiter for Exchange if enabled. Otherwise,
// return the default token bucket limiter.
lim = exchSlidingLimiter
}
}
return lim
}
func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {
@ -193,6 +223,11 @@ func QueueRequest(ctx context.Context) {
}
}
func ResetLimiter(ctx context.Context) {
limiter := ctxLimiter(ctx)
limiter.Reset()
}
// RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
type RateLimiterMiddleware struct{}

View File

@ -19,8 +19,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/common/limiters"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
@ -353,10 +353,10 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
assert.Equal(t, defaultLimiter, ctxLimiter(ctx))
table := []struct {
name string
service path.ServiceType
expectOK require.BoolAssertionFunc
expectLimiter *rate.Limiter
name string
service path.ServiceType
enableSlidingLim bool
expectLimiter limiters.Limiter
}{
{
name: "exchange",
@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
service: path.ServiceType(-1),
expectLimiter: defaultLimiter,
},
{
name: "exchange sliding limiter",
service: path.ExchangeService,
enableSlidingLim: true,
expectLimiter: exchSlidingLimiter,
},
// Sliding limiter flag is ignored for non-exchange services
{
name: "onedrive with sliding limiter flag set",
service: path.OneDriveService,
enableSlidingLim: true,
expectLimiter: driveLimiter,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service})
tctx := BindRateLimiterConfig(
ctx,
LimiterCfg{
Service: test.service,
EnableSlidingLimiter: test.enableSlidingLim,
})
lc, ok := extractRateLimiterConfig(tctx)
require.True(t, ok, "found rate limiter in ctx")
assert.Equal(t, test.service, lc.Service)