diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 1821e2e3c..f700c757e 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -338,6 +338,7 @@ func (op *BackupOperation) do( backupID model.StableID, ) (*details.Builder, error) { lastBackupVersion := version.NoBackup + service := op.Selectors.PathService() reasons, err := op.Selectors.Reasons(op.account.ID(), false) if err != nil { @@ -385,7 +386,7 @@ func (op *BackupOperation) do( // * the base finder code to skip over older bases (breaks isolation a bit // by requiring knowledge of good/bad backup versions for different // services) - if op.Selectors.PathService() == path.GroupsService { + if service == path.GroupsService { if mans.MinBackupVersion() != version.NoBackup && mans.MinBackupVersion() < version.Groups9Update { logger.Ctx(ctx).Info("dropping merge bases due to groups version change") @@ -431,6 +432,22 @@ func (op *BackupOperation) do( lastBackupVersion = mans.MinBackupVersion() } + // Use sliding window limiter for Exchange if the feature is not explicitly + // disabled. For other services we don't use sliding window limiter. + enableSlidingLim := false + if service == path.ExchangeService && + !op.Options.ToggleFeatures.DisableSlidingWindowLimiter { + enableSlidingLim = true + } + + // Select an appropriate rate limiter for the service. + ctx = graph.BindRateLimiterConfig( + ctx, + graph.LimiterCfg{ + Service: service, + EnableSlidingLimiter: enableSlidingLim, + }) + // TODO(ashmrtn): This should probably just return a collection that deletes // the entire subtree instead of returning an additional bool. That way base // selection is controlled completely by flags and merging is controlled @@ -491,6 +508,12 @@ func (op *BackupOperation) do( logger.Ctx(ctx).Debug(opStats.ctrl) + // For exchange, rate limits are enforced on a mailbox level. Reset the + // rate limiter so that it doesn't accidentally throttle following mailboxes. + // This is a no-op if we are using token bucket limiter since it refreshes + // tokens on a fixed per second basis. + graph.ResetLimiter(ctx) + return deets, nil } diff --git a/src/pkg/services/m365/api/graph/concurrency_middleware.go b/src/pkg/services/m365/api/graph/concurrency_middleware.go index cc176ba7a..19cc8aa3a 100644 --- a/src/pkg/services/m365/api/graph/concurrency_middleware.go +++ b/src/pkg/services/m365/api/graph/concurrency_middleware.go @@ -89,9 +89,9 @@ const ( defaultPerSecond = 16 // 16 * 60 * 10 = 9600 defaultMaxCap = 200 // real cap is 10k-per-10-minutes - // Sliding window limiter for exchange service. Exchange enforces a - // 10-k-per-10-minute limit. We are however keeping it to 9600-per-10-minutes - // to give the limits breathing room. It can be slowly increased over time. + // Sliding window limiter for exchange service. We are restricting it to 9600 per + // 10 mins to give the limits some breathing room. It can be slowly increased + // over time to get closer to the 10k limit. exchWindow = 10 * time.Minute exchSlideInterval = 1 * time.Second exchCapacity = 9600 @@ -125,8 +125,8 @@ var ( type LimiterCfg struct { Service path.ServiceType // Experimental flag to enable sliding window rate limiter. It should only be - // enabled for Exchange backups. Set to false by default to prevent accidental - // enablement for non-backup operations and other services. + // enabled for Exchange backups. It's set to false by default to prevent accidental + // enablement for non backup operations and other services. EnableSlidingLimiter bool } @@ -223,6 +223,11 @@ func QueueRequest(ctx context.Context) { } } +func ResetLimiter(ctx context.Context) { + limiter := ctxLimiter(ctx) + limiter.Reset() +} + // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits. type RateLimiterMiddleware struct{} diff --git a/src/pkg/services/m365/api/graph/middleware_test.go b/src/pkg/services/m365/api/graph/middleware_test.go index 9bd2255cb..10a0352fb 100644 --- a/src/pkg/services/m365/api/graph/middleware_test.go +++ b/src/pkg/services/m365/api/graph/middleware_test.go @@ -353,10 +353,10 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() { assert.Equal(t, defaultLimiter, ctxLimiter(ctx)) table := []struct { - name string - service path.ServiceType - expectOK require.BoolAssertionFunc - expectLimiter limiters.Limiter + name string + service path.ServiceType + enableSlidingLim bool + expectLimiter limiters.Limiter }{ { name: "exchange", @@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() { service: path.ServiceType(-1), expectLimiter: defaultLimiter, }, + { + name: "exchange sliding limiter", + service: path.ExchangeService, + enableSlidingLim: true, + expectLimiter: exchSlidingLimiter, + }, + // Sliding limiter flag is ignored for non-exchange services + { + name: "onedrive with sliding limiter flag set", + service: path.OneDriveService, + enableSlidingLim: true, + expectLimiter: driveLimiter, + }, } for _, test := range table { suite.Run(test.name, func() { t := suite.T() - tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service}) + tctx := BindRateLimiterConfig( + ctx, + LimiterCfg{ + Service: test.service, + EnableSlidingLimiter: test.enableSlidingLim, + }) lc, ok := extractRateLimiterConfig(tctx) require.True(t, ok, "found rate limiter in ctx") assert.Equal(t, test.service, lc.Service)