From 753acffec747c9adf0a16c22ec8405af00e80eff Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Tue, 11 Apr 2023 20:35:37 -0700 Subject: [PATCH] Utilize metrics summary instead of dumping metrics through syscalls --- src/internal/events/events.go | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/internal/events/events.go b/src/internal/events/events.go index 51d6c7e98..9fdae3387 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -1,8 +1,10 @@ package events import ( + "bytes" "context" "crypto/md5" + "encoding/json" "fmt" "io" "os" @@ -173,8 +175,8 @@ const APICall m = "api_call" // configurations const ( - reportInterval = 1 * time.Minute - retentionDuration = 2 * time.Minute + reportInterval = 100 * time.Millisecond + retentionDuration = 200 * time.Millisecond ) // NewMetrics embeds a metrics bus into the provided context. The bus can be @@ -186,28 +188,28 @@ func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { // logging rates; that's handled by dumpMetrics(). sink = metrics.NewInmemSink(reportInterval, retentionDuration) cfg = metrics.DefaultConfig("corso") - sig = metrics.NewInmemSignal(sink, sentSignal, w) + //sig = metrics.NewInmemSignal(sink, sentSignal, w) ) cfg.EnableHostname = false - cfg.EnableRuntimeMetrics = false + cfg.EnableRuntimeMetrics = true gm, err := metrics.NewGlobal(cfg, sink) if err != nil { logger.CtxErr(ctx, err).Error("metrics bus constructor") - sig.Stop() + //sig.Stop() return ctx, func() {} } stop := make(chan struct{}) - go dumpMetrics(ctx, stop, sig) + go dumpMetrics(ctx, stop, sink) flush := func() { signalDump(ctx) time.Sleep(500 * time.Millisecond) close(stop) - sig.Stop() + //sig.Stop() gm.Shutdown() } @@ -220,19 +222,32 @@ func NewMetrics(ctx context.Context, w io.Writer) (context.Context, func()) { // it to dump metrics to the provided writer (which should be the logger). // Expectation is for users to call this in a goroutine. Any signal or close() on the stop chan // will exit the loop. -func dumpMetrics(ctx context.Context, stop <-chan struct{}, sig *metrics.InmemSignal) { +func dumpMetrics(ctx context.Context, stop <-chan struct{}, sink *metrics.InmemSink) { tock := time.NewTicker(reportInterval) for { select { case <-tock.C: - signalDump(ctx) + //signalDump(ctx) + raw, _ := sink.DisplayMetrics(nil, nil) + result := raw.(metrics.MetricsSummary) + summary, _ := dumpMetricsSummary(result) + fmt.Fprintln(os.Stderr, string(summary)) + case <-stop: return } } } +func dumpMetricsSummary(summary metrics.MetricsSummary) ([]byte, error) { + buf := bytes.NewBuffer(nil) + if err := json.NewEncoder(buf).Encode(summary); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + // Inc increments the given category by 1. func Inc(cat m, keys ...string) { cats := append([]string{string(cat)}, keys...)