From a6ba26cd929a95268dd26806f75a6ee4bfe7d28a Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 15 Sep 2022 12:03:20 -0600 Subject: [PATCH] generate events (#863) ## Description Adds events for repo init, backup start and end, and restore start and end. Not all expected values are available. TODOs have been marked for values that need future implementation. ## Type of change - [x] :sunflower: Feature ## Issue(s) * #741 ## Test Plan - [x] :zap: Unit test --- src/internal/events/events.go | 60 +++++++++++++++++++---- src/internal/events/events_test.go | 26 +++++++++- src/internal/operations/backup.go | 32 +++++++++++- src/internal/operations/backup_test.go | 18 +++++-- src/internal/operations/operation.go | 4 ++ src/internal/operations/operation_test.go | 5 +- src/internal/operations/restore.go | 40 ++++++++++++++- src/internal/operations/restore_test.go | 25 +++++++--- src/pkg/repository/repository.go | 41 ++++++++++------ 9 files changed, 210 insertions(+), 41 deletions(-) diff --git a/src/internal/events/events.go b/src/internal/events/events.go index f2b094f6e..3f757792c 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -9,7 +9,9 @@ import ( analytics "github.com/rudderlabs/analytics-go" + "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/storage" ) // keys for ease of use @@ -19,9 +21,11 @@ const ( payload = "payload" // Event Keys - RepoInit = "repo-init" - BackupStart = "backup-start" - BackupEnd = "backup-end" + RepoInit = "repo-init" + BackupStart = "backup-start" + BackupEnd = "backup-end" + RestoreStart = "restore-start" + RestoreEnd = "restore-end" // Event Data Keys BackupID = "backup-id" @@ -32,6 +36,8 @@ const ( StartTime = "start-time" Duration = "duration" Status = "status" + ItemsRead = "items-read" + ItemsWritten = "items-written" ) // Bus handles all event communication into the events package. @@ -47,8 +53,8 @@ var ( DataPlaneURL string ) -func NewBus(repoProvider, bucket, prefix, tenantID string) Bus { - hash := repoHash(repoProvider, bucket, prefix, tenantID) +func NewBus(s storage.Storage, a account.Account) Bus { + hash := repoHash(s, a) envWK := os.Getenv("RUDDERSTACK_CORSO_WRITE_KEY") if len(envWK) > 0 { @@ -88,11 +94,15 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) { props := analytics. NewProperties(). Set(repoID, b.repoID). - Set(corsoVersion, b.version). - Set(payload, data) + Set(corsoVersion, b.version) + + if len(data) > 0 { + props.Set(payload, data) + } err := b.client.Enqueue(analytics.Track{ Event: key, + UserId: b.repoID, Timestamp: time.Now().UTC(), Properties: props, }) @@ -101,9 +111,41 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) { } } -func repoHash(repoProvider, bucket, prefix, tenantID string) string { +func storageID(s storage.Storage) string { + id := s.Provider.String() + + switch s.Provider { + case storage.ProviderS3: + s3, err := s.S3Config() + if err != nil { + return id + } + + id += s3.Bucket + s3.Prefix + } + + return id +} + +func accountID(a account.Account) string { + var id string + + switch a.Provider { + case account.ProviderM365: + m, err := a.M365Config() + if err != nil { + return id + } + + id += m.TenantID + } + + return id +} + +func repoHash(s storage.Storage, a account.Account) string { sum := md5.Sum( - []byte(repoProvider + bucket + prefix + tenantID), + []byte(storageID(s) + accountID(a)), ) return fmt.Sprintf("%x", sum) diff --git a/src/internal/events/events_test.go b/src/internal/events/events_test.go index 8892283d6..a906d07d6 100644 --- a/src/internal/events/events_test.go +++ b/src/internal/events/events_test.go @@ -8,6 +8,9 @@ import ( "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/credentials" + "github.com/alcionai/corso/src/pkg/storage" ) type EventsIntegrationSuite struct { @@ -25,7 +28,28 @@ func TestMetricsIntegrationSuite(t *testing.T) { func (suite *EventsIntegrationSuite) TestNewBus() { t := suite.T() - b := events.NewBus("s3", "bckt", "prfx", "tenid") + s, err := storage.NewStorage( + storage.ProviderS3, + storage.S3Config{ + Bucket: "bckt", + Prefix: "prfx", + }, + ) + require.NoError(t, err) + + a, err := account.NewAccount( + account.ProviderM365, + account.M365Config{ + M365: credentials.M365{ + ClientID: "id", + ClientSecret: "secret", + }, + TenantID: "tid", + }, + ) + require.NoError(t, err) + + b := events.NewBus(s, a) require.NotEmpty(t, b) require.NoError(t, b.Close()) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 96fbdcc44..01ec55b7e 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/stats" @@ -46,9 +47,10 @@ func NewBackupOperation( sw *store.Wrapper, acct account.Account, selector selectors.Selector, + bus events.Bus, ) (BackupOperation, error) { op := BackupOperation{ - operation: newOperation(opts, kw, sw), + operation: newOperation(opts, bus, kw, sw), Selectors: selector, Version: "v0", account: acct, @@ -80,12 +82,23 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { var ( opStats backupStats backupDetails *details.Details + startTime = time.Now() ) // TODO: persist initial state of backupOperation in modelstore + op.bus.Event( + ctx, + events.BackupStart, + map[string]any{ + events.StartTime: startTime, + // TODO: initial backup ID, + // TODO: events.ExchangeResources: , + }, + ) + // persist operation results to the model store on exit defer func() { - err = op.persistResults(time.Now(), &opStats) + err = op.persistResults(startTime, &opStats) if err != nil { return } @@ -185,5 +198,20 @@ func (op *BackupOperation) createBackupModels( op.Results.BackupID = b.ID + op.bus.Event( + ctx, + events.BackupEnd, + map[string]any{ + events.BackupID: b.ID, + events.Status: op.Status, + events.StartTime: op.Results.StartedAt, + events.EndTime: op.Results.CompletedAt, + events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt), + // TODO: events.ExchangeResources: , + // TODO: events.ExchangeDataObserved: , + // TODO: events.ExchangeDataStored: , + }, + ) + return nil } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index fe7cd3354..833aa48b0 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" @@ -53,7 +54,14 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() { } ) - op, err := NewBackupOperation(ctx, control.Options{}, kw, sw, acct, selectors.Selector{}) + op, err := NewBackupOperation( + ctx, + control.Options{}, + kw, + sw, + acct, + selectors.Selector{}, + events.Bus{}) require.NoError(t, err) require.NoError(t, op.persistResults(now, &stats)) @@ -119,7 +127,8 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { test.kw, test.sw, test.acct, - selectors.Selector{}) + selectors.Selector{}, + events.Bus{}) test.errCheck(t, err) }) } @@ -193,7 +202,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { sw, acct, *selected, - ) + events.Bus{}) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) @@ -246,7 +255,8 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() { kw, sw, acct, - sel.Selector) + sel.Selector, + events.Bus{}) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 62e9eec37..81ef08841 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/store" @@ -41,18 +42,21 @@ type operation struct { Options control.Options `json:"options"` Status opStatus `json:"status"` + bus events.Bus kopia *kopia.Wrapper store *store.Wrapper } func newOperation( opts control.Options, + bus events.Bus, kw *kopia.Wrapper, sw *store.Wrapper, ) operation { return operation{ CreatedAt: time.Now(), Options: opts, + bus: bus, kopia: kw, store: sw, Status: InProgress, diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index d98c4f6b2..5c1922646 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/store" @@ -22,7 +23,7 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(control.Options{}, nil, nil) + op := newOperation(control.Options{}, events.Bus{}, nil, nil) assert.Greater(t, op.CreatedAt, time.Time{}) } @@ -42,7 +43,7 @@ func (suite *OperationSuite) TestOperation_Validate() { } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - op := newOperation(control.Options{}, test.kw, test.sw) + op := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw) test.errCheck(t, op.validate()) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 3d660a323..61d2058e8 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/path" @@ -48,9 +49,10 @@ func NewRestoreOperation( acct account.Account, backupID model.StableID, sel selectors.Selector, + bus events.Bus, ) (RestoreOperation, error) { op := RestoreOperation{ - operation: newOperation(opts, kw, sw), + operation: newOperation(opts, bus, kw, sw), BackupID: backupID, Selectors: sel, Version: "v0", @@ -80,12 +82,28 @@ type restoreStats struct { // Run begins a synchronous restore operation. func (op *RestoreOperation) Run(ctx context.Context) (err error) { + startTime := time.Now() + // TODO: persist initial state of restoreOperation in modelstore + op.bus.Event( + ctx, + events.RestoreStart, + map[string]any{ + events.StartTime: startTime, + events.BackupID: op.BackupID, + // TODO: initial backup ID, + // TODO: events.ExchangeResources: , + // TODO: source backup time, + // TODO: restore options, + }, + ) + // persist operation results to the model store on exit opStats := restoreStats{} // TODO: persist results? + defer func() { - err = op.persistResults(time.Now(), &opStats) + err = op.persistResults(ctx, startTime, &opStats) if err != nil { return } @@ -170,6 +188,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) { // writes the restoreOperation outcome to the modelStore. func (op *RestoreOperation) persistResults( + ctx context.Context, started time.Time, opStats *restoreStats, ) error { @@ -192,5 +211,22 @@ func (op *RestoreOperation) persistResults( op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count op.Results.ItemsWritten = opStats.gc.Successful + op.bus.Event( + ctx, + events.RestoreEnd, + map[string]any{ + // TODO: RestoreID + events.BackupID: op.BackupID, + events.Status: op.Status, + events.StartTime: op.Results.StartedAt, + events.EndTime: op.Results.CompletedAt, + events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt), + events.ItemsRead: op.Results.ItemsRead, + events.ItemsWritten: op.Results.ItemsWritten, + // TODO: events.ExchangeResources: , + // TODO: events.ExchangeDataObserved: , + }, + ) + return nil } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index ea600f9d9..66cd04b7f 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/tester" @@ -56,10 +57,18 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { } ) - op, err := NewRestoreOperation(ctx, control.Options{}, kw, sw, acct, "foo", selectors.Selector{}) + op, err := NewRestoreOperation( + ctx, + control.Options{}, + kw, + sw, + acct, + "foo", + selectors.Selector{}, + events.Bus{}) require.NoError(t, err) - require.NoError(t, op.persistResults(now, &stats)) + require.NoError(t, op.persistResults(ctx, now, &stats)) assert.Equal(t, op.Status.String(), Completed.String(), "status") assert.Equal(t, op.Results.ItemsRead, len(stats.cs), "items read") @@ -138,7 +147,8 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { kw, sw, acct, - bsel.Selector) + bsel.Selector, + events.Bus{}) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) require.NotEmpty(t, bo.Results.BackupID) @@ -189,7 +199,8 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.sw, test.acct, "backup-id", - selectors.Selector{}) + selectors.Selector{}, + events.Bus{}) test.errCheck(t, err) }) } @@ -209,7 +220,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { suite.sw, tester.NewM365Account(t), suite.backupID, - rsel.Selector) + rsel.Selector, + events.Bus{}) require.NoError(t, err) require.NoError(t, ro.Run(ctx), "restoreOp.Run()") @@ -236,7 +248,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { suite.sw, tester.NewM365Account(t), suite.backupID, - rsel.Selector) + rsel.Selector, + events.Bus{}) require.NoError(t, err) require.Error(t, ro.Run(ctx), "restoreOp.Run() should have 0 results") } diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index b5405bf28..c4817d7b5 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -5,8 +5,8 @@ import ( "time" "github.com/google/uuid" - "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/operations" @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/storage" "github.com/alcionai/corso/src/pkg/store" @@ -28,7 +29,7 @@ type Repository struct { Account account.Account // the user's m365 account connection details Storage storage.Storage // the storage provider details and configuration - // Bus events.Bus + Bus events.Bus dataLayer *kopia.Wrapper modelStore *kopia.ModelStore } @@ -69,10 +70,13 @@ func Initialize( Version: "v1", Account: acct, Storage: s, + Bus: events.NewBus(s, acct), dataLayer: w, modelStore: ms, } + r.Bus.Event(ctx, events.RepoInit, nil) + return &r, nil } @@ -109,6 +113,7 @@ func Connect( Version: "v1", Account: acct, Storage: s, + Bus: events.NewBus(s, acct), dataLayer: w, modelStore: ms, } @@ -117,23 +122,27 @@ func Connect( } func (r *Repository) Close(ctx context.Context) error { + if err := r.Bus.Close(); err != nil { + logger.Ctx(ctx).Debugw("closing the event bus", "err", err) + } + if r.dataLayer != nil { - err := r.dataLayer.Close(ctx) - r.dataLayer = nil - - if err != nil { - return errors.Wrap(err, "closing corso DataLayer") + if err := r.dataLayer.Close(ctx); err != nil { + logger.Ctx(ctx).Debugw("closing Datalayer", "err", err) } + + r.dataLayer = nil } - if r.modelStore == nil { - return nil + if r.modelStore != nil { + if err := r.modelStore.Close(ctx); err != nil { + logger.Ctx(ctx).Debugw("closing modelStore", "err", err) + } + + r.modelStore = nil } - err := r.modelStore.Close(ctx) - r.modelStore = nil - - return errors.Wrap(err, "closing corso ModelStore") + return nil } // NewBackup generates a BackupOperation runner. @@ -148,7 +157,8 @@ func (r Repository) NewBackup( r.dataLayer, store.NewKopiaStore(r.modelStore), r.Account, - selector) + selector, + r.Bus) } // NewRestore generates a restoreOperation runner. @@ -165,7 +175,8 @@ func (r Repository) NewRestore( store.NewKopiaStore(r.modelStore), r.Account, model.StableID(backupID), - sel) + sel, + r.Bus) } // backups lists a backup by id