With global ctrs

This commit is contained in:
Abhishek Pandey 2023-10-06 19:52:36 +05:30
parent 480e37cd29
commit 22b43b7255
2 changed files with 42 additions and 70 deletions

View File

@ -2,16 +2,13 @@ package events
import ( import (
"context" "context"
"fmt"
"log" "log"
"os" "os"
"time" "time"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"go.opentelemetry.io/contrib/propagators/aws/xray"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0" semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -19,12 +16,12 @@ import (
metricSdk "go.opentelemetry.io/otel/sdk/metric" metricSdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
) )
var Ctr metric.Int64Counter
type collector struct { type collector struct {
growCounter metric.Int64Counter meter metric.Meter
meter metric.Meter
} }
func Newcollector(mp metric.MeterProvider) collector { func Newcollector(mp metric.MeterProvider) collector {
@ -40,28 +37,19 @@ func (rmc *collector) RegisterMetricsClient(ctx context.Context, cfg Config) {
go func() { go func() {
for { for {
rmc.updateCounter(ctx) rmc.updateCounter(ctx)
time.Sleep(time.Second * 10) time.Sleep(time.Second * 1)
} }
}() }()
} }
func (rmc *collector) registerCounter() { func (rmc *collector) registerCounter() {
ctr, err := rmc.meter.Int64Counter( Ctr, _ = rmc.meter.Int64Counter(growCounter)
growCounter,
metric.WithDescription("counter"),
metric.WithUnit("count"),
)
if err != nil {
fmt.Println(err)
}
rmc.growCounter = ctr
} }
func (rmc *collector) updateCounter(ctx context.Context) { func (rmc *collector) updateCounter(ctx context.Context) {
logger.Ctx(ctx).Infow("updateCounter") logger.Ctx(ctx).Infow("updateCounter")
rmc.growCounter.Add(ctx, 20) Ctr.Add(ctx, 20)
} }
type Config struct { type Config struct {
@ -69,7 +57,7 @@ type Config struct {
Port string Port string
} }
func StartClient(ctx context.Context) (func(context.Context) error, error) { func StartClient(ctx context.Context) *metricSdk.MeterProvider {
res := resource.NewWithAttributes( res := resource.NewWithAttributes(
semconv.SchemaURL, semconv.SchemaURL,
semconv.ServiceName("corso"), semconv.ServiceName("corso"),
@ -77,67 +65,56 @@ func StartClient(ctx context.Context) (func(context.Context) error, error) {
if _, present := os.LookupEnv("OTEL_RESOURCE_ATTRIBUTES"); present { if _, present := os.LookupEnv("OTEL_RESOURCE_ATTRIBUTES"); present {
envResource, err := resource.New(ctx, resource.WithFromEnv()) envResource, err := resource.New(ctx, resource.WithFromEnv())
if err != nil { if err != nil {
return nil, err return nil
} }
res = envResource res = envResource
} }
// Setup trace related // Setup trace related
tp, err := setupTraceProvider(ctx, res) // tp, err := setupTraceProvider(ctx, res)
if err != nil { // if err != nil {
return nil, err // return nil
} // }
otel.SetTracerProvider(tp) // otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(xray.Propagator{}) // Set AWS X-Ray propagator // otel.SetTextMapPropagator(xray.Propagator{}) // Set AWS X-Ray propagator
exp, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithInsecure(), otlpmetricgrpc.WithEndpoint("0.0.0.0:4317"), otlpmetricgrpc.WithDialOption(grpc.WithBlock())) exp, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithInsecure(), otlpmetricgrpc.WithEndpoint("0.0.0.0:4317"), otlpmetricgrpc.WithDialOption(grpc.WithBlock()))
if err != nil { if err != nil {
log.Fatalf("failed to create new OTLP metric exporter: %v", err) log.Fatalf("failed to create new OTLP metric exporter: %v", err)
} }
meterProvider := metricSdk.NewMeterProvider(metricSdk.WithResource(res), metricSdk.WithReader(metricSdk.NewPeriodicReader(exp))) meterProvider := metricSdk.NewMeterProvider(
metricSdk.WithReader(metricSdk.NewPeriodicReader(exp)),
metricSdk.WithResource(res),
)
otel.SetMeterProvider(meterProvider) otel.SetMeterProvider(meterProvider)
return func(context.Context) (err error) { return meterProvider
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
// pushes any last exports to the receiver
err = meterProvider.Shutdown(ctx)
if err != nil {
return err
}
err = tp.Shutdown(ctx)
if err != nil {
return err
}
return nil
}, nil
} }
// setupTraceProvider configures a trace exporter and an AWS X-Ray ID Generator. // // setupTraceProvider configures a trace exporter and an AWS X-Ray ID Generator.
func setupTraceProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error) { // func setupTraceProvider(ctx context.Context, res *resource.Resource) (*sdktrace.TracerProvider, error) {
// INSECURE !! NOT TO BE USED FOR ANYTHING IN PRODUCTION // // INSECURE !! NOT TO BE USED FOR ANYTHING IN PRODUCTION
// traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure()) // // traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure())
// Create and start new OTLP trace exporter // // Create and start new OTLP trace exporter
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure(), otlptracegrpc.WithEndpoint("0.0.0.0:4317"), otlptracegrpc.WithDialOption(grpc.WithBlock())) // traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure(), otlptracegrpc.WithEndpoint("0.0.0.0:4317"), otlptracegrpc.WithDialOption(grpc.WithBlock()))
if err != nil { // if err != nil {
log.Fatalf("failed to create new OTLP trace exporter: %v", err) // log.Fatalf("failed to create new OTLP trace exporter: %v", err)
} // }
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
idg := xray.NewIDGenerator() // idg := xray.NewIDGenerator()
tp := sdktrace.NewTracerProvider( // tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()), // sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(traceExporter), // sdktrace.WithBatcher(traceExporter),
sdktrace.WithResource(res), // sdktrace.WithResource(res),
sdktrace.WithIDGenerator(idg), // sdktrace.WithIDGenerator(idg),
) // )
return tp, nil // return tp, nil
} // }

View File

@ -13,7 +13,6 @@ import (
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
analytics "github.com/rudderlabs/analytics-go" analytics "github.com/rudderlabs/analytics-go"
"go.opentelemetry.io/otel"
"github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
@ -212,13 +211,9 @@ const (
) )
func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
_, err := StartClient(ctx) mp := StartClient(ctx)
if err != nil {
logger.CtxErr(ctx, err).Error("metrics bus constructor")
return ctx, func() {}
}
mp := otel.GetMeterProvider() //mpx := otel.GetMeterProvider()
rmc := Newcollector(mp) rmc := Newcollector(mp)
rmc.RegisterMetricsClient(ctx, Config{}) rmc.RegisterMetricsClient(ctx, Config{})