From 4e117b5703e4156ae06752f91945431153317fa7 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Tue, 14 Nov 2023 02:43:34 -0800 Subject: [PATCH] Integrate new sliding window limiter into exchange backup flows --- .../m365/api/graph/concurrency_middleware.go | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/pkg/services/m365/api/graph/concurrency_middleware.go b/src/pkg/services/m365/api/graph/concurrency_middleware.go index 0f694ca01..cc176ba7a 100644 --- a/src/pkg/services/m365/api/graph/concurrency_middleware.go +++ b/src/pkg/services/m365/api/graph/concurrency_middleware.go @@ -88,6 +88,14 @@ const ( // but doing so risks timeouts. It's better to give the limits breathing room. defaultPerSecond = 16 // 16 * 60 * 10 = 9600 defaultMaxCap = 200 // real cap is 10k-per-10-minutes + + // Sliding window limiter for exchange service. Exchange enforces a + // 10-k-per-10-minute limit. We are however keeping it to 9600-per-10-minutes + // to give the limits breathing room. It can be slowly increased over time. + exchWindow = 10 * time.Minute + exchSlideInterval = 1 * time.Second + exchCapacity = 9600 + // since drive runs on a per-minute, rather than per-10-minute bucket, we have // to keep the max cap equal to the per-second cap. A large maxCap pool (say, // 1200, similar to the per-minute cap) would allow us to make a flood of 2400 @@ -100,12 +108,26 @@ const ( var ( driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap) - // also used as the exchange service limiter + // Exchange service token bucket rate limiter defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap) + + // Exchange service sliding window rate limiter. + // + // TODO(pandeyabs): We are swallowing the error here. It's a limitation of + // using global limiters. For now, we'll catch any errors in the regression + // test until we look into managing limiter life cycles. + exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter( + exchWindow, + exchSlideInterval, + exchCapacity) ) type LimiterCfg struct { Service path.ServiceType + // Experimental flag to enable sliding window rate limiter. It should only be + // enabled for Exchange backups. Set to false by default to prevent accidental + // enablement for non-backup operations and other services. + EnableSlidingLimiter bool } type limiterCfgKey string @@ -122,13 +144,21 @@ func ctxLimiter(ctx context.Context) limiters.Limiter { return defaultLimiter } + lim := defaultLimiter + switch lc.Service { // FIXME: Handle based on category once we add chat backup case path.OneDriveService, path.SharePointService, path.GroupsService: - return driveLimiter - default: - return defaultLimiter + lim = driveLimiter + case path.ExchangeService: + if lc.EnableSlidingLimiter { + // Return sliding window limiter for Exchange if enabled. Otherwise, + // return the default token bucket limiter. + lim = exchSlidingLimiter + } } + + return lim } func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {