Integrate sliding window limiter into exchange backup operation (#4680)

<!-- PR description-->

Second-to-last PR in the chain. Wires the new sliding window limiter into the Exchange backup create flows. I'll add e2e regression tests in a follow-up PR to wrap things up.
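
For context on what "wires up" means here, below is a minimal, self-contained sketch of the selection flow this PR introduces: the backup operation asks the producer to pick a limiter for the service, and the choice is carried on the context for the graph middleware to read later. The type and function names are simplified stand-ins, not corso's actual API.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for corso's path.ServiceType and limiter config; the
// real types live in pkg/path and the internal graph middleware.
type serviceType int

const (
	exchangeService serviceType = iota
	oneDriveService
)

type limiterCfg struct {
	service              serviceType
	enableSlidingLimiter bool
}

type cfgKey struct{}

// bindRateLimiterConfig mirrors the role of graph.BindRateLimiterConfig: it
// stashes the limiter selection on the context for later middleware use.
func bindRateLimiterConfig(ctx context.Context, cfg limiterCfg) context.Context {
	return context.WithValue(ctx, cfgKey{}, cfg)
}

// setRateLimiter mirrors the new Controller.SetRateLimiter: Exchange gets the
// sliding window limiter unless the feature is explicitly disabled; other
// services keep the token bucket limiter.
func setRateLimiter(ctx context.Context, svc serviceType, disableSliding bool) context.Context {
	enableSliding := svc == exchangeService && !disableSliding

	return bindRateLimiterConfig(ctx, limiterCfg{
		service:              svc,
		enableSlidingLimiter: enableSliding,
	})
}

func main() {
	ctx := setRateLimiter(context.Background(), exchangeService, false)
	cfg := ctx.Value(cfgKey{}).(limiterCfg)
	fmt.Printf("exchange -> sliding limiter enabled: %v\n", cfg.enableSlidingLimiter) // true

	ctx = setRateLimiter(context.Background(), oneDriveService, false)
	cfg = ctx.Value(cfgKey{}).(limiterCfg)
	fmt.Printf("onedrive -> sliding limiter enabled: %v\n", cfg.enableSlidingLimiter) // false
}
```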

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
Abhishek Pandey 2023-11-16 01:58:00 -08:00 committed by GitHub
parent dfea154cfe
commit f2102e55f6
7 changed files with 122 additions and 14 deletions

View File

@@ -15,6 +15,7 @@ import (
 	"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
 	"github.com/alcionai/corso/src/internal/operations/inject"
 	bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/filters"
@@ -224,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths(
 	return paths, nil
 }
+
+func (ctrl *Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	// Use sliding window limiter for Exchange if the feature is not explicitly
+	// disabled. For other services we always use token bucket limiter.
+	enableSlidingLim := false
+
+	if service == path.ExchangeService &&
+		!options.ToggleFeatures.DisableSlidingWindowLimiter {
+		enableSlidingLim = true
+	}
+
+	ctx = graph.BindRateLimiterConfig(
+		ctx,
+		graph.LimiterCfg{
+			Service:              service,
+			EnableSlidingLimiter: enableSlidingLim,
+		})
+
+	return ctx
+}

View File

@@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName(
 	return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName),
 		ctrl.ProtectedResourceErr
 }
+
+func (ctrl Controller) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}

View File

@@ -206,11 +206,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	ctx = clues.AddTrace(ctx)
 
 	// Select an appropriate rate limiter for the service.
-	ctx = graph.BindRateLimiterConfig(
-		ctx,
-		graph.LimiterCfg{
-			Service: op.Selectors.PathService(),
-		})
+	ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options)
 
 	// Check if the protected resource has the service enabled in order for us
 	// to run a backup.
@@ -326,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
 	}
 
+	// For exchange, rate limits are enforced on a mailbox level. Reset the
+	// rate limiter so that it doesn't accidentally throttle following mailboxes.
+	// This is a no-op if we are using token bucket limiter since it refreshes
+	// tokens on a fixed per second basis.
+	graph.ResetLimiter(ctx)
+
 	return op.Errors.Failure()
 }
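
The reset call above is the piece that is easy to miss in review. A simplified sketch (not corso's limiters package) of why it matters: a sliding window limiter remembers what was already consumed within the trailing window, so without a reset the next mailbox would inherit whatever quota the previous mailbox left behind, whereas a token bucket simply keeps refilling at a fixed rate.

```go
package main

import (
	"fmt"
	"time"
)

// windowLimiter is a rough stand-in for a sliding window limiter: it tracks
// request timestamps and only admits a request if fewer than capacity
// requests landed inside the trailing window. Not corso's implementation.
type windowLimiter struct {
	window   time.Duration
	capacity int
	hits     []time.Time
}

func (l *windowLimiter) allow(now time.Time) bool {
	// Drop hits that have fallen out of the trailing window.
	cutoff := now.Add(-l.window)
	kept := l.hits[:0]
	for _, h := range l.hits {
		if h.After(cutoff) {
			kept = append(kept, h)
		}
	}
	l.hits = kept

	if len(l.hits) >= l.capacity {
		return false
	}
	l.hits = append(l.hits, now)
	return true
}

// reset drops all tracked history, restoring full capacity. This is the role
// the per-mailbox graph.ResetLimiter call plays, since Exchange throttling is
// scoped to each mailbox.
func (l *windowLimiter) reset() { l.hits = nil }

func main() {
	lim := &windowLimiter{window: 10 * time.Minute, capacity: 3}
	now := time.Now()

	for i := 0; i < 4; i++ {
		fmt.Println("mailbox A request", i, "allowed:", lim.allow(now)) // 4th is denied
	}

	// Without a reset, mailbox B would start with mailbox A's consumed quota
	// for the remainder of the 10-minute window.
	lim.reset()
	fmt.Println("mailbox B request 0 allowed:", lim.allow(now)) // true again
}
```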

View File

@@ -44,6 +44,14 @@ type (
 		) ([]path.RestorePaths, error)
 
 		Wait() *data.CollectionStats
+
+		// SetRateLimiter selects a rate limiter type for the service being
+		// backed up and binds it to the context.
+		SetRateLimiter(
+			ctx context.Context,
+			service path.ServiceType,
+			options control.Options,
+		) context.Context
 	}
 
 	RestoreConsumer interface {

View File

@@ -10,6 +10,7 @@ import (
 	kinject "github.com/alcionai/corso/src/internal/kopia/inject"
 	"github.com/alcionai/corso/src/internal/m365"
 	"github.com/alcionai/corso/src/internal/operations/inject"
+	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"
@@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths(
 	ctrl := m365.Controller{}
 	return ctrl.GetMetadataPaths(ctx, r, base, errs)
 }
+
+func (mbp mockBackupProducer) SetRateLimiter(
+	ctx context.Context,
+	service path.ServiceType,
+	options control.Options,
+) context.Context {
+	return ctx
+}

View File

@@ -88,6 +88,14 @@ const (
 	// but doing so risks timeouts. It's better to give the limits breathing room.
 	defaultPerSecond = 16  // 16 * 60 * 10 = 9600
 	defaultMaxCap    = 200 // real cap is 10k-per-10-minutes
 
+	// Sliding window limiter for exchange service. We are restricting it to 9600 per
+	// 10 mins to give the limits some breathing room. It can be slowly increased
+	// over time to get closer to the 10k limit.
+	exchWindow        = 10 * time.Minute
+	exchSlideInterval = 1 * time.Second
+	exchCapacity      = 9600
+
 	// since drive runs on a per-minute, rather than per-10-minute bucket, we have
 	// to keep the max cap equal to the per-second cap. A large maxCap pool (say,
 	// 1200, similar to the per-minute cap) would allow us to make a flood of 2400
@@ -100,12 +108,26 @@ const (
 var (
 	driveLimiter = limiters.NewTokenBucketLimiter(drivePerSecond, driveMaxCap)
-	// also used as the exchange service limiter
+	// Exchange service token bucket rate limiter
 	defaultLimiter = limiters.NewTokenBucketLimiter(defaultPerSecond, defaultMaxCap)
+
+	// Exchange service sliding window rate limiter.
+	//
+	// TODO(pandeyabs): We are swallowing the error here. It's a limitation of
+	// using global limiters. For now, we'll catch any errors in the regression
+	// test until we look into managing limiter life cycles.
+	exchSlidingLimiter, _ = limiters.NewSlidingWindowLimiter(
+		exchWindow,
+		exchSlideInterval,
+		exchCapacity)
 )
 
 type LimiterCfg struct {
 	Service path.ServiceType
+	// Experimental flag to enable sliding window rate limiter. It should only be
+	// enabled for Exchange backups. It's set to false by default to prevent accidental
+	// enablement for non backup operations and other services.
+	EnableSlidingLimiter bool
 }
 
 type limiterCfgKey string
@@ -122,13 +144,21 @@ func ctxLimiter(ctx context.Context) limiters.Limiter {
 		return defaultLimiter
 	}
 
+	lim := defaultLimiter
+
 	switch lc.Service {
 	// FIXME: Handle based on category once we add chat backup
 	case path.OneDriveService, path.SharePointService, path.GroupsService:
-		return driveLimiter
-	default:
-		return defaultLimiter
+		lim = driveLimiter
+	case path.ExchangeService:
+		if lc.EnableSlidingLimiter {
+			// Return sliding window limiter for Exchange if enabled. Otherwise,
+			// return the default token bucket limiter.
+			lim = exchSlidingLimiter
+		}
 	}
 
+	return lim
 }
 
 func extractRateLimiterConfig(ctx context.Context) (LimiterCfg, bool) {
@@ -193,6 +223,15 @@ func QueueRequest(ctx context.Context) {
 	}
 }
 
+// ResetLimiter resets the limiter to its initial state and refills tokens to
+// initial capacity. This is only relevant for the sliding window limiter, and a
+// no-op for token bucket limiter. The token bucket limiter doesn't need to be
+// reset since it refills tokens at a fixed per-second rate.
+func ResetLimiter(ctx context.Context) {
+	limiter := ctxLimiter(ctx)
+	limiter.Reset()
+}
+
 // RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
 type RateLimiterMiddleware struct{}
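
A quick sanity check on the numbers in this file: the sliding window capacity of 9600 per 10-minute window lines up with the existing token bucket settings, since 16 requests/sec × 600 sec = 9600 (the same arithmetic noted next to defaultPerSecond). So, assuming steady traffic, switching Exchange to the sliding limiter shouldn't change the sustained request rate; the practical difference is in how bursts are absorbed across the window and in the ability to Reset() the limiter between mailboxes.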

View File

@@ -353,10 +353,10 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 	assert.Equal(t, defaultLimiter, ctxLimiter(ctx))
 
 	table := []struct {
 		name             string
 		service          path.ServiceType
-		expectOK         require.BoolAssertionFunc
+		enableSlidingLim bool
 		expectLimiter    limiters.Limiter
 	}{
 		{
 			name:          "exchange",
@@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
 			service:       path.ServiceType(-1),
 			expectLimiter: defaultLimiter,
 		},
+		{
+			name:             "exchange sliding limiter",
+			service:          path.ExchangeService,
+			enableSlidingLim: true,
+			expectLimiter:    exchSlidingLimiter,
+		},
+		// Sliding limiter flag is ignored for non-exchange services
+		{
+			name:             "onedrive with sliding limiter flag set",
+			service:          path.OneDriveService,
+			enableSlidingLim: true,
+			expectLimiter:    driveLimiter,
+		},
 	}
 
 	for _, test := range table {
 		suite.Run(test.name, func() {
 			t := suite.T()
 
-			tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service})
+			tctx := BindRateLimiterConfig(
+				ctx,
+				LimiterCfg{
+					Service:              test.service,
+					EnableSlidingLimiter: test.enableSlidingLim,
+				})
 
 			lc, ok := extractRateLimiterConfig(tctx)
 			require.True(t, ok, "found rate limiter in ctx")
 			assert.Equal(t, test.service, lc.Service)