From f2102e55f605ae396c82db1c0dfd5953d36971c2 Mon Sep 17 00:00:00 2001
From: Abhishek Pandey
Date: Thu, 16 Nov 2023 01:58:00 -0800
Subject: [PATCH] Integrate sliding window limiter into exchange backup operation (#4680)

Second-to-last PR in the chain. Wires up the new sliding window limiter into
the Exchange backup create flow. I'll add e2e regression tests in a follow-up
PR to wrap it up.

---

#### Does this PR need a docs update or release note?

- [ ] :white_check_mark: Yes, it's included
- [x] :clock1: Yes, but in a later PR
- [ ] :no_entry: No

#### Type of change

- [x] :sunflower: Feature
- [ ] :bug: Bugfix
- [ ] :world_map: Documentation
- [ ] :robot: Supportability/Tests
- [ ] :computer: CI/Deployment
- [ ] :broom: Tech Debt/Cleanup

#### Issue(s)

* #

#### Test Plan

- [ ] :muscle: Manual
- [x] :zap: Unit test
- [ ] :green_heart: E2E
---
 src/internal/m365/backup.go                   | 24 ++++++++++
 src/internal/m365/mock/connector.go           |  8 ++++
 src/internal/operations/backup.go             | 12 +++--
 src/internal/operations/inject/inject.go      |  8 ++++
 src/internal/operations/inject/mock/inject.go |  9 ++++
 .../m365/api/graph/concurrency_middleware.go  | 47 +++++++++++++++++--
 .../m365/api/graph/middleware_test.go         | 28 +++++++++--
 7 files changed, 122 insertions(+), 14 deletions(-)

diff --git a/src/internal/m365/backup.go b/src/internal/m365/backup.go
index 10e29dab1..5794eee54 100644
--- a/src/internal/m365/backup.go
+++ b/src/internal/m365/backup.go
@@ -15,6 +15,7 @@ import (
 	"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
 	"github.com/alcionai/corso/src/internal/operations/inject"
 	bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/filters"
@@ -224,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths(
 
 	return paths, nil
 }
+
+func (ctrl *Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	// Use the sliding window limiter for Exchange unless the feature is
+	// explicitly disabled. For all other services we use the token bucket limiter.
+	enableSlidingLim := false
+	if service == path.ExchangeService &&
+		!options.ToggleFeatures.DisableSlidingWindowLimiter {
+		enableSlidingLim = true
+	}
+
+	ctx = graph.BindRateLimiterConfig(
+		ctx,
+		graph.LimiterCfg{
+			Service:              service,
+			EnableSlidingLimiter: enableSlidingLim,
+		})
+
+	return ctx
+}
diff --git a/src/internal/m365/mock/connector.go b/src/internal/m365/mock/connector.go
index a131fed01..50c2fd092 100644
--- a/src/internal/m365/mock/connector.go
+++ b/src/internal/m365/mock/connector.go
@@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName(
 	return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName),
 		ctrl.ProtectedResourceErr
 }
+
+func (ctrl Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}
diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go
index 4b9699e7a..28a9a152f 100644
--- a/src/internal/operations/backup.go
+++ b/src/internal/operations/backup.go
@@ -206,11 +206,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	ctx = clues.AddTrace(ctx)
 
 	// Select an appropriate rate limiter for the service.
-	ctx = graph.BindRateLimiterConfig(
-		ctx,
-		graph.LimiterCfg{
-			Service: op.Selectors.PathService(),
-		})
+	ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options)
 
 	// Check if the protected resource has the service enabled in order for us
 	// to run a backup.
@@ -326,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
 	}
 
+	// For Exchange, rate limits are enforced at the mailbox level. Reset the
+	// rate limiter so that it doesn't accidentally throttle subsequent mailboxes.
+	// This is a no-op if we are using the token bucket limiter, since it refills
+	// tokens at a fixed per-second rate.
+	graph.ResetLimiter(ctx)
+
 	return op.Errors.Failure()
 }
diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go
index 28713e15a..52f50fce2 100644
--- a/src/internal/operations/inject/inject.go
+++ b/src/internal/operations/inject/inject.go
@@ -44,6 +44,14 @@ type (
 		) ([]path.RestorePaths, error)
 
 		Wait() *data.CollectionStats
+
+		// SetRateLimiter selects a rate limiter type for the service being
+		// backed up and binds it to the context.
+		SetRateLimiter(
+			ctx context.Context,
+			service path.ServiceType,
+			options control.Options,
+		) context.Context
 	}
 
 	RestoreConsumer interface {
diff --git a/src/internal/operations/inject/mock/inject.go b/src/internal/operations/inject/mock/inject.go
index b558c322d..dba52e273 100644
--- a/src/internal/operations/inject/mock/inject.go
+++ b/src/internal/operations/inject/mock/inject.go
@@ -10,6 +10,7 @@ import (
 	kinject "github.com/alcionai/corso/src/internal/kopia/inject"
 	"github.com/alcionai/corso/src/internal/m365"
 	"github.com/alcionai/corso/src/internal/operations/inject"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"
 )
@@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths(
 	ctrl := m365.Controller{}
 	return ctrl.GetMetadataPaths(ctx, r, base, errs)
 }
+
+func (mbp mockBackupProducer) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}
diff --git a/src/pkg/services/m365/api/graph/concurrency_middleware.go b/src/pkg/services/m365/api/graph/concurrency_middleware.go
index 0f694ca01..ee9d62f73 100644
--- a/src/pkg/services/m365/api/graph/concurrency_middleware.go
+++ b/src/pkg/services/m365/api/graph/concurrency_middleware.go
@@ -88,6 +88,14 @@ const (
 	// but doing so risks timeouts. It's better to give the limits breathing room.
 	defaultPerSecond = 16  // 16 * 60 * 10 = 9600
 	defaultMaxCap    = 200 // real cap is 10k-per-10-minutes
+
+	// Sliding window limiter for the Exchange service. We restrict it to 9600
+	// requests per 10 minutes to give the limits some breathing room. It can be
+	// increased gradually over time to get closer to the 10k limit.
+	exchWindow        = 10 * time.Minute
+	exchSlideInterval = 1 * time.Second
+	exchCapacity      = 9600
+
 	// since drive runs on a per-minute, rather than per-10-minute bucket, we have
 	// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
 	// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
@@ -100,12 +108,26 @@ const (
 
 var (
 	driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
-	// also used as the exchange service limiter
+	// Exchange service token bucket rate limiter.
 	defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
+
+	// Exchange service sliding window rate limiter.
+	//
+	// TODO(pandeyabs): We are swallowing the error here. It's a limitation of
+	// using global limiters. For now, we'll catch any errors in the regression
+	// test until we look into managing limiter lifecycles.
+	exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter(
+		exchWindow,
+		exchSlideInterval,
+		exchCapacity)
 )
 
 type LimiterCfg struct {
 	Service path.ServiceType
+	// Experimental flag to enable the sliding window rate limiter. It should
+	// only be enabled for Exchange backups. It defaults to false to prevent
+	// accidental enablement for non-backup operations and other services.
+	EnableSlidingLimiter bool
 }
 
 type limiterCfgKey string
@@ -122,13 +144,21 @@ func ctxLimiter(ctx context.Context) limiters.Limiter {
 		return defaultLimiter
 	}
 
+	lim := defaultLimiter
+
 	switch lc.Service {
 	// FIXME: Handle based on category once we add chat backup
 	case path.OneDriveService, path.SharePointService, path.GroupsService:
-		return driveLimiter
-	default:
-		return defaultLimiter
+		lim = driveLimiter
+	case path.ExchangeService:
+		if lc.EnableSlidingLimiter {
+			// Use the sliding window limiter for Exchange if enabled. Otherwise,
+			// fall back to the default token bucket limiter.
+			lim = exchSlidingLimiter
+		}
 	}
+
+	return lim
 }
 
 func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {
@@ -193,6 +223,15 @@ func QueueRequest(ctx context.Context) {
 	}
 }
 
+// ResetLimiter resets the limiter to its initial state and refills tokens to
+// initial capacity. This is only relevant for the sliding window limiter; it's
+// a no-op for the token bucket limiter, which doesn't need to be reset since
+// it refills tokens at a fixed per-second rate.
+func ResetLimiter(ctx context.Context) {
+	limiter := ctxLimiter(ctx)
+	limiter.Reset()
+}
+
 // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
 type RateLimiterMiddleware struct{}
diff --git a/src/pkg/services/m365/api/graph/middleware_test.go b/src/pkg/services/m365/api/graph/middleware_test.go
index 9bd2255cb..10a0352fb 100644
--- a/src/pkg/services/m365/api/graph/middleware_test.go
+++ b/src/pkg/services/m365/api/graph/middleware_test.go
@@ -353,10 +353,10 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 	assert.Equal(t, defaultLimiter, ctxLimiter(ctx))
 
 	table := []struct {
-		name          string
-		service       path.ServiceType
-		expectOK      require.BoolAssertionFunc
-		expectLimiter limiters.Limiter
+		name             string
+		service          path.ServiceType
+		enableSlidingLim bool
+		expectLimiter    limiters.Limiter
 	}{
 		{
 			name:          "exchange",
@@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 			service:       path.ServiceType(-1),
 			expectLimiter: defaultLimiter,
 		},
+		{
+			name:             "exchange sliding limiter",
+			service:          path.ExchangeService,
+			enableSlidingLim: true,
+			expectLimiter:    exchSlidingLimiter,
+		},
+		// The sliding limiter flag is ignored for non-Exchange services.
+		{
+			name:             "onedrive with sliding limiter flag set",
+			service:          path.OneDriveService,
+			enableSlidingLim: true,
+			expectLimiter:    driveLimiter,
+		},
 	}
 
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			t := suite.T()
-			tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service})
+			tctx := BindRateLimiterConfig(
+				ctx,
+				LimiterCfg{
+					Service:              test.service,
+					EnableSlidingLimiter: test.enableSlidingLim,
+				})
 			lc, ok := extractRateLimiterConfig(tctx)
 			require.True(t, ok, "found rate limiter in ctx")
 			assert.Equal(t, test.service, lc.Service)
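
---

Reviewer note (not part of the patch): below is a minimal sketch of how the new wiring behaves end to end, using only the names introduced in this diff (`BindRateLimiterConfig`, `LimiterCfg`, `QueueRequest`, `ResetLimiter`). The `main` harness and the import paths are assumptions based on the repo layout shown above, not code from this PR.

```go
package main

import (
	"context"

	// Assumed import paths, mirroring the directory layout in this diff.
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)

func main() {
	// Bind an Exchange limiter config to the context. With EnableSlidingLimiter
	// set, ctxLimiter resolves to the sliding window limiter; when it's false
	// (or the service isn't Exchange), the token bucket limiter is used instead.
	ctx := graph.BindRateLimiterConfig(
		context.Background(),
		graph.LimiterCfg{
			Service:              path.ExchangeService,
			EnableSlidingLimiter: true,
		})

	// Per-mailbox backup work goes here; the graph middleware consumes a
	// limiter token per request.
	graph.QueueRequest(ctx)

	// Between mailboxes, refill the sliding window back to full capacity so
	// one mailbox's traffic can't throttle the next. This is a no-op for the
	// token bucket limiter.
	graph.ResetLimiter(ctx)
}
```

Resetting between mailboxes mirrors the service-side behavior described in the backup.go comment: Exchange enforces limits per mailbox, so carrying a drained window across mailboxes would throttle requests the service would actually allow.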