use account id, mock event bus (#870)
## Description 1/ introduce an ID() method int the account package which provides the tenantID of the configured account provider. 2/ introduce a event bus mock for testing. ## Type of change - [x] 🌻 Feature ## Issue(s) * #741 ## Test Plan - [x] ⚡ Unit test
This commit is contained in:
parent
d983a77a57
commit
a0508cc442
@ -9,7 +9,6 @@ import (
|
||||
|
||||
analytics "github.com/rudderlabs/analytics-go"
|
||||
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/storage"
|
||||
@ -41,6 +40,11 @@ const (
|
||||
ItemsWritten = "items-written"
|
||||
)
|
||||
|
||||
type Eventer interface {
|
||||
Event(context.Context, string, map[string]any)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Bus handles all event communication into the events package.
|
||||
type Bus struct {
|
||||
client analytics.Client
|
||||
@ -54,12 +58,12 @@ var (
|
||||
DataPlaneURL string
|
||||
)
|
||||
|
||||
func NewBus(s storage.Storage, a account.Account, opts control.Options) Bus {
|
||||
func NewBus(s storage.Storage, tenID string, opts control.Options) Bus {
|
||||
if opts.DisableMetrics {
|
||||
return Bus{}
|
||||
}
|
||||
|
||||
hash := repoHash(s, a)
|
||||
hash := repoHash(s, tenID)
|
||||
|
||||
envWK := os.Getenv("RUDDERSTACK_CORSO_WRITE_KEY")
|
||||
if len(envWK) > 0 {
|
||||
@ -132,25 +136,9 @@ func storageID(s storage.Storage) string {
|
||||
return id
|
||||
}
|
||||
|
||||
func accountID(a account.Account) string {
|
||||
var id string
|
||||
|
||||
switch a.Provider {
|
||||
case account.ProviderM365:
|
||||
m, err := a.M365Config()
|
||||
if err != nil {
|
||||
return id
|
||||
}
|
||||
|
||||
id += m.TenantID
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func repoHash(s storage.Storage, a account.Account) string {
|
||||
func repoHash(s storage.Storage, tenID string) string {
|
||||
sum := md5.Sum(
|
||||
[]byte(storageID(s) + accountID(a)),
|
||||
[]byte(storageID(s) + tenID),
|
||||
)
|
||||
|
||||
return fmt.Sprintf("%x", sum)
|
||||
|
||||
@ -50,11 +50,11 @@ func (suite *EventsIntegrationSuite) TestNewBus() {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
b := events.NewBus(s, a, control.Options{})
|
||||
b := events.NewBus(s, a.ID(), control.Options{})
|
||||
require.NotEmpty(t, b)
|
||||
require.NoError(t, b.Close())
|
||||
|
||||
b2 := events.NewBus(s, a, control.Options{DisableMetrics: true})
|
||||
b2 := events.NewBus(s, a.ID(), control.Options{DisableMetrics: true})
|
||||
require.Empty(t, b2)
|
||||
require.NoError(t, b2.Close())
|
||||
}
|
||||
|
||||
42
src/internal/events/mock/mock.go
Normal file
42
src/internal/events/mock/mock.go
Normal file
@ -0,0 +1,42 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type mockBus struct {
|
||||
TimesCalled map[string]int
|
||||
CalledWith map[string][]map[string]any
|
||||
TimesClosed int
|
||||
}
|
||||
|
||||
func NewBus() *mockBus {
|
||||
return &mockBus{
|
||||
TimesCalled: map[string]int{},
|
||||
CalledWith: map[string][]map[string]any{},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *mockBus) Event(ctx context.Context, key string, data map[string]any) {
|
||||
b.TimesCalled[key] = b.TimesCalled[key] + 1
|
||||
|
||||
cw := b.CalledWith[key]
|
||||
if len(cw) == 0 {
|
||||
cw = []map[string]any{}
|
||||
}
|
||||
|
||||
cw = append(cw, data)
|
||||
b.CalledWith[key] = cw
|
||||
}
|
||||
|
||||
func (b *mockBus) Close() error {
|
||||
b.TimesClosed++
|
||||
|
||||
if b.TimesClosed > 1 {
|
||||
return errors.New("multiple closes on mockBus")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -47,7 +47,7 @@ func NewBackupOperation(
|
||||
sw *store.Wrapper,
|
||||
acct account.Account,
|
||||
selector selectors.Selector,
|
||||
bus events.Bus,
|
||||
bus events.Eventer,
|
||||
) (BackupOperation, error) {
|
||||
op := BackupOperation{
|
||||
operation: newOperation(opts, bus, kw, sw),
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/events"
|
||||
evmock "github.com/alcionai/corso/src/internal/events/mock"
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
@ -61,7 +62,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
||||
sw,
|
||||
acct,
|
||||
selectors.Selector{},
|
||||
events.Bus{})
|
||||
evmock.NewBus())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, op.persistResults(now, &stats))
|
||||
@ -128,7 +129,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
||||
test.sw,
|
||||
test.acct,
|
||||
selectors.Selector{},
|
||||
events.Bus{})
|
||||
evmock.NewBus())
|
||||
test.errCheck(t, err)
|
||||
})
|
||||
}
|
||||
@ -193,6 +194,8 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
||||
require.NoError(t, err)
|
||||
defer ms.Close(ctx)
|
||||
|
||||
mb := evmock.NewBus()
|
||||
|
||||
sw := store.NewKopiaStore(ms)
|
||||
selected := test.selectFunc()
|
||||
bo, err := NewBackupOperation(
|
||||
@ -202,7 +205,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
||||
sw,
|
||||
acct,
|
||||
*selected,
|
||||
events.Bus{})
|
||||
mb)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, bo.Run(ctx))
|
||||
@ -213,6 +216,8 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
||||
assert.Greater(t, bo.Results.ItemsWritten, 0)
|
||||
assert.Zero(t, bo.Results.ReadErrors)
|
||||
assert.Zero(t, bo.Results.WriteErrors)
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events")
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -246,6 +251,8 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
|
||||
|
||||
sw := store.NewKopiaStore(ms)
|
||||
|
||||
mb := evmock.NewBus()
|
||||
|
||||
sel := selectors.NewOneDriveBackup()
|
||||
sel.Include(sel.Users([]string{m365UserID}))
|
||||
|
||||
@ -256,7 +263,7 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
|
||||
sw,
|
||||
acct,
|
||||
sel.Selector,
|
||||
events.Bus{})
|
||||
mb)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, bo.Run(ctx))
|
||||
@ -266,4 +273,6 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
|
||||
assert.Equal(t, bo.Results.ItemsRead, bo.Results.ItemsWritten)
|
||||
assert.NoError(t, bo.Results.ReadErrors)
|
||||
assert.NoError(t, bo.Results.WriteErrors)
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events")
|
||||
}
|
||||
|
||||
@ -42,14 +42,14 @@ type operation struct {
|
||||
Options control.Options `json:"options"`
|
||||
Status opStatus `json:"status"`
|
||||
|
||||
bus events.Bus
|
||||
bus events.Eventer
|
||||
kopia *kopia.Wrapper
|
||||
store *store.Wrapper
|
||||
}
|
||||
|
||||
func newOperation(
|
||||
opts control.Options,
|
||||
bus events.Bus,
|
||||
bus events.Eventer,
|
||||
kw *kopia.Wrapper,
|
||||
sw *store.Wrapper,
|
||||
) operation {
|
||||
|
||||
@ -50,7 +50,7 @@ func NewRestoreOperation(
|
||||
acct account.Account,
|
||||
backupID model.StableID,
|
||||
sel selectors.Selector,
|
||||
bus events.Bus,
|
||||
bus events.Eventer,
|
||||
) (RestoreOperation, error) {
|
||||
op := RestoreOperation{
|
||||
operation: newOperation(opts, bus, kw, sw),
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/events"
|
||||
evmock "github.com/alcionai/corso/src/internal/events/mock"
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
@ -65,7 +66,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
||||
acct,
|
||||
"foo",
|
||||
selectors.Selector{},
|
||||
events.Bus{})
|
||||
evmock.NewBus())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, op.persistResults(ctx, now, &stats))
|
||||
@ -148,7 +149,7 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
|
||||
sw,
|
||||
acct,
|
||||
bsel.Selector,
|
||||
events.Bus{})
|
||||
evmock.NewBus())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, bo.Run(ctx))
|
||||
require.NotEmpty(t, bo.Results.BackupID)
|
||||
@ -200,7 +201,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
||||
test.acct,
|
||||
"backup-id",
|
||||
selectors.Selector{},
|
||||
events.Bus{})
|
||||
evmock.NewBus())
|
||||
test.errCheck(t, err)
|
||||
})
|
||||
}
|
||||
@ -213,6 +214,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
|
||||
rsel := selectors.NewExchangeRestore()
|
||||
rsel.Include(rsel.Users([]string{tester.M365UserID(t)}))
|
||||
|
||||
mb := evmock.NewBus()
|
||||
|
||||
ro, err := NewRestoreOperation(
|
||||
ctx,
|
||||
control.Options{},
|
||||
@ -221,7 +224,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
|
||||
tester.NewM365Account(t),
|
||||
suite.backupID,
|
||||
rsel.Selector,
|
||||
events.Bus{})
|
||||
mb)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, ro.Run(ctx), "restoreOp.Run()")
|
||||
@ -232,6 +235,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
|
||||
assert.Zero(t, ro.Results.ReadErrors, "errors while reading restore data")
|
||||
assert.Zero(t, ro.Results.WriteErrors, "errors while writing restore data")
|
||||
assert.Equal(t, suite.numItems, ro.Results.ItemsWritten, "backup and restore wrote the same num of items")
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events")
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.RestoreEnd], "restore-end events")
|
||||
}
|
||||
|
||||
func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() {
|
||||
@ -241,6 +246,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() {
|
||||
rsel := selectors.NewExchangeRestore()
|
||||
rsel.Include(rsel.Users(selectors.None()))
|
||||
|
||||
mb := evmock.NewBus()
|
||||
|
||||
ro, err := NewRestoreOperation(
|
||||
ctx,
|
||||
control.Options{},
|
||||
@ -249,7 +256,9 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() {
|
||||
tester.NewM365Account(t),
|
||||
suite.backupID,
|
||||
rsel.Selector,
|
||||
events.Bus{})
|
||||
mb)
|
||||
require.NoError(t, err)
|
||||
require.Error(t, ro.Run(ctx), "restoreOp.Run() should have 0 results")
|
||||
assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events")
|
||||
assert.Equal(t, 0, mb.TimesCalled[events.RestoreEnd], "restore-end events")
|
||||
}
|
||||
|
||||
@ -26,12 +26,57 @@ type Account struct {
|
||||
Config map[string]string
|
||||
}
|
||||
|
||||
// NewAccount aggregates all the supplied configurations into a single configuration
|
||||
func NewAccount(p accountProvider, cfgs ...common.StringConfigurer) (Account, error) {
|
||||
cs, err := common.UnionStringConfigs(cfgs...)
|
||||
type providerIDer interface {
|
||||
common.StringConfigurer
|
||||
|
||||
return Account{
|
||||
providerID(accountProvider) string
|
||||
}
|
||||
|
||||
// NewAccount aggregates all the supplied configurations into a single configuration
|
||||
func NewAccount(p accountProvider, cfgs ...providerIDer) (Account, error) {
|
||||
var (
|
||||
pid string
|
||||
scs = make([]common.StringConfigurer, len(cfgs))
|
||||
)
|
||||
|
||||
for i, c := range cfgs {
|
||||
scs[i] = c.(common.StringConfigurer)
|
||||
|
||||
if len(c.providerID(p)) > 0 {
|
||||
pid = c.providerID(p)
|
||||
}
|
||||
}
|
||||
|
||||
cs, err := common.UnionStringConfigs(scs...)
|
||||
|
||||
a := Account{
|
||||
Provider: p,
|
||||
Config: cs,
|
||||
}, err
|
||||
}
|
||||
|
||||
a = setProviderID(a, p, pid)
|
||||
|
||||
return a, err
|
||||
}
|
||||
|
||||
func setProviderID(a Account, p accountProvider, id string) Account {
|
||||
if len(a.Config) == 0 {
|
||||
a.Config = map[string]string{}
|
||||
}
|
||||
|
||||
a.Config[p.String()+"-tenant-id"] = id
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// ID returns the primary tenant ID held by its configuration.
|
||||
// Ex: if the account uses an M365 provider, the M365 tenant ID
|
||||
// is returned. If the account contains no ID info, returns an
|
||||
// empty string.
|
||||
func (a Account) ID() string {
|
||||
if len(a.Config) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
return a.Config[a.Provider.String()+"-tenant-id"]
|
||||
}
|
||||
|
||||
@ -9,9 +9,14 @@ import (
|
||||
|
||||
type testConfig struct {
|
||||
expect string
|
||||
id string
|
||||
err error
|
||||
}
|
||||
|
||||
func (c testConfig) providerID(ap accountProvider) string {
|
||||
return c.id
|
||||
}
|
||||
|
||||
func (c testConfig) StringConfig() (map[string]string, error) {
|
||||
return map[string]string{"expect": c.expect}, c.err
|
||||
}
|
||||
@ -31,10 +36,10 @@ func (suite *AccountSuite) TestNewAccount() {
|
||||
c testConfig
|
||||
errCheck assert.ErrorAssertionFunc
|
||||
}{
|
||||
{"unknown no error", ProviderUnknown, testConfig{"configVal", nil}, assert.NoError},
|
||||
{"m365 no error", ProviderM365, testConfig{"configVal", nil}, assert.NoError},
|
||||
{"unknown w/ error", ProviderUnknown, testConfig{"configVal", assert.AnError}, assert.Error},
|
||||
{"m365 w/ error", ProviderM365, testConfig{"configVal", assert.AnError}, assert.Error},
|
||||
{"unknown no error", ProviderUnknown, testConfig{"configVal", "", nil}, assert.NoError},
|
||||
{"m365 no error", ProviderM365, testConfig{"configVal", "", nil}, assert.NoError},
|
||||
{"unknown w/ error", ProviderUnknown, testConfig{"configVal", "", assert.AnError}, assert.Error},
|
||||
{"m365 w/ error", ProviderM365, testConfig{"configVal", "", assert.AnError}, assert.Error},
|
||||
}
|
||||
for _, test := range table {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
|
||||
@ -36,6 +36,15 @@ func (c M365Config) StringConfig() (map[string]string, error) {
|
||||
return cfg, c.validate()
|
||||
}
|
||||
|
||||
// providerID returns the c.TenantID if ap is a ProviderM365.
|
||||
func (c M365Config) providerID(ap accountProvider) string {
|
||||
if ap == ProviderM365 {
|
||||
return c.TenantID
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// M365Config retrieves the M365Config details from the Account config.
|
||||
func (a Account) M365Config() (M365Config, error) {
|
||||
c := M365Config{}
|
||||
|
||||
@ -30,7 +30,7 @@ type Repository struct {
|
||||
Storage storage.Storage // the storage provider details and configuration
|
||||
Opts control.Options
|
||||
|
||||
Bus events.Bus
|
||||
Bus events.Eventer
|
||||
dataLayer *kopia.Wrapper
|
||||
modelStore *kopia.ModelStore
|
||||
}
|
||||
@ -72,7 +72,7 @@ func Initialize(
|
||||
Version: "v1",
|
||||
Account: acct,
|
||||
Storage: s,
|
||||
Bus: events.NewBus(s, acct, opts),
|
||||
Bus: events.NewBus(s, acct.ID(), opts),
|
||||
dataLayer: w,
|
||||
modelStore: ms,
|
||||
}
|
||||
@ -116,7 +116,7 @@ func Connect(
|
||||
Version: "v1",
|
||||
Account: acct,
|
||||
Storage: s,
|
||||
Bus: events.NewBus(s, acct, opts),
|
||||
Bus: events.NewBus(s, acct.ID(), opts),
|
||||
dataLayer: w,
|
||||
modelStore: ms,
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user