Add guage
This commit is contained in:
parent
4c4dcf46be
commit
ea3d5d7b9f
@ -19,6 +19,7 @@ import (
|
|||||||
|
|
||||||
var Ctr metric.Int64Counter
|
var Ctr metric.Int64Counter
|
||||||
var AsyncCtr metric.Int64ObservableCounter
|
var AsyncCtr metric.Int64ObservableCounter
|
||||||
|
var RLGauge metric.Int64ObservableGauge
|
||||||
var token int64
|
var token int64
|
||||||
|
|
||||||
type collector struct {
|
type collector struct {
|
||||||
@ -35,11 +36,13 @@ const (
|
|||||||
var metricKeys = []string{
|
var metricKeys = []string{
|
||||||
APITokens,
|
APITokens,
|
||||||
GrowCounter,
|
GrowCounter,
|
||||||
RLTokens,
|
// RLTokens,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map of metricsCategory to metric.Int64Counter
|
// Map of metricsCategory to metric.Int64Counter
|
||||||
var data = map[string]metric.Int64Counter{}
|
var counter = map[string]metric.Int64Counter{}
|
||||||
|
|
||||||
|
//var guages = map[string]metric.Int64ObservableGauge{}
|
||||||
|
|
||||||
func NewCollector(mp metric.MeterProvider) {
|
func NewCollector(mp metric.MeterProvider) {
|
||||||
rmc := collector{}
|
rmc := collector{}
|
||||||
@ -47,12 +50,22 @@ func NewCollector(mp metric.MeterProvider) {
|
|||||||
rmc.meter = mp.Meter("corso-meter")
|
rmc.meter = mp.Meter("corso-meter")
|
||||||
|
|
||||||
for _, key := range metricKeys {
|
for _, key := range metricKeys {
|
||||||
data[key], _ = rmc.meter.Int64Counter(key)
|
counter[key], _ = rmc.meter.Int64Counter(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// // Create a new guage
|
}
|
||||||
// g, _ := rmc.meter.Int64ObservableGauge(RLTokens)
|
|
||||||
|
|
||||||
|
func RegisterGauge(ctx context.Context, name string, cb func(_ context.Context, o metric.Observer) error) {
|
||||||
|
RLGauge, _ := otel.Meter("corso-meter").Int64ObservableGauge(name)
|
||||||
|
|
||||||
|
_, err := otel.Meter("corso-meter").RegisterCallback(
|
||||||
|
cb,
|
||||||
|
RLGauge,
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.CtxErr(ctx, err).Errorw("failed to register callback")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
|
func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
|
||||||
@ -64,13 +77,13 @@ func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
|
|||||||
|
|
||||||
// Inc increments the given category by 1.
|
// Inc increments the given category by 1.
|
||||||
func Inc(ctx context.Context, cat string) {
|
func Inc(ctx context.Context, cat string) {
|
||||||
ctr := data[cat]
|
ctr := counter[cat]
|
||||||
ctr.Add(ctx, 1)
|
ctr.Add(ctx, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IncN increments the given category by N.
|
// IncN increments the given category by N.
|
||||||
func IncN(ctx context.Context, n int, cat string) {
|
func IncN(ctx context.Context, n int, cat string) {
|
||||||
ctr := data[cat]
|
ctr := counter[cat]
|
||||||
ctr.Add(ctx, int64(n))
|
ctr.Add(ctx, int64(n))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
khttp "github.com/microsoft/kiota-http-go"
|
khttp "github.com/microsoft/kiota-http-go"
|
||||||
|
"go.opentelemetry.io/otel/metric"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/events"
|
"github.com/alcionai/corso/src/internal/events"
|
||||||
@ -159,6 +160,19 @@ const (
|
|||||||
PermissionsLC = 5
|
PermissionsLC = 5
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var twonce sync.Once
|
||||||
|
|
||||||
|
func RegisterRLMetrics(ctx context.Context) {
|
||||||
|
twonce.Do(func() {
|
||||||
|
cb := func(_ context.Context, o metric.Observer) error {
|
||||||
|
o.ObserveInt64(events.RLGauge, int64(ctxLimiter(ctx).Tokens()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
events.RegisterGauge(ctx, events.RLTokens, cb)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// ConsumeNTokens ensures any calls using this context will consume
|
// ConsumeNTokens ensures any calls using this context will consume
|
||||||
// n rate-limiter tokens. Default is 1, and this value does not need
|
// n rate-limiter tokens. Default is 1, and this value does not need
|
||||||
// to be established in the context to consume the default tokens.
|
// to be established in the context to consume the default tokens.
|
||||||
@ -192,8 +206,7 @@ func QueueRequest(ctx context.Context) {
|
|||||||
defaultConsumed = driveDefaultLC
|
defaultConsumed = driveDefaultLC
|
||||||
}
|
}
|
||||||
|
|
||||||
t := limiter.Tokens()
|
// events.IncN(ctx, int(t), events.RLTokens)
|
||||||
events.IncN(ctx, int(t), events.RLTokens)
|
|
||||||
|
|
||||||
consume := ctxLimiterConsumption(ctx, defaultConsumed)
|
consume := ctxLimiterConsumption(ctx, defaultConsumed)
|
||||||
|
|
||||||
@ -210,7 +223,9 @@ func (mw *RateLimiterMiddleware) Intercept(
|
|||||||
middlewareIndex int,
|
middlewareIndex int,
|
||||||
req *http.Request,
|
req *http.Request,
|
||||||
) (*http.Response, error) {
|
) (*http.Response, error) {
|
||||||
|
RegisterRLMetrics(req.Context())
|
||||||
QueueRequest(req.Context())
|
QueueRequest(req.Context())
|
||||||
|
|
||||||
return pipeline.Next(req, middlewareIndex)
|
return pipeline.Next(req, middlewareIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user