Integrate w rebase
This commit is contained in:
parent
4e117b5703
commit
f6b83f3e7e
@ -338,6 +338,7 @@ func (op *BackupOperation) do(
|
|||||||
backupID model.StableID,
|
backupID model.StableID,
|
||||||
) (*details.Builder, error) {
|
) (*details.Builder, error) {
|
||||||
lastBackupVersion := version.NoBackup
|
lastBackupVersion := version.NoBackup
|
||||||
|
service := op.Selectors.PathService()
|
||||||
|
|
||||||
reasons, err := op.Selectors.Reasons(op.account.ID(), false)
|
reasons, err := op.Selectors.Reasons(op.account.ID(), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -385,7 +386,7 @@ func (op *BackupOperation) do(
|
|||||||
// * the base finder code to skip over older bases (breaks isolation a bit
|
// * the base finder code to skip over older bases (breaks isolation a bit
|
||||||
// by requiring knowledge of good/bad backup versions for different
|
// by requiring knowledge of good/bad backup versions for different
|
||||||
// services)
|
// services)
|
||||||
if op.Selectors.PathService() == path.GroupsService {
|
if service == path.GroupsService {
|
||||||
if mans.MinBackupVersion() != version.NoBackup &&
|
if mans.MinBackupVersion() != version.NoBackup &&
|
||||||
mans.MinBackupVersion() < version.Groups9Update {
|
mans.MinBackupVersion() < version.Groups9Update {
|
||||||
logger.Ctx(ctx).Info("dropping merge bases due to groups version change")
|
logger.Ctx(ctx).Info("dropping merge bases due to groups version change")
|
||||||
@ -431,6 +432,22 @@ func (op *BackupOperation) do(
|
|||||||
lastBackupVersion = mans.MinBackupVersion()
|
lastBackupVersion = mans.MinBackupVersion()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Use sliding window limiter for Exchange if the feature is not explicitly
|
||||||
|
// disabled. For other services we don't use sliding window limiter.
|
||||||
|
enableSlidingLim := false
|
||||||
|
if service == path.ExchangeService &&
|
||||||
|
!op.Options.ToggleFeatures.DisableSlidingWindowLimiter {
|
||||||
|
enableSlidingLim = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select an appropriate rate limiter for the service.
|
||||||
|
ctx = graph.BindRateLimiterConfig(
|
||||||
|
ctx,
|
||||||
|
graph.LimiterCfg{
|
||||||
|
Service: service,
|
||||||
|
EnableSlidingLimiter: enableSlidingLim,
|
||||||
|
})
|
||||||
|
|
||||||
// TODO(ashmrtn): This should probably just return a collection that deletes
|
// TODO(ashmrtn): This should probably just return a collection that deletes
|
||||||
// the entire subtree instead of returning an additional bool. That way base
|
// the entire subtree instead of returning an additional bool. That way base
|
||||||
// selection is controlled completely by flags and merging is controlled
|
// selection is controlled completely by flags and merging is controlled
|
||||||
@ -491,6 +508,12 @@ func (op *BackupOperation) do(
|
|||||||
|
|
||||||
logger.Ctx(ctx).Debug(opStats.ctrl)
|
logger.Ctx(ctx).Debug(opStats.ctrl)
|
||||||
|
|
||||||
|
// For exchange, rate limits are enforced on a mailbox level. Reset the
|
||||||
|
// rate limiter so that it doesn't accidentally throttle following mailboxes.
|
||||||
|
// This is a no-op if we are using token bucket limiter since it refreshes
|
||||||
|
// tokens on a fixed per second basis.
|
||||||
|
graph.ResetLimiter(ctx)
|
||||||
|
|
||||||
return deets, nil
|
return deets, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -89,9 +89,9 @@ const (
|
|||||||
defaultPerSecond = 16 // 16 * 60 * 10 = 9600
|
defaultPerSecond = 16 // 16 * 60 * 10 = 9600
|
||||||
defaultMaxCap = 200 // real cap is 10k-per-10-minutes
|
defaultMaxCap = 200 // real cap is 10k-per-10-minutes
|
||||||
|
|
||||||
// Sliding window limiter for exchange service. Exchange enforces a
|
// Sliding window limiter for exchange service. We are restricting it to 9600 per
|
||||||
// 10-k-per-10-minute limit. We are however keeping it to 9600-per-10-minutes
|
// 10 mins to give the limits some breathing room. It can be slowly increased
|
||||||
// to give the limits breathing room. It can be slowly increased over time.
|
// over time to get closer to the 10k limit.
|
||||||
exchWindow = 10 * time.Minute
|
exchWindow = 10 * time.Minute
|
||||||
exchSlideInterval = 1 * time.Second
|
exchSlideInterval = 1 * time.Second
|
||||||
exchCapacity = 9600
|
exchCapacity = 9600
|
||||||
@ -125,8 +125,8 @@ var (
|
|||||||
type LimiterCfg struct {
|
type LimiterCfg struct {
|
||||||
Service path.ServiceType
|
Service path.ServiceType
|
||||||
// Experimental flag to enable sliding window rate limiter. It should only be
|
// Experimental flag to enable sliding window rate limiter. It should only be
|
||||||
// enabled for Exchange backups. Set to false by default to prevent accidental
|
// enabled for Exchange backups. It's set to false by default to prevent accidental
|
||||||
// enablement for non-backup operations and other services.
|
// enablement for non backup operations and other services.
|
||||||
EnableSlidingLimiter bool
|
EnableSlidingLimiter bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,6 +223,11 @@ func QueueRequest(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ResetLimiter(ctx context.Context) {
|
||||||
|
limiter := ctxLimiter(ctx)
|
||||||
|
limiter.Reset()
|
||||||
|
}
|
||||||
|
|
||||||
// RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
|
// RateLimiterMiddleware is used to ensure we don't overstep per-min request limits.
|
||||||
type RateLimiterMiddleware struct{}
|
type RateLimiterMiddleware struct{}
|
||||||
|
|
||||||
|
|||||||
@ -355,7 +355,7 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
|
|||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
service path.ServiceType
|
service path.ServiceType
|
||||||
expectOK require.BoolAssertionFunc
|
enableSlidingLim bool
|
||||||
expectLimiter limiters.Limiter
|
expectLimiter limiters.Limiter
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -388,12 +388,30 @@ func (suite *MiddlewareUnitSuite) TestBindExtractLimiterConfig() {
|
|||||||
service: path.ServiceType(-1),
|
service: path.ServiceType(-1),
|
||||||
expectLimiter: defaultLimiter,
|
expectLimiter: defaultLimiter,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "exchange sliding limiter",
|
||||||
|
service: path.ExchangeService,
|
||||||
|
enableSlidingLim: true,
|
||||||
|
expectLimiter: exchSlidingLimiter,
|
||||||
|
},
|
||||||
|
// Sliding limiter flag is ignored for non-exchange services
|
||||||
|
{
|
||||||
|
name: "onedrive with sliding limiter flag set",
|
||||||
|
service: path.OneDriveService,
|
||||||
|
enableSlidingLim: true,
|
||||||
|
expectLimiter: driveLimiter,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
tctx := BindRateLimiterConfig(ctx, LimiterCfg{Service: test.service})
|
tctx := BindRateLimiterConfig(
|
||||||
|
ctx,
|
||||||
|
LimiterCfg{
|
||||||
|
Service: test.service,
|
||||||
|
EnableSlidingLimiter: test.enableSlidingLim,
|
||||||
|
})
|
||||||
lc, ok := extractRateLimiterConfig(tctx)
|
lc, ok := extractRateLimiterConfig(tctx)
|
||||||
require.True(t, ok, "found rate limiter in ctx")
|
require.True(t, ok, "found rate limiter in ctx")
|
||||||
assert.Equal(t, test.service, lc.Service)
|
assert.Equal(t, test.service, lc.Service)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user