diff --git a/src/internal/m365/backup.go b/src/internal/m365/backup.go index 8d91ac419..772f4385c 100644 --- a/src/internal/m365/backup.go +++ b/src/internal/m365/backup.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/service/sharepoint" "github.com/alcionai/corso/src/internal/operations/inject" bupMD "github.com/alcionai/corso/src/pkg/backup/metadata" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/filters" @@ -224,3 +225,26 @@ func (ctrl *Controller) GetMetadataPaths( return paths, nil } + +func (ctrl *Controller) SetRateLimiter( + ctx context.Context, + service path.ServiceType, + options control.Options, +) context.Context { + // Use sliding window limiter for Exchange if the feature is not explicitly + // disabled. For other services we always use token bucket limiter. + enableSlidingLim := false + if service == path.ExchangeService && + !options.ToggleFeatures.DisableSlidingWindowLimiter { + enableSlidingLim = true + } + + ctx = graph.BindRateLimiterConfig( + ctx, + graph.LimiterCfg{ + Service: service, + EnableSlidingLimiter: enableSlidingLim, + }) + + return ctx +} diff --git a/src/internal/m365/mock/connector.go b/src/internal/m365/mock/connector.go index a131fed01..50c2fd092 100644 --- a/src/internal/m365/mock/connector.go +++ b/src/internal/m365/mock/connector.go @@ -101,3 +101,11 @@ func (ctrl Controller) PopulateProtectedResourceIDAndName( return idname.NewProvider(ctrl.ProtectedResourceID, ctrl.ProtectedResourceName), ctrl.ProtectedResourceErr } + +func (ctrl Controller) SetRateLimiter( + ctx context.Context, + service path.ServiceType, + options control.Options, +) context.Context { + return ctx +} diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index f700c757e..932d56fa9 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -206,11 +206,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { defer flushMetrics() // Select an appropriate rate limiter for the service. - ctx = graph.BindRateLimiterConfig( - ctx, - graph.LimiterCfg{ - Service: op.Selectors.PathService(), - }) + ctx = op.bp.SetRateLimiter(ctx, op.Selectors.PathService(), op.Options) // Check if the protected resource has the service enabled in order for us // to run a backup. @@ -326,6 +322,12 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { logger.Ctx(ctx).Infow("completed backup", "results", op.Results) } + // For exchange, rate limits are enforced on a mailbox level. Reset the + // rate limiter so that it doesn't accidentally throttle following mailboxes. + // This is a no-op if we are using token bucket limiter since it refreshes + // tokens on a fixed per second basis. + graph.ResetLimiter(ctx) + return op.Errors.Failure() } @@ -338,7 +340,6 @@ func (op *BackupOperation) do( backupID model.StableID, ) (*details.Builder, error) { lastBackupVersion := version.NoBackup - service := op.Selectors.PathService() reasons, err := op.Selectors.Reasons(op.account.ID(), false) if err != nil { @@ -386,7 +387,7 @@ func (op *BackupOperation) do( // * the base finder code to skip over older bases (breaks isolation a bit // by requiring knowledge of good/bad backup versions for different // services) - if service == path.GroupsService { + if op.Selectors.PathService() == path.GroupsService { if mans.MinBackupVersion() != version.NoBackup && mans.MinBackupVersion() < version.Groups9Update { logger.Ctx(ctx).Info("dropping merge bases due to groups version change") @@ -432,22 +433,6 @@ func (op *BackupOperation) do( lastBackupVersion = mans.MinBackupVersion() } - // Use sliding window limiter for Exchange if the feature is not explicitly - // disabled. For other services we don't use sliding window limiter. - enableSlidingLim := false - if service == path.ExchangeService && - !op.Options.ToggleFeatures.DisableSlidingWindowLimiter { - enableSlidingLim = true - } - - // Select an appropriate rate limiter for the service. - ctx = graph.BindRateLimiterConfig( - ctx, - graph.LimiterCfg{ - Service: service, - EnableSlidingLimiter: enableSlidingLim, - }) - // TODO(ashmrtn): This should probably just return a collection that deletes // the entire subtree instead of returning an additional bool. That way base // selection is controlled completely by flags and merging is controlled @@ -508,12 +493,6 @@ func (op *BackupOperation) do( logger.Ctx(ctx).Debug(opStats.ctrl) - // For exchange, rate limits are enforced on a mailbox level. Reset the - // rate limiter so that it doesn't accidentally throttle following mailboxes. - // This is a no-op if we are using token bucket limiter since it refreshes - // tokens on a fixed per second basis. - graph.ResetLimiter(ctx) - return deets, nil } diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go index 28713e15a..52f50fce2 100644 --- a/src/internal/operations/inject/inject.go +++ b/src/internal/operations/inject/inject.go @@ -44,6 +44,14 @@ type ( ) ([]path.RestorePaths, error) Wait() *data.CollectionStats + + // SetRateLimiter selects a rate limiter type for the service being + // backed up and binds it to the context. + SetRateLimiter( + ctx context.Context, + service path.ServiceType, + options control.Options, + ) context.Context } RestoreConsumer interface { diff --git a/src/internal/operations/inject/mock/inject.go b/src/internal/operations/inject/mock/inject.go index b558c322d..dba52e273 100644 --- a/src/internal/operations/inject/mock/inject.go +++ b/src/internal/operations/inject/mock/inject.go @@ -10,6 +10,7 @@ import ( kinject "github.com/alcionai/corso/src/internal/kopia/inject" "github.com/alcionai/corso/src/internal/m365" "github.com/alcionai/corso/src/internal/operations/inject" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -69,3 +70,11 @@ func (mbp mockBackupProducer) GetMetadataPaths( ctrl := m365.Controller{} return ctrl.GetMetadataPaths(ctx, r, base, errs) } + +func (mbp mockBackupProducer) SetRateLimiter( + ctx context.Context, + service path.ServiceType, + options control.Options, +) context.Context { + return ctx +}