diff --git a/src/internal/events/events.go b/src/internal/events/events.go index f2b094f6e..3f757792c 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -9,7 +9,9 @@ import ( analytics "github.com/rudderlabs/analytics-go" + "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/storage" ) // keys for ease of use @@ -19,9 +21,11 @@ const ( payload = "payload" // Event Keys - RepoInit = "repo-init" - BackupStart = "backup-start" - BackupEnd = "backup-end" + RepoInit = "repo-init" + BackupStart = "backup-start" + BackupEnd = "backup-end" + RestoreStart = "restore-start" + RestoreEnd = "restore-end" // Event Data Keys BackupID = "backup-id" @@ -32,6 +36,8 @@ const ( StartTime = "start-time" Duration = "duration" Status = "status" + ItemsRead = "items-read" + ItemsWritten = "items-written" ) // Bus handles all event communication into the events package. @@ -47,8 +53,8 @@ var ( DataPlaneURL string ) -func NewBus(repoProvider, bucket, prefix, tenantID string) Bus { - hash := repoHash(repoProvider, bucket, prefix, tenantID) +func NewBus(s storage.Storage, a account.Account) Bus { + hash := repoHash(s, a) envWK := os.Getenv("RUDDERSTACK_CORSO_WRITE_KEY") if len(envWK) > 0 { @@ -88,11 +94,15 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) { props := analytics. NewProperties(). Set(repoID, b.repoID). - Set(corsoVersion, b.version). - Set(payload, data) + Set(corsoVersion, b.version) + + if len(data) > 0 { + props.Set(payload, data) + } err := b.client.Enqueue(analytics.Track{ Event: key, + UserId: b.repoID, Timestamp: time.Now().UTC(), Properties: props, }) @@ -101,9 +111,41 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) { } } -func repoHash(repoProvider, bucket, prefix, tenantID string) string { +func storageID(s storage.Storage) string { + id := s.Provider.String() + + switch s.Provider { + case storage.ProviderS3: + s3, err := s.S3Config() + if err != nil { + return id + } + + id += s3.Bucket + s3.Prefix + } + + return id +} + +func accountID(a account.Account) string { + var id string + + switch a.Provider { + case account.ProviderM365: + m, err := a.M365Config() + if err != nil { + return id + } + + id += m.TenantID + } + + return id +} + +func repoHash(s storage.Storage, a account.Account) string { sum := md5.Sum( - []byte(repoProvider + bucket + prefix + tenantID), + []byte(storageID(s) + accountID(a)), ) return fmt.Sprintf("%x", sum) diff --git a/src/internal/events/events_test.go b/src/internal/events/events_test.go index 8892283d6..a906d07d6 100644 --- a/src/internal/events/events_test.go +++ b/src/internal/events/events_test.go @@ -8,6 +8,9 @@ import ( "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/credentials" + "github.com/alcionai/corso/src/pkg/storage" ) type EventsIntegrationSuite struct { @@ -25,7 +28,28 @@ func TestMetricsIntegrationSuite(t *testing.T) { func (suite *EventsIntegrationSuite) TestNewBus() { t := suite.T() - b := events.NewBus("s3", "bckt", "prfx", "tenid") + s, err := storage.NewStorage( + storage.ProviderS3, + storage.S3Config{ + Bucket: "bckt", + Prefix: "prfx", + }, + ) + require.NoError(t, err) + + a, err := account.NewAccount( + account.ProviderM365, + account.M365Config{ + M365: credentials.M365{ + ClientID: "id", + ClientSecret: "secret", + }, + TenantID: "tid", + }, + ) + require.NoError(t, err) + + b := events.NewBus(s, a) require.NotEmpty(t, b) require.NoError(t, b.Close()) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 96fbdcc44..01ec55b7e 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/stats" @@ -46,9 +47,10 @@ func NewBackupOperation( sw *store.Wrapper, acct account.Account, selector selectors.Selector, + bus events.Bus, ) (BackupOperation, error) { op := BackupOperation{ - operation: newOperation(opts, kw, sw), + operation: newOperation(opts, bus, kw, sw), Selectors: selector, Version: "v0", account: acct, @@ -80,12 +82,23 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { var ( opStats backupStats backupDetails *details.Details + startTime = time.Now() ) // TODO: persist initial state of backupOperation in modelstore + op.bus.Event( + ctx, + events.BackupStart, + map[string]any{ + events.StartTime: startTime, + // TODO: initial backup ID, + // TODO: events.ExchangeResources: , + }, + ) + // persist operation results to the model store on exit defer func() { - err = op.persistResults(time.Now(), &opStats) + err = op.persistResults(startTime, &opStats) if err != nil { return } @@ -185,5 +198,20 @@ func (op *BackupOperation) createBackupModels( op.Results.BackupID = b.ID + op.bus.Event( + ctx, + events.BackupEnd, + map[string]any{ + events.BackupID: b.ID, + events.Status: op.Status, + events.StartTime: op.Results.StartedAt, + events.EndTime: op.Results.CompletedAt, + events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt), + // TODO: events.ExchangeResources: , + // TODO: events.ExchangeDataObserved: , + // TODO: events.ExchangeDataStored: , + }, + ) + return nil } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index fe7cd3354..833aa48b0 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" @@ -53,7 +54,14 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() { } ) - op, err := NewBackupOperation(ctx, control.Options{}, kw, sw, acct, selectors.Selector{}) + op, err := NewBackupOperation( + ctx, + control.Options{}, + kw, + sw, + acct, + selectors.Selector{}, + events.Bus{}) require.NoError(t, err) require.NoError(t, op.persistResults(now, &stats)) @@ -119,7 +127,8 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { test.kw, test.sw, test.acct, - selectors.Selector{}) + selectors.Selector{}, + events.Bus{}) test.errCheck(t, err) }) } @@ -193,7 +202,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { sw, acct, *selected, - ) + events.Bus{}) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) @@ -246,7 +255,8 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() { kw, sw, acct, - sel.Selector) + sel.Selector, + events.Bus{}) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 62e9eec37..81ef08841 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -5,6 +5,7 @@ import ( "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/store" @@ -41,18 +42,21 @@ type operation struct { Options control.Options `json:"options"` Status opStatus `json:"status"` + bus events.Bus kopia *kopia.Wrapper store *store.Wrapper } func newOperation( opts control.Options, + bus events.Bus, kw *kopia.Wrapper, sw *store.Wrapper, ) operation { return operation{ CreatedAt: time.Now(), Options: opts, + bus: bus, kopia: kw, store: sw, Status: InProgress, diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index d98c4f6b2..5c1922646 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/store" @@ -22,7 +23,7 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(control.Options{}, nil, nil) + op := newOperation(control.Options{}, events.Bus{}, nil, nil) assert.Greater(t, op.CreatedAt, time.Time{}) } @@ -42,7 +43,7 @@ func (suite *OperationSuite) TestOperation_Validate() { } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - op := newOperation(control.Options{}, test.kw, test.sw) + op := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw) test.errCheck(t, op.validate()) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 3d660a323..61d2058e8 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/path" @@ -48,9 +49,10 @@ func NewRestoreOperation( acct account.Account, backupID model.StableID, sel selectors.Selector, + bus events.Bus, ) (RestoreOperation, error) { op := RestoreOperation{ - operation: newOperation(opts, kw, sw), + operation: newOperation(opts, bus, kw, sw), BackupID: backupID, Selectors: sel, Version: "v0", @@ -80,12 +82,28 @@ type restoreStats struct { // Run begins a synchronous restore operation. func (op *RestoreOperation) Run(ctx context.Context) (err error) { + startTime := time.Now() + // TODO: persist initial state of restoreOperation in modelstore + op.bus.Event( + ctx, + events.RestoreStart, + map[string]any{ + events.StartTime: startTime, + events.BackupID: op.BackupID, + // TODO: initial backup ID, + // TODO: events.ExchangeResources: , + // TODO: source backup time, + // TODO: restore options, + }, + ) + // persist operation results to the model store on exit opStats := restoreStats{} // TODO: persist results? + defer func() { - err = op.persistResults(time.Now(), &opStats) + err = op.persistResults(ctx, startTime, &opStats) if err != nil { return } @@ -170,6 +188,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) { // writes the restoreOperation outcome to the modelStore. func (op *RestoreOperation) persistResults( + ctx context.Context, started time.Time, opStats *restoreStats, ) error { @@ -192,5 +211,22 @@ func (op *RestoreOperation) persistResults( op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count op.Results.ItemsWritten = opStats.gc.Successful + op.bus.Event( + ctx, + events.RestoreEnd, + map[string]any{ + // TODO: RestoreID + events.BackupID: op.BackupID, + events.Status: op.Status, + events.StartTime: op.Results.StartedAt, + events.EndTime: op.Results.CompletedAt, + events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt), + events.ItemsRead: op.Results.ItemsRead, + events.ItemsWritten: op.Results.ItemsWritten, + // TODO: events.ExchangeResources: , + // TODO: events.ExchangeDataObserved: , + }, + ) + return nil } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index ea600f9d9..66cd04b7f 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/tester" @@ -56,10 +57,18 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { } ) - op, err := NewRestoreOperation(ctx, control.Options{}, kw, sw, acct, "foo", selectors.Selector{}) + op, err := NewRestoreOperation( + ctx, + control.Options{}, + kw, + sw, + acct, + "foo", + selectors.Selector{}, + events.Bus{}) require.NoError(t, err) - require.NoError(t, op.persistResults(now, &stats)) + require.NoError(t, op.persistResults(ctx, now, &stats)) assert.Equal(t, op.Status.String(), Completed.String(), "status") assert.Equal(t, op.Results.ItemsRead, len(stats.cs), "items read") @@ -138,7 +147,8 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { kw, sw, acct, - bsel.Selector) + bsel.Selector, + events.Bus{}) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) require.NotEmpty(t, bo.Results.BackupID) @@ -189,7 +199,8 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.sw, test.acct, "backup-id", - selectors.Selector{}) + selectors.Selector{}, + events.Bus{}) test.errCheck(t, err) }) } @@ -209,7 +220,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { suite.sw, tester.NewM365Account(t), suite.backupID, - rsel.Selector) + rsel.Selector, + events.Bus{}) require.NoError(t, err) require.NoError(t, ro.Run(ctx), "restoreOp.Run()") @@ -236,7 +248,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { suite.sw, tester.NewM365Account(t), suite.backupID, - rsel.Selector) + rsel.Selector, + events.Bus{}) require.NoError(t, err) require.Error(t, ro.Run(ctx), "restoreOp.Run() should have 0 results") } diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index b5405bf28..c4817d7b5 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -5,8 +5,8 @@ import ( "time" "github.com/google/uuid" - "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/operations" @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/storage" "github.com/alcionai/corso/src/pkg/store" @@ -28,7 +29,7 @@ type Repository struct { Account account.Account // the user's m365 account connection details Storage storage.Storage // the storage provider details and configuration - // Bus events.Bus + Bus events.Bus dataLayer *kopia.Wrapper modelStore *kopia.ModelStore } @@ -69,10 +70,13 @@ func Initialize( Version: "v1", Account: acct, Storage: s, + Bus: events.NewBus(s, acct), dataLayer: w, modelStore: ms, } + r.Bus.Event(ctx, events.RepoInit, nil) + return &r, nil } @@ -109,6 +113,7 @@ func Connect( Version: "v1", Account: acct, Storage: s, + Bus: events.NewBus(s, acct), dataLayer: w, modelStore: ms, } @@ -117,23 +122,27 @@ func Connect( } func (r *Repository) Close(ctx context.Context) error { + if err := r.Bus.Close(); err != nil { + logger.Ctx(ctx).Debugw("closing the event bus", "err", err) + } + if r.dataLayer != nil { - err := r.dataLayer.Close(ctx) - r.dataLayer = nil - - if err != nil { - return errors.Wrap(err, "closing corso DataLayer") + if err := r.dataLayer.Close(ctx); err != nil { + logger.Ctx(ctx).Debugw("closing Datalayer", "err", err) } + + r.dataLayer = nil } - if r.modelStore == nil { - return nil + if r.modelStore != nil { + if err := r.modelStore.Close(ctx); err != nil { + logger.Ctx(ctx).Debugw("closing modelStore", "err", err) + } + + r.modelStore = nil } - err := r.modelStore.Close(ctx) - r.modelStore = nil - - return errors.Wrap(err, "closing corso ModelStore") + return nil } // NewBackup generates a BackupOperation runner. @@ -148,7 +157,8 @@ func (r Repository) NewBackup( r.dataLayer, store.NewKopiaStore(r.modelStore), r.Account, - selector) + selector, + r.Bus) } // NewRestore generates a restoreOperation runner. @@ -165,7 +175,8 @@ func (r Repository) NewRestore( store.NewKopiaStore(r.modelStore), r.Account, model.StableID(backupID), - sel) + sel, + r.Bus) } // backups lists a backup by id