Integrate new sliding window limiter into exchange backup flows

This commit is contained in:
Abhishek Pandey 2023-11-14 02:43:34 -08:00
parent 3b6d4bfe8b
commit 4e117b5703

View File

@ -88,6 +88,14 @@ const (
// but doing so risks timeouts. It's better to give the limits breathing room. // but doing so risks timeouts. It's better to give the limits breathing room.
defaultPerSecond = 16 // 16 * 60 * 10 = 9600 defaultPerSecond = 16 // 16 * 60 * 10 = 9600
defaultMaxCap = 200 // real cap is 10k-per-10-minutes defaultMaxCap = 200 // real cap is 10k-per-10-minutes
// Sliding window limiter for exchange service. Exchange enforces a
// 10-k-per-10-minute limit. We are however keeping it to 9600-per-10-minutes
// to give the limits breathing room. It can be slowly increased over time.
exchWindow = 10 * time.Minute
exchSlideInterval = 1 * time.Second
exchCapacity = 9600
// since drive runs on a per-minute, rather than per-10-minute bucket, we have // since drive runs on a per-minute, rather than per-10-minute bucket, we have
// to keep the max cap equal to the per-second cap. A large maxCap pool (say, // to keep the max cap equal to the per-second cap. A large maxCap pool (say,
// 1200, similar to the per-minute cap) would allow us to make a flood of 2400 // 1200, similar to the per-minute cap) would allow us to make a flood of 2400
@ -100,12 +108,26 @@ const (
var ( var (
driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap) driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
// also used as the exchange service limiter // Exchange service token bucket rate limiter
defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap) defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
// Exchange service sliding window rate limiter.
//
// TODO(pandeyabs): We are swallowing the error here. It's a limitation of
// using global limiters. For now, we'll catch any errors in the regression
// test until we look into managing limiter life cycles.
exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter(
exchWindow,
exchSlideInterval,
exchCapacity)
) )
type LimiterCfg struct { type LimiterCfg struct {
Service path.ServiceType Service path.ServiceType
// Experimental flag to enable sliding window rate limiter. It should only be
// enabled for Exchange backups. Set to false by default to prevent accidental
// enablement for non-backup operations and other services.
EnableSlidingLimiter bool
} }
type limiterCfgKey string type limiterCfgKey string
@ -122,13 +144,21 @@ func ctxLimiter(ctx context.Context) limiters.Limiter {
return defaultLimiter return defaultLimiter
} }
lim := defaultLimiter
switch lc.Service { switch lc.Service {
// FIXME: Handle based on category once we add chat backup // FIXME: Handle based on category once we add chat backup
case path.OneDriveService, path.SharePointService, path.GroupsService: case path.OneDriveService, path.SharePointService, path.GroupsService:
return driveLimiter lim = driveLimiter
default: case path.ExchangeService:
return defaultLimiter if lc.EnableSlidingLimiter {
// Return sliding window limiter for Exchange if enabled. Otherwise,
// return the default token bucket limiter.
lim = exchSlidingLimiter
} }
}
return lim
} }
func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) { func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {