From eb0299d3165ca6e1b3eeffa6ce3ef1720320609e Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Tue, 10 Oct 2023 22:15:37 +0530 Subject: [PATCH] Push item download requests through drive rate limiter (#4393) **Changes:** 1. Count item download requests under drive rate limit quota . Currently this is under exchange limiter. 2. Use 1 rate limit token instead of 2 for drive calls by default. 3. Use 2 tokens instead of 1 for initial delta query ( which has no token). Sharing internal docs separately to go along with the review. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [x] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/m365/collection/drive/collection.go | 8 ++++++++ src/internal/m365/graph/concurrency_middleware.go | 15 +++++---------- src/internal/m365/graph/http_wrapper.go | 2 +- src/pkg/services/m365/api/item_pager.go | 10 +++++++++- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go index 8a632fe0c..fcd177b9e 100644 --- a/src/internal/m365/collection/drive/collection.go +++ b/src/internal/m365/collection/drive/collection.go @@ -567,6 +567,14 @@ func (oc *Collection) streamDriveItem( parentPath) ctx = clues.Add(ctx, "item_info", itemInfo) + // Drive content download requests are also rate limited by graph api. + // Ensure that this request goes through the drive limiter & not the default + // limiter. + ctx = graph.BindRateLimiterConfig( + ctx, + graph.LimiterCfg{ + Service: path.OneDriveService, + }) if isFile { dataSuffix := metadata.DataFileSuffix diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 6d651391e..c470fc1cf 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -149,11 +149,12 @@ const limiterConsumptionCtxKey limiterConsumptionKey = "corsoGraphRateLimiterCon const ( // https://learn.microsoft.com/en-us/sharepoint/dev/general-development // /how-to-avoid-getting-throttled-or-blocked-in-sharepoint-online#application-throttling - defaultLC = 1 - driveDefaultLC = 2 + defaultLC = 1 // limit consumption rate for single-item GETs requests, - // or delta-based multi-item GETs. + // or delta-based multi-item GETs, or item content download requests. SingleGetOrDeltaLC = 1 + // delta queries without a delta token cost 2 units + DeltaNoTokenLC = 2 // limit consumption rate for anything permissions related PermissionsLC = 5 ) @@ -185,13 +186,7 @@ func ctxLimiterConsumption(ctx context.Context, defaultConsumption int) int { // the next token set is available. func QueueRequest(ctx context.Context) { limiter := ctxLimiter(ctx) - defaultConsumed := defaultLC - - if limiter == driveLimiter { - defaultConsumed = driveDefaultLC - } - - consume := ctxLimiterConsumption(ctx, defaultConsumed) + consume := ctxLimiterConsumption(ctx, defaultLC) if err := limiter.WaitN(ctx, consume); err != nil { logger.CtxErr(ctx, err).Error("graph middleware waiting on the limiter") diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index 0948b9b9e..7f5c840b2 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -82,7 +82,7 @@ func (hw httpWrapper) Request( body io.Reader, headers map[string]string, ) (*http.Response, error) { - req, err := http.NewRequest(method, url, body) + req, err := http.NewRequestWithContext(ctx, method, url, body) if err != nil { return nil, clues.Wrap(err, "new http request") } diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index f991f2345..a64ae71b7 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -147,11 +147,17 @@ func deltaEnumerateItems[T any]( newDeltaLink = "" invalidPrevDelta = len(prevDeltaLink) == 0 nextLink = "do-while" + consume = graph.SingleGetOrDeltaLC ) + if invalidPrevDelta { + // Delta queries with no previous token cost more. + consume = graph.DeltaNoTokenLC + } + // Loop through all pages returned by Graph API. for len(nextLink) > 0 { - page, err := pager.GetPage(graph.ConsumeNTokens(ctx, graph.SingleGetOrDeltaLC)) + page, err := pager.GetPage(graph.ConsumeNTokens(ctx, consume)) if graph.IsErrDeltaNotSupported(err) { logger.Ctx(ctx).Infow("delta queries not supported") return nil, DeltaUpdate{}, clues.Stack(graph.ErrDeltaNotSupported, err) @@ -161,6 +167,8 @@ func deltaEnumerateItems[T any]( logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink) invalidPrevDelta = true + // Reset limiter consumption since we don't have a valid delta token. + consume = graph.DeltaNoTokenLC result = make([]T, 0) // Reset tells the pager to try again after ditching its delta history.