From a0508cc44292eb7983748774a3e08657b1759cf8 Mon Sep 17 00:00:00 2001 From: Keepers Date: Mon, 19 Sep 2022 13:40:43 -0600 Subject: [PATCH] use account id, mock event bus (#870) ## Description 1/ introduce an ID() method int the account package which provides the tenantID of the configured account provider. 2/ introduce a event bus mock for testing. ## Type of change - [x] :sunflower: Feature ## Issue(s) * #741 ## Test Plan - [x] :zap: Unit test --- src/internal/events/events.go | 30 ++++---------- src/internal/events/events_test.go | 4 +- src/internal/events/mock/mock.go | 42 +++++++++++++++++++ src/internal/operations/backup.go | 2 +- src/internal/operations/backup_test.go | 17 ++++++-- src/internal/operations/operation.go | 4 +- src/internal/operations/restore.go | 2 +- src/internal/operations/restore_test.go | 19 ++++++--- src/pkg/account/account.go | 55 ++++++++++++++++++++++--- src/pkg/account/account_test.go | 13 ++++-- src/pkg/account/m365.go | 9 ++++ src/pkg/repository/repository.go | 6 +-- 12 files changed, 155 insertions(+), 48 deletions(-) create mode 100644 src/internal/events/mock/mock.go diff --git a/src/internal/events/events.go b/src/internal/events/events.go index 73253dadb..0f37e1130 100644 --- a/src/internal/events/events.go +++ b/src/internal/events/events.go @@ -9,7 +9,6 @@ import ( analytics "github.com/rudderlabs/analytics-go" - "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/storage" @@ -41,6 +40,11 @@ const ( ItemsWritten = "items-written" ) +type Eventer interface { + Event(context.Context, string, map[string]any) + Close() error +} + // Bus handles all event communication into the events package. type Bus struct { client analytics.Client @@ -54,12 +58,12 @@ var ( DataPlaneURL string ) -func NewBus(s storage.Storage, a account.Account, opts control.Options) Bus { +func NewBus(s storage.Storage, tenID string, opts control.Options) Bus { if opts.DisableMetrics { return Bus{} } - hash := repoHash(s, a) + hash := repoHash(s, tenID) envWK := os.Getenv("RUDDERSTACK_CORSO_WRITE_KEY") if len(envWK) > 0 { @@ -132,25 +136,9 @@ func storageID(s storage.Storage) string { return id } -func accountID(a account.Account) string { - var id string - - switch a.Provider { - case account.ProviderM365: - m, err := a.M365Config() - if err != nil { - return id - } - - id += m.TenantID - } - - return id -} - -func repoHash(s storage.Storage, a account.Account) string { +func repoHash(s storage.Storage, tenID string) string { sum := md5.Sum( - []byte(storageID(s) + accountID(a)), + []byte(storageID(s) + tenID), ) return fmt.Sprintf("%x", sum) diff --git a/src/internal/events/events_test.go b/src/internal/events/events_test.go index 1f47779d2..b55dd97e7 100644 --- a/src/internal/events/events_test.go +++ b/src/internal/events/events_test.go @@ -50,11 +50,11 @@ func (suite *EventsIntegrationSuite) TestNewBus() { ) require.NoError(t, err) - b := events.NewBus(s, a, control.Options{}) + b := events.NewBus(s, a.ID(), control.Options{}) require.NotEmpty(t, b) require.NoError(t, b.Close()) - b2 := events.NewBus(s, a, control.Options{DisableMetrics: true}) + b2 := events.NewBus(s, a.ID(), control.Options{DisableMetrics: true}) require.Empty(t, b2) require.NoError(t, b2.Close()) } diff --git a/src/internal/events/mock/mock.go b/src/internal/events/mock/mock.go new file mode 100644 index 000000000..076b5ac57 --- /dev/null +++ b/src/internal/events/mock/mock.go @@ -0,0 +1,42 @@ +package mock + +import ( + "context" + + "github.com/pkg/errors" +) + +type mockBus struct { + TimesCalled map[string]int + CalledWith map[string][]map[string]any + TimesClosed int +} + +func NewBus() *mockBus { + return &mockBus{ + TimesCalled: map[string]int{}, + CalledWith: map[string][]map[string]any{}, + } +} + +func (b *mockBus) Event(ctx context.Context, key string, data map[string]any) { + b.TimesCalled[key] = b.TimesCalled[key] + 1 + + cw := b.CalledWith[key] + if len(cw) == 0 { + cw = []map[string]any{} + } + + cw = append(cw, data) + b.CalledWith[key] = cw +} + +func (b *mockBus) Close() error { + b.TimesClosed++ + + if b.TimesClosed > 1 { + return errors.New("multiple closes on mockBus") + } + + return nil +} diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 01ec55b7e..b14747383 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -47,7 +47,7 @@ func NewBackupOperation( sw *store.Wrapper, acct account.Account, selector selectors.Selector, - bus events.Bus, + bus events.Eventer, ) (BackupOperation, error) { op := BackupOperation{ operation: newOperation(opts, bus, kw, sw), diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 833aa48b0..8b560fa04 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/events" + evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" @@ -61,7 +62,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() { sw, acct, selectors.Selector{}, - events.Bus{}) + evmock.NewBus()) require.NoError(t, err) require.NoError(t, op.persistResults(now, &stats)) @@ -128,7 +129,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { test.sw, test.acct, selectors.Selector{}, - events.Bus{}) + evmock.NewBus()) test.errCheck(t, err) }) } @@ -193,6 +194,8 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { require.NoError(t, err) defer ms.Close(ctx) + mb := evmock.NewBus() + sw := store.NewKopiaStore(ms) selected := test.selectFunc() bo, err := NewBackupOperation( @@ -202,7 +205,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { sw, acct, *selected, - events.Bus{}) + mb) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) @@ -213,6 +216,8 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { assert.Greater(t, bo.Results.ItemsWritten, 0) assert.Zero(t, bo.Results.ReadErrors) assert.Zero(t, bo.Results.WriteErrors) + assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events") + assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events") }) } } @@ -246,6 +251,8 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() { sw := store.NewKopiaStore(ms) + mb := evmock.NewBus() + sel := selectors.NewOneDriveBackup() sel.Include(sel.Users([]string{m365UserID})) @@ -256,7 +263,7 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() { sw, acct, sel.Selector, - events.Bus{}) + mb) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) @@ -266,4 +273,6 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() { assert.Equal(t, bo.Results.ItemsRead, bo.Results.ItemsWritten) assert.NoError(t, bo.Results.ReadErrors) assert.NoError(t, bo.Results.WriteErrors) + assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events") + assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events") } diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 81ef08841..1ced4395a 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -42,14 +42,14 @@ type operation struct { Options control.Options `json:"options"` Status opStatus `json:"status"` - bus events.Bus + bus events.Eventer kopia *kopia.Wrapper store *store.Wrapper } func newOperation( opts control.Options, - bus events.Bus, + bus events.Eventer, kw *kopia.Wrapper, sw *store.Wrapper, ) operation { diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index a5320f21d..4f88cb908 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -50,7 +50,7 @@ func NewRestoreOperation( acct account.Account, backupID model.StableID, sel selectors.Selector, - bus events.Bus, + bus events.Eventer, ) (RestoreOperation, error) { op := RestoreOperation{ operation: newOperation(opts, bus, kw, sw), diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 66cd04b7f..9a8bd056e 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/events" + evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/tester" @@ -65,7 +66,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { acct, "foo", selectors.Selector{}, - events.Bus{}) + evmock.NewBus()) require.NoError(t, err) require.NoError(t, op.persistResults(ctx, now, &stats)) @@ -148,7 +149,7 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { sw, acct, bsel.Selector, - events.Bus{}) + evmock.NewBus()) require.NoError(t, err) require.NoError(t, bo.Run(ctx)) require.NotEmpty(t, bo.Results.BackupID) @@ -200,7 +201,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.acct, "backup-id", selectors.Selector{}, - events.Bus{}) + evmock.NewBus()) test.errCheck(t, err) }) } @@ -213,6 +214,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { rsel := selectors.NewExchangeRestore() rsel.Include(rsel.Users([]string{tester.M365UserID(t)})) + mb := evmock.NewBus() + ro, err := NewRestoreOperation( ctx, control.Options{}, @@ -221,7 +224,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { tester.NewM365Account(t), suite.backupID, rsel.Selector, - events.Bus{}) + mb) require.NoError(t, err) require.NoError(t, ro.Run(ctx), "restoreOp.Run()") @@ -232,6 +235,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { assert.Zero(t, ro.Results.ReadErrors, "errors while reading restore data") assert.Zero(t, ro.Results.WriteErrors, "errors while writing restore data") assert.Equal(t, suite.numItems, ro.Results.ItemsWritten, "backup and restore wrote the same num of items") + assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events") + assert.Equal(t, 1, mb.TimesCalled[events.RestoreEnd], "restore-end events") } func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { @@ -241,6 +246,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { rsel := selectors.NewExchangeRestore() rsel.Include(rsel.Users(selectors.None())) + mb := evmock.NewBus() + ro, err := NewRestoreOperation( ctx, control.Options{}, @@ -249,7 +256,9 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { tester.NewM365Account(t), suite.backupID, rsel.Selector, - events.Bus{}) + mb) require.NoError(t, err) require.Error(t, ro.Run(ctx), "restoreOp.Run() should have 0 results") + assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events") + assert.Equal(t, 0, mb.TimesCalled[events.RestoreEnd], "restore-end events") } diff --git a/src/pkg/account/account.go b/src/pkg/account/account.go index f844eda01..f501f428f 100644 --- a/src/pkg/account/account.go +++ b/src/pkg/account/account.go @@ -26,12 +26,57 @@ type Account struct { Config map[string]string } -// NewAccount aggregates all the supplied configurations into a single configuration -func NewAccount(p accountProvider, cfgs ...common.StringConfigurer) (Account, error) { - cs, err := common.UnionStringConfigs(cfgs...) +type providerIDer interface { + common.StringConfigurer - return Account{ + providerID(accountProvider) string +} + +// NewAccount aggregates all the supplied configurations into a single configuration +func NewAccount(p accountProvider, cfgs ...providerIDer) (Account, error) { + var ( + pid string + scs = make([]common.StringConfigurer, len(cfgs)) + ) + + for i, c := range cfgs { + scs[i] = c.(common.StringConfigurer) + + if len(c.providerID(p)) > 0 { + pid = c.providerID(p) + } + } + + cs, err := common.UnionStringConfigs(scs...) + + a := Account{ Provider: p, Config: cs, - }, err + } + + a = setProviderID(a, p, pid) + + return a, err +} + +func setProviderID(a Account, p accountProvider, id string) Account { + if len(a.Config) == 0 { + a.Config = map[string]string{} + } + + a.Config[p.String()+"-tenant-id"] = id + + return a +} + +// ID returns the primary tenant ID held by its configuration. +// Ex: if the account uses an M365 provider, the M365 tenant ID +// is returned. If the account contains no ID info, returns an +// empty string. +func (a Account) ID() string { + if len(a.Config) == 0 { + return "" + } + + return a.Config[a.Provider.String()+"-tenant-id"] } diff --git a/src/pkg/account/account_test.go b/src/pkg/account/account_test.go index a6582fb21..36237114c 100644 --- a/src/pkg/account/account_test.go +++ b/src/pkg/account/account_test.go @@ -9,9 +9,14 @@ import ( type testConfig struct { expect string + id string err error } +func (c testConfig) providerID(ap accountProvider) string { + return c.id +} + func (c testConfig) StringConfig() (map[string]string, error) { return map[string]string{"expect": c.expect}, c.err } @@ -31,10 +36,10 @@ func (suite *AccountSuite) TestNewAccount() { c testConfig errCheck assert.ErrorAssertionFunc }{ - {"unknown no error", ProviderUnknown, testConfig{"configVal", nil}, assert.NoError}, - {"m365 no error", ProviderM365, testConfig{"configVal", nil}, assert.NoError}, - {"unknown w/ error", ProviderUnknown, testConfig{"configVal", assert.AnError}, assert.Error}, - {"m365 w/ error", ProviderM365, testConfig{"configVal", assert.AnError}, assert.Error}, + {"unknown no error", ProviderUnknown, testConfig{"configVal", "", nil}, assert.NoError}, + {"m365 no error", ProviderM365, testConfig{"configVal", "", nil}, assert.NoError}, + {"unknown w/ error", ProviderUnknown, testConfig{"configVal", "", assert.AnError}, assert.Error}, + {"m365 w/ error", ProviderM365, testConfig{"configVal", "", assert.AnError}, assert.Error}, } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { diff --git a/src/pkg/account/m365.go b/src/pkg/account/m365.go index c175ebb4c..2a3f9d77f 100644 --- a/src/pkg/account/m365.go +++ b/src/pkg/account/m365.go @@ -36,6 +36,15 @@ func (c M365Config) StringConfig() (map[string]string, error) { return cfg, c.validate() } +// providerID returns the c.TenantID if ap is a ProviderM365. +func (c M365Config) providerID(ap accountProvider) string { + if ap == ProviderM365 { + return c.TenantID + } + + return "" +} + // M365Config retrieves the M365Config details from the Account config. func (a Account) M365Config() (M365Config, error) { c := M365Config{} diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 20b68a1da..e072a9cdb 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -30,7 +30,7 @@ type Repository struct { Storage storage.Storage // the storage provider details and configuration Opts control.Options - Bus events.Bus + Bus events.Eventer dataLayer *kopia.Wrapper modelStore *kopia.ModelStore } @@ -72,7 +72,7 @@ func Initialize( Version: "v1", Account: acct, Storage: s, - Bus: events.NewBus(s, acct, opts), + Bus: events.NewBus(s, acct.ID(), opts), dataLayer: w, modelStore: ms, } @@ -116,7 +116,7 @@ func Connect( Version: "v1", Account: acct, Storage: s, - Bus: events.NewBus(s, acct, opts), + Bus: events.NewBus(s, acct.ID(), opts), dataLayer: w, modelStore: ms, }