corso/src/internal/events/adot_sdk_client.go
Abhishek Pandey a08c36c910 Fix for reals
2023-10-07 01:00:00 +05:30

150 lines
3.1 KiB
Go

package events
import (
"context"
"io"
"log"
"os"
"github.com/alcionai/corso/src/pkg/logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"google.golang.org/grpc"
"go.opentelemetry.io/otel/metric"
metricSdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
)
// Package-level instrument state shared by the functions in this file.
var (
	// Ctr is an exported counter handle. It is not assigned anywhere in the
	// visible portion of this file.
	Ctr metric.Int64Counter

	// AsyncCtr is the observable "async_counter" instrument created by
	// NewCollector.
	AsyncCtr metric.Int64ObservableCounter

	// token is the running total reported by the AsyncCtr callback; the
	// callback adds 100 to it each time it runs.
	token int64
)
// collector wraps the meter that NewCollector uses to create metric
// instruments.
type collector struct {
// meter is obtained from the MeterProvider passed to NewCollector.
meter metric.Meter
}
// Metric keys. Inc and IncN look counters up by these names.
const (
// APITokens is the "api_tokens" metric key.
APITokens = "api_tokens"
// GrowCounter is the "grow_counter" metric key.
GrowCounter = "grow_counter"
// RLTokens is the "rate_limit_tokens" metric key. It is currently commented
// out of metricKeys, so NewCollector creates no counter for it.
RLTokens = "rate_limit_tokens"
)
// metricKeys is the slice of metric keys for which NewCollector creates an
// Int64Counter and stores it in the counter map.
var metricKeys = []string{
APITokens,
GrowCounter,
// RLTokens is disabled: no counter is created for it.
// RLTokens,
}
// globalMeter is the meter set by NewCollector and used by CreateGauge and
// RegisterGauge. It holds the zero value (nil) until NewCollector has run.
var globalMeter metric.Meter
// counter maps a metric key (see metricKeys) to the metric.Int64Counter
// created for it by NewCollector. Inc and IncN read from this map.
var counter = map[string]metric.Int64Counter{}
// NewCollector initializes the package-level metric instruments from mp.
//
// It obtains the "corso-meter" meter and stores it in globalMeter, creates one
// Int64Counter per entry in metricKeys (kept in the counter map for Inc and
// IncN), and creates the observable "async_counter" instrument together with
// a callback that adds 100 to token and reports the running total each time
// it is invoked.
//
// Instrument creation errors are logged rather than silently discarded. A
// failure to register the async callback terminates the process through
// log.Fatalf.
func NewCollector(mp metric.MeterProvider) {
	rmc := collector{}
	rmc.meter = mp.Meter("corso-meter")
	globalMeter = rmc.meter

	// No caller context is available in this function, so logging uses a
	// background context.
	bgCtx := context.Background()

	for _, key := range metricKeys {
		ctr, err := rmc.meter.Int64Counter(key)
		if err != nil {
			logger.CtxErr(bgCtx, err).Errorw("failed to create counter", "metric_key", key)
		}

		// Only keep usable instruments so lookups never yield a stored nil.
		if ctr != nil {
			counter[key] = ctr
		}
	}

	var err error

	AsyncCtr, err = rmc.meter.Int64ObservableCounter("async_counter")
	if err != nil {
		logger.CtxErr(bgCtx, err).Errorw("failed to create async counter")
	}

	cb := func(_ context.Context, o metric.Observer) error {
		logger.Ctx(bgCtx).Infow("Async counter callback")

		token += 100
		o.ObserveInt64(AsyncCtr, token)

		return nil
	}

	if _, err := rmc.meter.RegisterCallback(cb, AsyncCtr); err != nil {
		log.Fatalf("failed to register callback: %v", err)
	}
}
// CreateGauge creates an Int64ObservableCounter called name on the
// package-level meter and returns it.
//
// Despite the function name, the instrument that is created is an observable
// counter. If the package-level meter has not been set yet (NewCollector has
// not run) the problem is logged and nil is returned instead of panicking.
// An error reported while creating the instrument is logged, and whatever
// instrument value the meter returned is passed back to the caller.
func CreateGauge(ctx context.Context, name string) metric.Int64ObservableCounter {
	if globalMeter == nil {
		logger.Ctx(ctx).Errorw(
			"metrics meter not initialized; cannot create observable counter",
			"metric_name", name)

		return nil
	}

	rlGauge, err := globalMeter.Int64ObservableCounter(name)
	if err != nil {
		logger.CtxErr(ctx, err).Errorw("failed to create observable counter", "metric_name", name)
	}

	return rlGauge
}
// RegisterGauge registers cb on the package-level meter as the callback for
// the observable counter rlg.
//
// Registration failures are logged and otherwise ignored. The registration
// handle returned by the meter is discarded, so the callback cannot be
// unregistered through this function. If the package-level meter has not been
// set yet (NewCollector has not run) the call logs an error and returns
// instead of panicking.
func RegisterGauge(
	ctx context.Context,
	rlg metric.Int64ObservableCounter,
	cb func(_ context.Context, o metric.Observer) error,
) {
	if globalMeter == nil {
		logger.Ctx(ctx).Errorw("metrics meter not initialized; cannot register callback")
		return
	}

	if _, err := globalMeter.RegisterCallback(cb, rlg); err != nil {
		logger.CtxErr(ctx, err).Errorw("failed to register callback")
	}
}
// NewMetrics starts the metrics client and initializes the package-level
// instruments.
//
// It returns ctx unchanged together with a cleanup function. The cleanup
// function shuts the meter provider down so it is released when the caller is
// done; shutdown errors are logged. The w parameter is currently unused.
//
// If StartClient yields no provider, the instruments are left uninitialized
// and the cleanup function is a no-op.
func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
	mp := StartClient(ctx)
	if mp == nil {
		// Passing a nil provider to NewCollector would panic.
		logger.Ctx(ctx).Errorw("metrics client unavailable; metrics are disabled")
		return ctx, func() {}
	}

	NewCollector(mp)

	return ctx, func() {
		if err := mp.Shutdown(ctx); err != nil {
			logger.CtxErr(ctx, err).Errorw("shutting down metrics provider")
		}
	}
}
// Inc increments the counter registered for category cat by 1.
//
// Categories without a registered counter (any key not created by
// NewCollector, or every key before NewCollector has run) are ignored; calling
// Add on the missing entry would otherwise panic on a nil interface.
func Inc(ctx context.Context, cat string) {
	ctr := counter[cat]
	if ctr == nil {
		return
	}

	ctr.Add(ctx, 1)
}
// IncN increments the counter registered for category cat by n.
//
// Categories without a registered counter (any key not created by
// NewCollector, or every key before NewCollector has run) are ignored; calling
// Add on the missing entry would otherwise panic on a nil interface.
func IncN(ctx context.Context, n int, cat string) {
	ctr := counter[cat]
	if ctr == nil {
		return
	}

	ctr.Add(ctx, int64(n))
}
// StartClient builds the OpenTelemetry meter provider, installs it as the
// global provider via otel.SetMeterProvider, and returns it.
//
// The provider's resource defaults to service name "corso". When the
// OTEL_RESOURCE_ATTRIBUTES environment variable is set, the resource is built
// from the environment instead; if that fails, the error is logged and the
// default resource is kept.
//
// Metrics are exported over insecure gRPC to "0.0.0.0:4317" through a
// periodic reader. If the exporter cannot be created, the error is logged and
// the provider is built without a reader, because wrapping the nil exporter in
// a periodic reader would panic once the reader used it.
func StartClient(ctx context.Context) *metricSdk.MeterProvider {
	res := resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceName("corso"),
	)

	if _, present := os.LookupEnv("OTEL_RESOURCE_ATTRIBUTES"); present {
		envResource, err := resource.New(ctx, resource.WithFromEnv())
		if err != nil {
			logger.CtxErr(ctx, err).Error("building metrics resource from environment")
		} else {
			res = envResource
		}
	}

	opts := []metricSdk.Option{metricSdk.WithResource(res)}

	// NOTE(review): grpc.WithBlock presumably makes New wait until the
	// collector is reachable or ctx is done — confirm callers pass a ctx with
	// a deadline.
	exp, err := otlpmetricgrpc.New(
		ctx,
		otlpmetricgrpc.WithInsecure(),
		otlpmetricgrpc.WithEndpoint("0.0.0.0:4317"),
		otlpmetricgrpc.WithDialOption(grpc.WithBlock()),
		otlpmetricgrpc.WithTemporalitySelector(metricSdk.DefaultTemporalitySelector),
	)
	if err != nil {
		logger.CtxErr(ctx, err).Error("creating metrics exporter")
	} else {
		opts = append(opts, metricSdk.WithReader(metricSdk.NewPeriodicReader(exp)))
	}

	meterProvider := metricSdk.NewMeterProvider(opts...)
	otel.SetMeterProvider(meterProvider)

	return meterProvider
}