Address feedback

This commit is contained in:
Abhishek Pandey 2023-11-14 18:32:14 -08:00
parent f6b83f3e7e
commit ea9f6d97b0
5 changed files with 57 additions and 29 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/service/sharepoint" "github.com/alcionai/corso/src/internal/m365/service/sharepoint"
"github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/operations/inject"
bupMD "github.com/alcionai/corso/src/pkg/backup/metadata" bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/filters"
@ -224,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths(
return paths, nil return paths, nil
} }
func (ctrl *Controller) SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context {
// Use sliding window limiter for Exchange if the feature is not explicitly
// disabled. For other services we always use token bucket limiter.
enableSlidingLim := false
if service == path.ExchangeService &&
!options.ToggleFeatures.DisableSlidingWindowLimiter {
enableSlidingLim = true
}
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{
Service: service,
EnableSlidingLimiter: enableSlidingLim,
})
return ctx
}

View File

@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName(
return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName), return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName),
ctrl.ProtectedResourceErr ctrl.ProtectedResourceErr
} }
func (ctrl Controller) SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context {
return ctx
}

View File

@ -206,11 +206,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
defer flushMetrics() defer flushMetrics()
// Select an appropriate rate limiter for the service. // Select an appropriate rate limiter for the service.
ctx = graph.BindRateLimiterConfig( ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options)
ctx,
graph.LimiterCfg{
Service: op.Selectors.PathService(),
})
// Check if the protected resource has the service enabled in order for us // Check if the protected resource has the service enabled in order for us
// to run a backup. // to run a backup.
@ -326,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
logger.Ctx(ctx).Infow("completed backup", "results", op.Results) logger.Ctx(ctx).Infow("completed backup", "results", op.Results)
} }
// For exchange, rate limits are enforced on a mailbox level. Reset the
// rate limiter so that it doesn't accidentally throttle following mailboxes.
// This is a no-op if we are using token bucket limiter since it refreshes
// tokens on a fixed per second basis.
graph.ResetLimiter(ctx)
return op.Errors.Failure() return op.Errors.Failure()
} }
@ -338,7 +340,6 @@ func (op *BackupOperation) do(
backupID model.StableID, backupID model.StableID,
) (*details.Builder, error) { ) (*details.Builder, error) {
lastBackupVersion := version.NoBackup lastBackupVersion := version.NoBackup
service := op.Selectors.PathService()
reasons, err := op.Selectors.Reasons(op.account.ID(), false) reasons, err := op.Selectors.Reasons(op.account.ID(), false)
if err != nil { if err != nil {
@ -386,7 +387,7 @@ func (op *BackupOperation) do(
// * the base finder code to skip over older bases (breaks isolation a bit // * the base finder code to skip over older bases (breaks isolation a bit
// by requiring knowledge of good/bad backup versions for different // by requiring knowledge of good/bad backup versions for different
// services) // services)
if service == path.GroupsService { if op.Selectors.PathService() == path.GroupsService {
if mans.MinBackupVersion() != version.NoBackup && if mans.MinBackupVersion() != version.NoBackup &&
mans.MinBackupVersion() < version.Groups9Update { mans.MinBackupVersion() < version.Groups9Update {
logger.Ctx(ctx).Info("dropping merge bases due to groups version change") logger.Ctx(ctx).Info("dropping merge bases due to groups version change")
@ -432,22 +433,6 @@ func (op *BackupOperation) do(
lastBackupVersion = mans.MinBackupVersion() lastBackupVersion = mans.MinBackupVersion()
} }
// Use sliding window limiter for Exchange if the feature is not explicitly
// disabled. For other services we don't use sliding window limiter.
enableSlidingLim := false
if service == path.ExchangeService &&
!op.Options.ToggleFeatures.DisableSlidingWindowLimiter {
enableSlidingLim = true
}
// Select an appropriate rate limiter for the service.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{
Service: service,
EnableSlidingLimiter: enableSlidingLim,
})
// TODO(ashmrtn): This should probably just return a collection that deletes // TODO(ashmrtn): This should probably just return a collection that deletes
// the entire subtree instead of returning an additional bool. That way base // the entire subtree instead of returning an additional bool. That way base
// selection is controlled completely by flags and merging is controlled // selection is controlled completely by flags and merging is controlled
@ -508,12 +493,6 @@ func (op *BackupOperation) do(
logger.Ctx(ctx).Debug(opStats.ctrl) logger.Ctx(ctx).Debug(opStats.ctrl)
// For exchange, rate limits are enforced on a mailbox level. Reset the
// rate limiter so that it doesn't accidentally throttle following mailboxes.
// This is a no-op if we are using token bucket limiter since it refreshes
// tokens on a fixed per second basis.
graph.ResetLimiter(ctx)
return deets, nil return deets, nil
} }

View File

@ -44,6 +44,14 @@ type (
) ([]path.RestorePaths, error) ) ([]path.RestorePaths, error)
Wait() *data.CollectionStats Wait() *data.CollectionStats
// SetRateLimiter selects a rate limiter type for the service being
// backed up and binds it to the context.
SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context
} }
RestoreConsumer interface { RestoreConsumer interface {

View File

@ -10,6 +10,7 @@ import (
kinject "github.com/alcionai/corso/src/internal/kopia/inject" kinject "github.com/alcionai/corso/src/internal/kopia/inject"
"github.com/alcionai/corso/src/internal/m365" "github.com/alcionai/corso/src/internal/m365"
"github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths(
ctrl := m365.Controller{} ctrl := m365.Controller{}
return ctrl.GetMetadataPaths(ctx, r, base, errs) return ctrl.GetMetadataPaths(ctx, r, base, errs)
} }
func (mbp mockBackupProducer) SetRateLimiter(
ctx context.Context,
service path.ServiceType,
options control.Options,
) context.Context {
return ctx
}