hand corso logger to rudderstack (#1325)
## Description Use the corso logger in analytics, to allow control over log outputs from that package. ## Type of change - [x] 🐹 Trivial/Minor ## Issue(s) * #1324 ## Test Plan - [x] 💪 Manual
This commit is contained in:
parent
ba7344b963
commit
3fa45009cb
@ -7,6 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
analytics "github.com/rudderlabs/analytics-go"
|
analytics "github.com/rudderlabs/analytics-go"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
@ -61,9 +62,9 @@ var (
|
|||||||
RudderStackDataPlaneURL string
|
RudderStackDataPlaneURL string
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewBus(s storage.Storage, tenID string, opts control.Options) Bus {
|
func NewBus(ctx context.Context, s storage.Storage, tenID string, opts control.Options) (Bus, error) {
|
||||||
if opts.DisableMetrics {
|
if opts.DisableMetrics {
|
||||||
return Bus{}
|
return Bus{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := repoHash(s, tenID)
|
hash := repoHash(s, tenID)
|
||||||
@ -79,15 +80,26 @@ func NewBus(s storage.Storage, tenID string, opts control.Options) Bus {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var client analytics.Client
|
var client analytics.Client
|
||||||
|
|
||||||
if len(RudderStackWriteKey) > 0 && len(RudderStackDataPlaneURL) > 0 {
|
if len(RudderStackWriteKey) > 0 && len(RudderStackDataPlaneURL) > 0 {
|
||||||
client = analytics.New(RudderStackWriteKey, RudderStackDataPlaneURL)
|
var err error
|
||||||
|
client, err = analytics.NewWithConfig(
|
||||||
|
RudderStackWriteKey,
|
||||||
|
RudderStackDataPlaneURL,
|
||||||
|
analytics.Config{
|
||||||
|
Logger: logger.WrapCtx(ctx, logger.ForceDebugLogLevel()),
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return Bus{}, errors.Wrap(err, "configuring event bus")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return Bus{
|
return Bus{
|
||||||
client: client,
|
client: client,
|
||||||
repoID: hash,
|
repoID: hash,
|
||||||
version: "vTODO", // TODO: corso versioning implementation
|
version: "vTODO", // TODO: corso versioning implementation
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b Bus) Close() error {
|
func (b Bus) Close() error {
|
||||||
|
|||||||
@ -27,6 +27,9 @@ func TestMetricsIntegrationSuite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (suite *EventsIntegrationSuite) TestNewBus() {
|
func (suite *EventsIntegrationSuite) TestNewBus() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
s, err := storage.NewStorage(
|
s, err := storage.NewStorage(
|
||||||
@ -50,11 +53,13 @@ func (suite *EventsIntegrationSuite) TestNewBus() {
|
|||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
b := events.NewBus(s, a.ID(), control.Options{})
|
b, err := events.NewBus(ctx, s, a.ID(), control.Options{})
|
||||||
require.NotEmpty(t, b)
|
require.NotEmpty(t, b)
|
||||||
|
require.NoError(t, err)
|
||||||
require.NoError(t, b.Close())
|
require.NoError(t, b.Close())
|
||||||
|
|
||||||
b2 := events.NewBus(s, a.ID(), control.Options{DisableMetrics: true})
|
b2, err := events.NewBus(ctx, s, a.ID(), control.Options{DisableMetrics: true})
|
||||||
require.Empty(t, b2)
|
require.Empty(t, b2)
|
||||||
|
require.NoError(t, err)
|
||||||
require.NoError(t, b2.Close())
|
require.NoError(t, b2.Close())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -136,6 +136,10 @@ func singleton(level logLevel) *zap.SugaredLogger {
|
|||||||
return loggerton
|
return loggerton
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------------------------------------------
|
||||||
|
// context management
|
||||||
|
// ------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
type loggingKey string
|
type loggingKey string
|
||||||
|
|
||||||
const ctxKey loggingKey = "corsoLogger"
|
const ctxKey loggingKey = "corsoLogger"
|
||||||
@ -196,3 +200,64 @@ func levelOf(lvl string) logLevel {
|
|||||||
func Flush(ctx context.Context) {
|
func Flush(ctx context.Context) {
|
||||||
_ = Ctx(ctx).Sync()
|
_ = Ctx(ctx).Sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------------------------------------------
|
||||||
|
// log wrapper for downstream api compliance
|
||||||
|
// ------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type wrapper struct {
|
||||||
|
zap.SugaredLogger
|
||||||
|
|
||||||
|
forceDebugLogLevel bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) process(opts ...option) {
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(w)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type option func(*wrapper)
|
||||||
|
|
||||||
|
// ForceDebugLogLevel reduces all logs emitted in the wrapper to
|
||||||
|
// debug level, independent of their original log level. Useful
|
||||||
|
// for silencing noisy dependency packages without losing the info
|
||||||
|
// altogether.
|
||||||
|
func ForceDebugLogLevel() option {
|
||||||
|
return func(w *wrapper) {
|
||||||
|
w.forceDebugLogLevel = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrap returns the logger in the package with an extended api used for
|
||||||
|
// dependency package interface compliance.
|
||||||
|
func WrapCtx(ctx context.Context, opts ...option) *wrapper {
|
||||||
|
return Wrap(Ctx(ctx), opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrap returns the sugaredLogger with an extended api used for
|
||||||
|
// dependency package interface compliance.
|
||||||
|
func Wrap(zsl *zap.SugaredLogger, opts ...option) *wrapper {
|
||||||
|
w := &wrapper{SugaredLogger: *zsl}
|
||||||
|
w.process(opts...)
|
||||||
|
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) Logf(tmpl string, args ...any) {
|
||||||
|
if w.forceDebugLogLevel {
|
||||||
|
w.SugaredLogger.Debugf(tmpl, args...)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.SugaredLogger.Infof(tmpl, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *wrapper) Errorf(tmpl string, args ...any) {
|
||||||
|
if w.forceDebugLogLevel {
|
||||||
|
w.SugaredLogger.Debugf(tmpl, args...)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.SugaredLogger.Errorf(tmpl, args...)
|
||||||
|
}
|
||||||
|
|||||||
@ -102,12 +102,17 @@ func Initialize(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bus, err := events.NewBus(ctx, s, acct.ID(), opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
r := &repository{
|
r := &repository{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
Version: "v1",
|
Version: "v1",
|
||||||
Account: acct,
|
Account: acct,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Bus: events.NewBus(s, acct.ID(), opts),
|
Bus: bus,
|
||||||
dataLayer: w,
|
dataLayer: w,
|
||||||
modelStore: ms,
|
modelStore: ms,
|
||||||
}
|
}
|
||||||
@ -146,12 +151,17 @@ func Connect(
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bus, err := events.NewBus(ctx, s, acct.ID(), opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// todo: ID and CreatedAt should get retrieved from a stored kopia config.
|
// todo: ID and CreatedAt should get retrieved from a stored kopia config.
|
||||||
return &repository{
|
return &repository{
|
||||||
Version: "v1",
|
Version: "v1",
|
||||||
Account: acct,
|
Account: acct,
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Bus: events.NewBus(s, acct.ID(), opts),
|
Bus: bus,
|
||||||
dataLayer: w,
|
dataLayer: w,
|
||||||
modelStore: ms,
|
modelStore: ms,
|
||||||
}, nil
|
}, nil
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user