From 3fa45009cba5772d36bff4f3c2b884d98ae2f073 Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 27 Oct 2022 09:47:26 -0600 Subject: [PATCH] hand corso logger to rudderstack (#1325) ## Description Use the corso logger in analytics, to allow control over log outputs from that package. ## Type of change - [x] :hamster: Trivial/Minor ## Issue(s) * #1324 ## Test Plan - [x] :muscle: Manual --- src/internal/events/events.go | 20 +++++++-- src/internal/events/events_test.go | 9 ++++- src/pkg/logger/logger.go | 65 ++++++++++++++++++++++++++++++ src/pkg/repository/repository.go | 14 ++++++- 4 files changed, 100 insertions(+), 8 deletions(-) diff --git a/src/internal/events/events.go b/src/internal/events/events.go index 3f63bd787..600db9001 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/pkg/errors" analytics "github.com/rudderlabs/analytics-go" "github.com/alcionai/corso/src/pkg/control" @@ -61,9 +62,9 @@ var ( RudderStackDataPlaneURL string ) -func NewBus(s storage.Storage, tenID string, opts control.Options) Bus { +func NewBus(ctx context.Context, s storage.Storage, tenID string, opts control.Options) (Bus, error) { if opts.DisableMetrics { - return Bus{} + return Bus{}, nil } hash := repoHash(s, tenID) @@ -79,15 +80,26 @@ func NewBus(s storage.Storage, tenID string, opts control.Options) Bus { } var client analytics.Client + if len(RudderStackWriteKey) > 0 && len(RudderStackDataPlaneURL) > 0 { - client = analytics.New(RudderStackWriteKey, RudderStackDataPlaneURL) + var err error + client, err = analytics.NewWithConfig( + RudderStackWriteKey, + RudderStackDataPlaneURL, + analytics.Config{ + Logger: logger.WrapCtx(ctx, logger.ForceDebugLogLevel()), + }) + + if err != nil { + return Bus{}, errors.Wrap(err, "configuring event bus") + } } return Bus{ client: client, repoID: hash, version: "vTODO", // TODO: corso versioning implementation - } + }, nil } func (b Bus) Close() error { diff --git a/src/internal/events/events_test.go b/src/internal/events/events_test.go index 85c3aba05..8911c280a 100644 --- a/src/internal/events/events_test.go +++ b/src/internal/events/events_test.go @@ -27,6 +27,9 @@ func TestMetricsIntegrationSuite(t *testing.T) { } func (suite *EventsIntegrationSuite) TestNewBus() { + ctx, flush := tester.NewContext() + defer flush() + t := suite.T() s, err := storage.NewStorage( @@ -50,11 +53,13 @@ func (suite *EventsIntegrationSuite) TestNewBus() { ) require.NoError(t, err) - b := events.NewBus(s, a.ID(), control.Options{}) + b, err := events.NewBus(ctx, s, a.ID(), control.Options{}) require.NotEmpty(t, b) + require.NoError(t, err) require.NoError(t, b.Close()) - b2 := events.NewBus(s, a.ID(), control.Options{DisableMetrics: true}) + b2, err := events.NewBus(ctx, s, a.ID(), control.Options{DisableMetrics: true}) require.Empty(t, b2) + require.NoError(t, err) require.NoError(t, b2.Close()) } diff --git a/src/pkg/logger/logger.go b/src/pkg/logger/logger.go index 03c6eb93e..231334d89 100644 --- a/src/pkg/logger/logger.go +++ b/src/pkg/logger/logger.go @@ -136,6 +136,10 @@ func singleton(level logLevel) *zap.SugaredLogger { return loggerton } +// ------------------------------------------------------------------------------------------------ +// context management +// ------------------------------------------------------------------------------------------------ + type loggingKey string const ctxKey loggingKey = "corsoLogger" @@ -196,3 +200,64 @@ func levelOf(lvl string) logLevel { func Flush(ctx context.Context) { _ = Ctx(ctx).Sync() } + +// ------------------------------------------------------------------------------------------------ +// log wrapper for downstream api compliance +// ------------------------------------------------------------------------------------------------ + +type wrapper struct { + zap.SugaredLogger + + forceDebugLogLevel bool +} + +func (w *wrapper) process(opts ...option) { + for _, opt := range opts { + opt(w) + } +} + +type option func(*wrapper) + +// ForceDebugLogLevel reduces all logs emitted in the wrapper to +// debug level, independent of their original log level. Useful +// for silencing noisy dependency packages without losing the info +// altogether. +func ForceDebugLogLevel() option { + return func(w *wrapper) { + w.forceDebugLogLevel = true + } +} + +// Wrap returns the logger in the package with an extended api used for +// dependency package interface compliance. +func WrapCtx(ctx context.Context, opts ...option) *wrapper { + return Wrap(Ctx(ctx), opts...) +} + +// Wrap returns the sugaredLogger with an extended api used for +// dependency package interface compliance. +func Wrap(zsl *zap.SugaredLogger, opts ...option) *wrapper { + w := &wrapper{SugaredLogger: *zsl} + w.process(opts...) + + return w +} + +func (w *wrapper) Logf(tmpl string, args ...any) { + if w.forceDebugLogLevel { + w.SugaredLogger.Debugf(tmpl, args...) + return + } + + w.SugaredLogger.Infof(tmpl, args...) +} + +func (w *wrapper) Errorf(tmpl string, args ...any) { + if w.forceDebugLogLevel { + w.SugaredLogger.Debugf(tmpl, args...) + return + } + + w.SugaredLogger.Errorf(tmpl, args...) +} diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 96e20b572..3e91ea964 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -102,12 +102,17 @@ func Initialize( return nil, err } + bus, err := events.NewBus(ctx, s, acct.ID(), opts) + if err != nil { + return nil, err + } + r := &repository{ ID: uuid.New(), Version: "v1", Account: acct, Storage: s, - Bus: events.NewBus(s, acct.ID(), opts), + Bus: bus, dataLayer: w, modelStore: ms, } @@ -146,12 +151,17 @@ func Connect( return nil, err } + bus, err := events.NewBus(ctx, s, acct.ID(), opts) + if err != nil { + return nil, err + } + // todo: ID and CreatedAt should get retrieved from a stored kopia config. return &repository{ Version: "v1", Account: acct, Storage: s, - Bus: events.NewBus(s, acct.ID(), opts), + Bus: bus, dataLayer: w, modelStore: ms, }, nil