From ea3d5d7b9fb7be20287972b6d4e31dda6378e886 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Fri, 6 Oct 2023 23:33:03 +0530 Subject: [PATCH] Add guage --- src/internal/events/adot_sdk_client.go | 27 ++++++++++++++----- .../m365/graph/concurrency_middleware.go | 19 +++++++++++-- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/internal/events/adot_sdk_client.go b/src/internal/events/adot_sdk_client.go index ff4fcc215..2f926624a 100644 --- a/src/internal/events/adot_sdk_client.go +++ b/src/internal/events/adot_sdk_client.go @@ -19,6 +19,7 @@ import ( var Ctr metric.Int64Counter var AsyncCtr metric.Int64ObservableCounter +var RLGauge metric.Int64ObservableGauge var token int64 type collector struct { @@ -35,11 +36,13 @@ const ( var metricKeys = []string{ APITokens, GrowCounter, - RLTokens, + // RLTokens, } // Map of metricsCategory to metric.Int64Counter -var data = map[string]metric.Int64Counter{} +var counter = map[string]metric.Int64Counter{} + +//var guages = map[string]metric.Int64ObservableGauge{} func NewCollector(mp metric.MeterProvider) { rmc := collector{} @@ -47,12 +50,22 @@ func NewCollector(mp metric.MeterProvider) { rmc.meter = mp.Meter("corso-meter") for _, key := range metricKeys { - data[key], _ = rmc.meter.Int64Counter(key) + counter[key], _ = rmc.meter.Int64Counter(key) } - // // Create a new guage - // g, _ := rmc.meter.Int64ObservableGauge(RLTokens) +} +func RegisterGauge(ctx context.Context, name string, cb func(_ context.Context, o metric.Observer) error) { + RLGauge, _ := otel.Meter("corso-meter").Int64ObservableGauge(name) + + _, err := otel.Meter("corso-meter").RegisterCallback( + cb, + RLGauge, + ) + + if err != nil { + logger.CtxErr(ctx, err).Errorw("failed to register callback") + } } func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { @@ -64,13 +77,13 @@ func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { // Inc increments the given category by 1. func Inc(ctx context.Context, cat string) { - ctr := data[cat] + ctr := counter[cat] ctr.Add(ctx, 1) } // IncN increments the given category by N. func IncN(ctx context.Context, n int, cat string) { - ctr := data[cat] + ctr := counter[cat] ctr.Add(ctx, int64(n)) } diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 5caa02a48..9a7787701 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/clues" khttp "github.com/microsoft/kiota-http-go" + "go.opentelemetry.io/otel/metric" "golang.org/x/time/rate" "github.com/alcionai/corso/src/internal/events" @@ -159,6 +160,19 @@ const ( PermissionsLC = 5 ) +var twonce sync.Once + +func RegisterRLMetrics(ctx context.Context) { + twonce.Do(func() { + cb := func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(events.RLGauge, int64(ctxLimiter(ctx).Tokens())) + return nil + } + + events.RegisterGauge(ctx, events.RLTokens, cb) + }) +} + // ConsumeNTokens ensures any calls using this context will consume // n rate-limiter tokens. Default is 1, and this value does not need // to be established in the context to consume the default tokens. @@ -192,8 +206,7 @@ func QueueRequest(ctx context.Context) { defaultConsumed = driveDefaultLC } - t := limiter.Tokens() - events.IncN(ctx, int(t), events.RLTokens) + // events.IncN(ctx, int(t), events.RLTokens) consume := ctxLimiterConsumption(ctx, defaultConsumed) @@ -210,7 +223,9 @@ func (mw *RateLimiterMiddleware) Intercept( middlewareIndex int, req *http.Request, ) (*http.Response, error) { + RegisterRLMetrics(req.Context()) QueueRequest(req.Context()) + return pipeline.Next(req, middlewareIndex) }