diff --git a/src/internal/events/adot_sdk_client.go b/src/internal/events/adot_sdk_client.go index 4b2bcd3fc..cc0347591 100644 --- a/src/internal/events/adot_sdk_client.go +++ b/src/internal/events/adot_sdk_client.go @@ -2,9 +2,8 @@ package events import ( "context" - "log" + "io" "os" - "time" "github.com/alcionai/corso/src/pkg/logger" "go.opentelemetry.io/otel" @@ -26,58 +25,89 @@ type collector struct { meter metric.Meter } -func Newcollector(mp metric.MeterProvider) collector { +const ( + APITokens = "api_tokens" + GrowCounter = "grow_counter" + RLTokens = "rate_limit_tokens" +) + +// Array of metric keys +var metricKeys = []string{ + APITokens, + GrowCounter, + RLTokens, +} + +// Map of metricsCategory to metric.Int64Counter +var data = map[string]metric.Int64Counter{} + +func NewCollector(mp metric.MeterProvider) { rmc := collector{} rmc.meter = mp.Meter("corso-meter") - rmc.registerCounter() - return rmc -} - -func (rmc *collector) RegisterMetricsClient(ctx context.Context, cfg Config) { - go func() { - for { - rmc.updateCounter(ctx) - time.Sleep(time.Second * 1) - } - }() - -} - -func (rmc *collector) registerCounter() { - Ctr, _ = rmc.meter.Int64Counter(growCounter) - AsyncCtr, _ = rmc.meter.Int64ObservableCounter("async_counter") - - cb := func(_ context.Context, o metric.Observer) error { - logger.Ctx(context.Background()).Infow("Async counter callback") - token += 100 - o.ObserveInt64(AsyncCtr, token) - - return nil - } - - _, err := rmc.meter.RegisterCallback( - cb, - AsyncCtr, - ) - - if err != nil { - log.Fatalf("failed to register callback: %v", err) + for _, key := range metricKeys { + data[key], _ = rmc.meter.Int64Counter(key) } } -func (rmc *collector) updateCounter(ctx context.Context) { - logger.Ctx(ctx).Infow("updateCounter") +func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { + mp := StartClient(ctx) + NewCollector(mp) - Ctr.Add(ctx, 20) + return ctx, func() {} } -type Config struct { - Host string - Port string +// Inc increments the given category by 1. +func Inc(ctx context.Context, cat string) { + ctr := data[cat] + ctr.Add(context.Background(), 1) } +// IncN increments the given category by N. +func IncN(ctx context.Context, n int, cat string) { + ctr := data[cat] + ctr.Add(context.Background(), int64(n)) +} + +// func (rmc *collector) RegisterMetricsClient(ctx context.Context) { +// go func() { +// for { +// rmc.updateCounter(ctx) +// time.Sleep(time.Second * 1) +// } +// }() + +// } + +// func (rmc *collector) registerCounter() { +// Ctr, _ = rmc.meter.Int64Counter(growCounter) +// AsyncCtr, _ = rmc.meter.Int64ObservableCounter("async_counter") + +// cb := func(_ context.Context, o metric.Observer) error { +// logger.Ctx(context.Background()).Infow("Async counter callback") +// token += 100 +// o.ObserveInt64(AsyncCtr, token) + +// return nil +// } + +// _, err := rmc.meter.RegisterCallback( +// cb, +// AsyncCtr, +// ) + +// if err != nil { +// log.Fatalf("failed to register callback: %v", err) +// } +// } + +// func (rmc *collector) updateCounter(ctx context.Context) { +// logger.Ctx(ctx).Infow("updateCounter") + +// Ctr.Add(ctx, 20) +// } + func StartClient(ctx context.Context) *metricSdk.MeterProvider { res := resource.NewWithAttributes( semconv.SchemaURL, @@ -99,7 +129,7 @@ func StartClient(ctx context.Context) *metricSdk.MeterProvider { otlpmetricgrpc.WithTemporalitySelector(metricSdk.DefaultTemporalitySelector), ) if err != nil { - log.Fatalf("failed to create new OTLP metric exporter: %v", err) + logger.CtxErr(ctx, err).Error("creating metrics exporter") } meterProvider := metricSdk.NewMeterProvider( @@ -111,18 +141,3 @@ func StartClient(ctx context.Context) *metricSdk.MeterProvider { return meterProvider } - -// func deltaSelector(kind metricSdk.InstrumentKind) metricdata.Temporality { -// switch kind { -// case metricSdk.InstrumentKindCounter, -// metricSdk.InstrumentKindHistogram, -// metricSdk.InstrumentKindObservableGauge, -// metricSdk.InstrumentKindObservableCounter: -// return metricdata.DeltaTemporality -// case metricSdk.InstrumentKindUpDownCounter, -// metricSdk.InstrumentKindObservableUpDownCounter: -// return metricdata.CumulativeTemporality -// } - -// panic("unknown instrument kind") -// } diff --git a/src/internal/events/events.go b/src/internal/events/events.go index 3f3509444..d93992574 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -5,13 +5,11 @@ import ( "crypto/md5" "crypto/sha256" "fmt" - "io" "math" "os" "time" "github.com/alcionai/clues" - "github.com/armon/go-metrics" analytics "github.com/rudderlabs/analytics-go" "github.com/alcionai/corso/src/internal/version" @@ -196,44 +194,10 @@ func tenantHash(tenID string) string { // metrics aggregation // --------------------------------------------------------------------------- -type metricsCategory string +// metrics collection buckets -// metrics collection bucket -const ( - APICall = "api_call" - growCounter = "grow_counter" -) - -// configurations -const ( -// reportInterval = 1 * time.Minute -// retentionDuration = 2 * time.Minute -) - -func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { - mp := StartClient(ctx) - - //mpx := otel.GetMeterProvider() - rmc := Newcollector(mp) - rmc.RegisterMetricsClient(ctx, Config{}) - - return ctx, func() {} -} - -// Inc increments the given category by 1. -func Inc(cat metricsCategory, keys ...string) { - cats := append([]string{string(cat)}, keys...) - metrics.IncrCounter(cats, 1) -} - -// IncN increments the given category by N. -func IncN(n int, cat metricsCategory, keys ...string) { - cats := append([]string{string(cat)}, keys...) - metrics.IncrCounter(cats, float32(n)) -} - -// Since records the duration between the provided time and now, in millis. -func Since(start time.Time, cat metricsCategory, keys ...string) { - cats := append([]string{string(cat)}, keys...) - metrics.MeasureSince(cats, start) -} +// // Since records the duration between the provided time and now, in millis. +// func Since(start time.Time, cat metricsCategory, keys ...string) { +// cats := append([]string{string(cat)}, keys...) +// metrics.MeasureSince(cats, start) +// } diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 6d651391e..5caa02a48 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -11,6 +11,7 @@ import ( khttp "github.com/microsoft/kiota-http-go" "golang.org/x/time/rate" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -191,6 +192,9 @@ func QueueRequest(ctx context.Context) { defaultConsumed = driveDefaultLC } + t := limiter.Tokens() + events.IncN(ctx, int(t), events.RLTokens) + consume := ctxLimiterConsumption(ctx, defaultConsumed) if err := limiter.WaitN(ctx, consume); err != nil { diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index 0948b9b9e..17f57de28 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -11,7 +11,6 @@ import ( "github.com/pkg/errors" "golang.org/x/net/http2" - "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/logger" ) @@ -123,7 +122,7 @@ func (hw httpWrapper) Request( } logger.Ctx(ctx).Debug("http2 stream error") - events.Inc(events.APICall, "streamerror") + //events.Inc(events.APICall, "streamerror") time.Sleep(3 * time.Second) } diff --git a/src/internal/m365/graph/middleware.go b/src/internal/m365/graph/middleware.go index d5935a5b0..0a1f7b120 100644 --- a/src/internal/m365/graph/middleware.go +++ b/src/internal/m365/graph/middleware.go @@ -333,23 +333,23 @@ func (mw *MetricsMiddleware) Intercept( req *http.Request, ) (*http.Response, error) { var ( - start = time.Now() + //start = time.Now() resp, err = pipeline.Next(req, middlewareIndex) - status = "nil-resp" + //status = "nil-resp" ) if resp == nil { return resp, err } - if resp != nil { - status = resp.Status - } + // if resp != nil { + // status = resp.Status + // } - events.Inc(events.APICall) - events.Inc(events.APICall, status) - events.Since(start, events.APICall) - events.Since(start, events.APICall, status) + //events.Inc(events.APICall) + //events.Inc(events.APICall, status) + // events.Since(start, events.APICall) + // events.Since(start, events.APICall, status) // track the graph "resource cost" for each call (if not provided, assume 1) @@ -365,7 +365,7 @@ func (mw *MetricsMiddleware) Intercept( countBus := count.Ctx(req.Context()) countBus.Add(count.APICallTokensConsumed, int64(xmrui)) - events.IncN(xmrui, events.APICall, xmruHeader) + events.IncN(req.Context(), xmrui, events.APITokens) return resp, err } diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index 0b54b5589..ffc364999 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -16,7 +16,6 @@ import ( "github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/idname" - "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -366,7 +365,7 @@ func (aw *adapterWrap) Send( } logger.Ctx(ictx).Debug("http connection error") - events.Inc(events.APICall, "connectionerror") + //events.Inc(events.APICall, "connectionerror") time.Sleep(3 * time.Second) }