From 6bc40ea2aad324057dc9d12f71fc0a950185de44 Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 21 Sep 2023 18:43:16 -0600 Subject: [PATCH] add throttling, token counters to operation (#4318) Adds context embedding to pkg/count, and embeds the backup, restore, and export operation ctx with the operations count.bus. The bus is extracted from the ctx in api middleware to track throttling response counts, as well as token consumption counts. Counts are globally tracked, not on a per-time or per resource basis. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature #### Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- .../m365/graph/concurrency_middleware.go | 4 +++ src/internal/m365/graph/middleware.go | 4 +++ src/internal/operations/backup.go | 2 ++ src/internal/operations/restore.go | 2 ++ src/pkg/count/count.go | 32 +++++++++++++++++++ src/pkg/count/keys.go | 14 ++++++-- 6 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index a9e45f17c..6d651391e 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -11,6 +11,7 @@ import ( khttp "github.com/microsoft/kiota-http-go" "golang.org/x/time/rate" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -318,6 +319,9 @@ func (mw *throttlingMiddleware) Intercept( return resp, nil } + countBus := count.Ctx(req.Context()) + countBus.Inc(count.ThrottledAPICalls) + // if all prior conditions pass, we need to add a fence that blocks // calls, globally, from progressing until the timeout retry-after // passes. diff --git a/src/internal/m365/graph/middleware.go b/src/internal/m365/graph/middleware.go index fcf76346a..63789ab23 100644 --- a/src/internal/m365/graph/middleware.go +++ b/src/internal/m365/graph/middleware.go @@ -17,6 +17,7 @@ import ( "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/events" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" ) @@ -411,6 +412,9 @@ func (mw *MetricsMiddleware) Intercept( xmrui = 1 } + countBus := count.Ctx(req.Context()) + countBus.Add(count.APICallTokensConsumed, int64(xmrui)) + events.IncN(xmrui, events.APICall, xmruHeader) return resp, err diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 00b280c80..80b74277e 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -202,6 +202,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx}) defer flushMetrics() + ctx = count.Embed(ctx, op.Counter) + // Check if the protected resource has the service enabled in order for us // to run a backup. enabled, err := op.bp.IsServiceEnabled( diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index ed9fc863c..dc05836e3 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -136,6 +136,8 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx}) defer flushMetrics() + ctx = count.Embed(ctx, op.Counter) + ctx = clues.Add( ctx, "tenant_id", clues.Hide(op.acct.ID()), diff --git a/src/pkg/count/count.go b/src/pkg/count/count.go index bee986a26..cfaa17030 100644 --- a/src/pkg/count/count.go +++ b/src/pkg/count/count.go @@ -1,6 +1,8 @@ package count import ( + "context" + "github.com/puzpuzpuz/xsync/v2" ) @@ -106,3 +108,33 @@ func (b *Bus) TotalValues() map[string]int64 { return b.Values() } + +// --------------------------------------------------------------------------- +// context embedding +// --------------------------------------------------------------------------- + +type countKey string + +const ctxKey countKey = "corsoCounter" + +// Ctx retrieves the count.Bus embedded in the context. +func Ctx(ctx context.Context) *Bus { + l := ctx.Value(ctxKey) + if l == nil { + return New() + } + + return l.(*Bus) +} + +// Embed allows users to embed their own count.Bus within the context. +func Embed( + ctx context.Context, + bus *Bus, +) context.Context { + if bus == nil { + bus = New() + } + + return context.WithValue(ctx, ctxKey, bus) +} diff --git a/src/pkg/count/keys.go b/src/pkg/count/keys.go index 593af4b46..8ebf2e20c 100644 --- a/src/pkg/count/keys.go +++ b/src/pkg/count/keys.go @@ -3,10 +3,18 @@ package count type key string const ( + // count of bucket-tokens consumed by api calls. + APICallTokensConsumed key = "api-call-tokens-consumed" + // count of times that items had collisions during restore, + // and that collision was solved by replacing the item. + CollisionReplace key = "collision-replace" + // count of times that items had collisions during restore, + // and that collision was solved by skipping the item. + CollisionSkip key = "collision-skip" // NewItemCreated should be used for non-skip, non-replace, // non-meta item creation counting. IE: use it specifically // for counting new items (no collision) or copied items. - NewItemCreated key = "new-item-created" - CollisionReplace key = "collision-replace" - CollisionSkip key = "collision-skip" + NewItemCreated key = "new-item-created" + // count of api calls that resulted in failure due to throttling. + ThrottledAPICalls key = "throttled-api-calls" )