add throttling, token counters to operation (#4318)
Adds context embedding to pkg/count, and embeds the backup, restore, and export operation ctx with the operations count.bus. The bus is extracted from the ctx in api middleware to track throttling response counts, as well as token consumption counts. Counts are globally tracked, not on a per-time or per resource basis. --- #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🌻 Feature #### Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
bf3a65a351
commit
6bc40ea2aa
@ -11,6 +11,7 @@ import (
|
|||||||
khttp "github.com/microsoft/kiota-http-go"
|
khttp "github.com/microsoft/kiota-http-go"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/pkg/count"
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
"github.com/alcionai/corso/src/pkg/path"
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
)
|
)
|
||||||
@ -318,6 +319,9 @@ func (mw *throttlingMiddleware) Intercept(
|
|||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
countBus := count.Ctx(req.Context())
|
||||||
|
countBus.Inc(count.ThrottledAPICalls)
|
||||||
|
|
||||||
// if all prior conditions pass, we need to add a fence that blocks
|
// if all prior conditions pass, we need to add a fence that blocks
|
||||||
// calls, globally, from progressing until the timeout retry-after
|
// calls, globally, from progressing until the timeout retry-after
|
||||||
// passes.
|
// passes.
|
||||||
|
|||||||
@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common/pii"
|
"github.com/alcionai/corso/src/internal/common/pii"
|
||||||
"github.com/alcionai/corso/src/internal/events"
|
"github.com/alcionai/corso/src/internal/events"
|
||||||
|
"github.com/alcionai/corso/src/pkg/count"
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -411,6 +412,9 @@ func (mw *MetricsMiddleware) Intercept(
|
|||||||
xmrui = 1
|
xmrui = 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
countBus := count.Ctx(req.Context())
|
||||||
|
countBus.Add(count.APICallTokensConsumed, int64(xmrui))
|
||||||
|
|
||||||
events.IncN(xmrui, events.APICall, xmruHeader)
|
events.IncN(xmrui, events.APICall, xmruHeader)
|
||||||
|
|
||||||
return resp, err
|
return resp, err
|
||||||
|
|||||||
@ -202,6 +202,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
|
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
|
||||||
defer flushMetrics()
|
defer flushMetrics()
|
||||||
|
|
||||||
|
ctx = count.Embed(ctx, op.Counter)
|
||||||
|
|
||||||
// Check if the protected resource has the service enabled in order for us
|
// Check if the protected resource has the service enabled in order for us
|
||||||
// to run a backup.
|
// to run a backup.
|
||||||
enabled, err := op.bp.IsServiceEnabled(
|
enabled, err := op.bp.IsServiceEnabled(
|
||||||
|
|||||||
@ -136,6 +136,8 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
|||||||
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
|
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
|
||||||
defer flushMetrics()
|
defer flushMetrics()
|
||||||
|
|
||||||
|
ctx = count.Embed(ctx, op.Counter)
|
||||||
|
|
||||||
ctx = clues.Add(
|
ctx = clues.Add(
|
||||||
ctx,
|
ctx,
|
||||||
"tenant_id", clues.Hide(op.acct.ID()),
|
"tenant_id", clues.Hide(op.acct.ID()),
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
package count
|
package count
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/puzpuzpuz/xsync/v2"
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -106,3 +108,33 @@ func (b *Bus) TotalValues() map[string]int64 {
|
|||||||
|
|
||||||
return b.Values()
|
return b.Values()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// context embedding
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type countKey string
|
||||||
|
|
||||||
|
const ctxKey countKey = "corsoCounter"
|
||||||
|
|
||||||
|
// Ctx retrieves the count.Bus embedded in the context.
|
||||||
|
func Ctx(ctx context.Context) *Bus {
|
||||||
|
l := ctx.Value(ctxKey)
|
||||||
|
if l == nil {
|
||||||
|
return New()
|
||||||
|
}
|
||||||
|
|
||||||
|
return l.(*Bus)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Embed allows users to embed their own count.Bus within the context.
|
||||||
|
func Embed(
|
||||||
|
ctx context.Context,
|
||||||
|
bus *Bus,
|
||||||
|
) context.Context {
|
||||||
|
if bus == nil {
|
||||||
|
bus = New()
|
||||||
|
}
|
||||||
|
|
||||||
|
return context.WithValue(ctx, ctxKey, bus)
|
||||||
|
}
|
||||||
|
|||||||
@ -3,10 +3,18 @@ package count
|
|||||||
type key string
|
type key string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// count of bucket-tokens consumed by api calls.
|
||||||
|
APICallTokensConsumed key = "api-call-tokens-consumed"
|
||||||
|
// count of times that items had collisions during restore,
|
||||||
|
// and that collision was solved by replacing the item.
|
||||||
|
CollisionReplace key = "collision-replace"
|
||||||
|
// count of times that items had collisions during restore,
|
||||||
|
// and that collision was solved by skipping the item.
|
||||||
|
CollisionSkip key = "collision-skip"
|
||||||
// NewItemCreated should be used for non-skip, non-replace,
|
// NewItemCreated should be used for non-skip, non-replace,
|
||||||
// non-meta item creation counting. IE: use it specifically
|
// non-meta item creation counting. IE: use it specifically
|
||||||
// for counting new items (no collision) or copied items.
|
// for counting new items (no collision) or copied items.
|
||||||
NewItemCreated key = "new-item-created"
|
NewItemCreated key = "new-item-created"
|
||||||
CollisionReplace key = "collision-replace"
|
// count of api calls that resulted in failure due to throttling.
|
||||||
CollisionSkip key = "collision-skip"
|
ThrottledAPICalls key = "throttled-api-calls"
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user