Utilize metrics summary instead of dumping metrics through syscalls

This commit is contained in:
Abhishek Pandey 2023-04-11 20:35:37 -07:00
parent 569dbf18ce
commit 753acffec7

View File

@ -1,8 +1,10 @@
package events
import (
"bytes"
"context"
"crypto/md5"
"encoding/json"
"fmt"
"io"
"os"
@ -173,8 +175,8 @@ const APICall m = "api_call"
// configurations
const (
reportInterval = 1 * time.Minute
retentionDuration = 2 * time.Minute
reportInterval = 100 * time.Millisecond
retentionDuration = 200 * time.Millisecond
)
// NewMetrics embeds a metrics bus into the provided context. The bus can be
@ -186,28 +188,28 @@ func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
// logging rates; that's handled by dumpMetrics().
sink = metrics.NewInmemSink(reportInterval, retentionDuration)
cfg = metrics.DefaultConfig("corso")
sig = metrics.NewInmemSignal(sink, sentSignal, w)
//sig = metrics.NewInmemSignal(sink, sentSignal, w)
)
cfg.EnableHostname = false
cfg.EnableRuntimeMetrics = false
cfg.EnableRuntimeMetrics = true
gm, err := metrics.NewGlobal(cfg, sink)
if err != nil {
logger.CtxErr(ctx, err).Error("metrics bus constructor")
sig.Stop()
//sig.Stop()
return ctx, func() {}
}
stop := make(chan struct{})
go dumpMetrics(ctx, stop, sig)
go dumpMetrics(ctx, stop, sink)
flush := func() {
signalDump(ctx)
time.Sleep(500 * time.Millisecond)
close(stop)
sig.Stop()
//sig.Stop()
gm.Shutdown()
}
@ -220,19 +222,32 @@ func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) {
// it to dump metrics to the provided writer (which should be the logger).
// Expectation is for users to call this in a goroutine. Any signal or close() on the stop chan
// will exit the loop.
func dumpMetrics(ctx context.Context, stop <-chan struct{}, sig *metrics.InmemSignal) {
func dumpMetrics(ctx context.Context, stop <-chan struct{}, sink *metrics.InmemSink) {
tock := time.NewTicker(reportInterval)
for {
select {
case <-tock.C:
signalDump(ctx)
//signalDump(ctx)
raw, _ := sink.DisplayMetrics(nil, nil)
result := raw.(metrics.MetricsSummary)
summary, _ := dumpMetricsSummary(result)
fmt.Fprintln(os.Stderr, string(summary))
case <-stop:
return
}
}
}
func dumpMetricsSummary(summary metrics.MetricsSummary) ([]byte, error) {
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(summary); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Inc increments the given category by 1.
func Inc(cat m, keys ...string) {
cats := append([]string{string(cat)}, keys...)