Add graph metrics

This commit is contained in:
Abhishek Pandey 2023-10-06 22:43:54 +05:30
parent 252ec4f7a7
commit 31c435af16
6 changed files with 95 additions and 114 deletions

View File

@ -2,9 +2,8 @@ package events
import ( import (
"context" "context"
"log" "io"
"os" "os"
"time"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
@ -26,58 +25,89 @@ type collector struct {
meter metric.Meter meter metric.Meter
} }
func Newcollector(mp metric.MeterProvider) collector { const (
APITokens = "api_tokens"
GrowCounter = "grow_counter"
RLTokens = "rate_limit_tokens"
)
// Array of metric keys
var metricKeys = []string{
APITokens,
GrowCounter,
RLTokens,
}
// Map of metricsCategory to metric.Int64Counter
var data = map[string]metric.Int64Counter{}
func NewCollector(mp metric.MeterProvider) {
rmc := collector{} rmc := collector{}
rmc.meter = mp.Meter("corso-meter") rmc.meter = mp.Meter("corso-meter")
rmc.registerCounter()
return rmc for _, key := range metricKeys {
} data[key], _ = rmc.meter.Int64Counter(key)
func (rmc *collector) RegisterMetricsClient(ctx context.Context, cfg Config) {
go func() {
for {
rmc.updateCounter(ctx)
time.Sleep(time.Second * 1)
}
}()
}
func (rmc *collector) registerCounter() {
Ctr, _ = rmc.meter.Int64Counter(growCounter)
AsyncCtr, _ = rmc.meter.Int64ObservableCounter("async_counter")
cb := func(_ context.Context, o metric.Observer) error {
logger.Ctx(context.Background()).Infow("Async counter callback")
token += 100
o.ObserveInt64(AsyncCtr, token)
return nil
}
_, err := rmc.meter.RegisterCallback(
cb,
AsyncCtr,
)
if err != nil {
log.Fatalf("failed to register callback: %v", err)
} }
} }
func (rmc *collector) updateCounter(ctx context.Context) { func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
logger.Ctx(ctx).Infow("updateCounter") mp := StartClient(ctx)
NewCollector(mp)
Ctr.Add(ctx, 20) return ctx, func() {}
} }
type Config struct { // Inc increments the given category by 1.
Host string func Inc(ctx context.Context, cat string) {
Port string ctr := data[cat]
ctr.Add(context.Background(), 1)
} }
// IncN increments the given category by N.
func IncN(ctx context.Context, n int, cat string) {
ctr := data[cat]
ctr.Add(context.Background(), int64(n))
}
// func (rmc *collector) RegisterMetricsClient(ctx context.Context) {
// go func() {
// for {
// rmc.updateCounter(ctx)
// time.Sleep(time.Second * 1)
// }
// }()
// }
// func (rmc *collector) registerCounter() {
// Ctr, _ = rmc.meter.Int64Counter(growCounter)
// AsyncCtr, _ = rmc.meter.Int64ObservableCounter("async_counter")
// cb := func(_ context.Context, o metric.Observer) error {
// logger.Ctx(context.Background()).Infow("Async counter callback")
// token += 100
// o.ObserveInt64(AsyncCtr, token)
// return nil
// }
// _, err := rmc.meter.RegisterCallback(
// cb,
// AsyncCtr,
// )
// if err != nil {
// log.Fatalf("failed to register callback: %v", err)
// }
// }
// func (rmc *collector) updateCounter(ctx context.Context) {
// logger.Ctx(ctx).Infow("updateCounter")
// Ctr.Add(ctx, 20)
// }
func StartClient(ctx context.Context) *metricSdk.MeterProvider { func StartClient(ctx context.Context) *metricSdk.MeterProvider {
res := resource.NewWithAttributes( res := resource.NewWithAttributes(
semconv.SchemaURL, semconv.SchemaURL,
@ -99,7 +129,7 @@ func StartClient(ctx context.Context) *metricSdk.MeterProvider {
otlpmetricgrpc.WithTemporalitySelector(metricSdk.DefaultTemporalitySelector), otlpmetricgrpc.WithTemporalitySelector(metricSdk.DefaultTemporalitySelector),
) )
if err != nil { if err != nil {
log.Fatalf("failed to create new OTLP metric exporter: %v", err) logger.CtxErr(ctx, err).Error("creating metrics exporter")
} }
meterProvider := metricSdk.NewMeterProvider( meterProvider := metricSdk.NewMeterProvider(
@ -111,18 +141,3 @@ func StartClient(ctx context.Context) *metricSdk.MeterProvider {
return meterProvider return meterProvider
} }
// func deltaSelector(kind metricSdk.InstrumentKind) metricdata.Temporality {
// switch kind {
// case metricSdk.InstrumentKindCounter,
// metricSdk.InstrumentKindHistogram,
// metricSdk.InstrumentKindObservableGauge,
// metricSdk.InstrumentKindObservableCounter:
// return metricdata.DeltaTemporality
// case metricSdk.InstrumentKindUpDownCounter,
// metricSdk.InstrumentKindObservableUpDownCounter:
// return metricdata.CumulativeTemporality
// }
// panic("unknown instrument kind")
// }

View File

@ -5,13 +5,11 @@ import (
"crypto/md5" "crypto/md5"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"io"
"math" "math"
"os" "os"
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/armon/go-metrics"
analytics "github.com/rudderlabs/analytics-go" analytics "github.com/rudderlabs/analytics-go"
"github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/internal/version"
@ -196,44 +194,10 @@ func tenantHash(tenID string) string {
// metrics aggregation // metrics aggregation
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type metricsCategory string // metrics collection buckets
// metrics collection bucket // // Since records the duration between the provided time and now, in millis.
const ( // func Since(start time.Time, cat metricsCategory, keys ...string) {
APICall = "api_call" // cats := append([]string{string(cat)}, keys...)
growCounter = "grow_counter" // metrics.MeasureSince(cats, start)
) // }
// configurations
const (
// reportInterval = 1 * time.Minute
// retentionDuration = 2 * time.Minute
)
func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
mp := StartClient(ctx)
//mpx := otel.GetMeterProvider()
rmc := Newcollector(mp)
rmc.RegisterMetricsClient(ctx, Config{})
return ctx, func() {}
}
// Inc increments the given category by 1.
func Inc(cat metricsCategory, keys ...string) {
cats := append([]string{string(cat)}, keys...)
metrics.IncrCounter(cats, 1)
}
// IncN increments the given category by N.
func IncN(n int, cat metricsCategory, keys ...string) {
cats := append([]string{string(cat)}, keys...)
metrics.IncrCounter(cats, float32(n))
}
// Since records the duration between the provided time and now, in millis.
func Since(start time.Time, cat metricsCategory, keys ...string) {
cats := append([]string{string(cat)}, keys...)
metrics.MeasureSince(cats, start)
}

View File

@ -11,6 +11,7 @@ import (
khttp "github.com/microsoft/kiota-http-go" khttp "github.com/microsoft/kiota-http-go"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
@ -191,6 +192,9 @@ func QueueRequest(ctx context.Context) {
defaultConsumed = driveDefaultLC defaultConsumed = driveDefaultLC
} }
t := limiter.Tokens()
events.IncN(ctx, int(t), events.RLTokens)
consume := ctxLimiterConsumption(ctx, defaultConsumed) consume := ctxLimiterConsumption(ctx, defaultConsumed)
if err := limiter.WaitN(ctx, consume); err != nil { if err := limiter.WaitN(ctx, consume); err != nil {

View File

@ -11,7 +11,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
) )
@ -123,7 +122,7 @@ func (hw httpWrapper) Request(
} }
logger.Ctx(ctx).Debug("http2 stream error") logger.Ctx(ctx).Debug("http2 stream error")
events.Inc(events.APICall, "streamerror") //events.Inc(events.APICall, "streamerror")
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }

View File

@ -333,23 +333,23 @@ func (mw *MetricsMiddleware) Intercept(
req *http.Request, req *http.Request,
) (*http.Response, error) { ) (*http.Response, error) {
var ( var (
start = time.Now() //start = time.Now()
resp, err = pipeline.Next(req, middlewareIndex) resp, err = pipeline.Next(req, middlewareIndex)
status = "nil-resp" //status = "nil-resp"
) )
if resp == nil { if resp == nil {
return resp, err return resp, err
} }
if resp != nil { // if resp != nil {
status = resp.Status // status = resp.Status
} // }
events.Inc(events.APICall) //events.Inc(events.APICall)
events.Inc(events.APICall, status) //events.Inc(events.APICall, status)
events.Since(start, events.APICall) // events.Since(start, events.APICall)
events.Since(start, events.APICall, status) // events.Since(start, events.APICall, status)
// track the graph "resource cost" for each call (if not provided, assume 1) // track the graph "resource cost" for each call (if not provided, assume 1)
@ -365,7 +365,7 @@ func (mw *MetricsMiddleware) Intercept(
countBus := count.Ctx(req.Context()) countBus := count.Ctx(req.Context())
countBus.Add(count.APICallTokensConsumed, int64(xmrui)) countBus.Add(count.APICallTokensConsumed, int64(xmrui))
events.IncN(xmrui, events.APICall, xmruHeader) events.IncN(req.Context(), xmrui, events.APITokens)
return resp, err return resp, err
} }

View File

@ -16,7 +16,6 @@ import (
"github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
@ -366,7 +365,7 @@ func (aw *adapterWrap) Send(
} }
logger.Ctx(ictx).Debug("http connection error") logger.Ctx(ictx).Debug("http connection error")
events.Inc(events.APICall, "connectionerror") //events.Inc(events.APICall, "connectionerror")
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }