generate events (#863)

## Description

Adds events for repo init, backup start and end,
and restore start and end.  Not all expected
values are available.  TODOs have been marked
for values that need future implementation.

## Type of change

- [x] 🌻 Feature

## Issue(s)

* #741

## Test Plan

- [x]  Unit test
This commit is contained in:
Keepers 2022-09-15 12:03:20 -06:00 committed by GitHub
parent ed52a07e2f
commit a6ba26cd92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 210 additions and 41 deletions

View File

@ -9,7 +9,9 @@ import (
analytics "github.com/rudderlabs/analytics-go"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/storage"
)
// keys for ease of use
@ -22,6 +24,8 @@ const (
RepoInit = "repo-init"
BackupStart = "backup-start"
BackupEnd = "backup-end"
RestoreStart = "restore-start"
RestoreEnd = "restore-end"
// Event Data Keys
BackupID = "backup-id"
@ -32,6 +36,8 @@ const (
StartTime = "start-time"
Duration = "duration"
Status = "status"
ItemsRead = "items-read"
ItemsWritten = "items-written"
)
// Bus handles all event communication into the events package.
@ -47,8 +53,8 @@ var (
DataPlaneURL string
)
func NewBus(repoProvider, bucket, prefix, tenantID string) Bus {
hash := repoHash(repoProvider, bucket, prefix, tenantID)
func NewBus(s storage.Storage, a account.Account) Bus {
hash := repoHash(s, a)
envWK := os.Getenv("RUDDERSTACK_CORSO_WRITE_KEY")
if len(envWK) > 0 {
@ -88,11 +94,15 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) {
props := analytics.
NewProperties().
Set(repoID, b.repoID).
Set(corsoVersion, b.version).
Set(payload, data)
Set(corsoVersion, b.version)
if len(data) > 0 {
props.Set(payload, data)
}
err := b.client.Enqueue(analytics.Track{
Event: key,
UserId: b.repoID,
Timestamp: time.Now().UTC(),
Properties: props,
})
@ -101,9 +111,41 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) {
}
}
func repoHash(repoProvider, bucket, prefix, tenantID string) string {
func storageID(s storage.Storage) string {
id := s.Provider.String()
switch s.Provider {
case storage.ProviderS3:
s3, err := s.S3Config()
if err != nil {
return id
}
id += s3.Bucket + s3.Prefix
}
return id
}
func accountID(a account.Account) string {
var id string
switch a.Provider {
case account.ProviderM365:
m, err := a.M365Config()
if err != nil {
return id
}
id += m.TenantID
}
return id
}
func repoHash(s storage.Storage, a account.Account) string {
sum := md5.Sum(
[]byte(repoProvider + bucket + prefix + tenantID),
[]byte(storageID(s) + accountID(a)),
)
return fmt.Sprintf("%x", sum)

View File

@ -8,6 +8,9 @@ import (
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/credentials"
"github.com/alcionai/corso/src/pkg/storage"
)
type EventsIntegrationSuite struct {
@ -25,7 +28,28 @@ func TestMetricsIntegrationSuite(t *testing.T) {
func (suite *EventsIntegrationSuite) TestNewBus() {
t := suite.T()
b := events.NewBus("s3", "bckt", "prfx", "tenid")
s, err := storage.NewStorage(
storage.ProviderS3,
storage.S3Config{
Bucket: "bckt",
Prefix: "prfx",
},
)
require.NoError(t, err)
a, err := account.NewAccount(
account.ProviderM365,
account.M365Config{
M365: credentials.M365{
ClientID: "id",
ClientSecret: "secret",
},
TenantID: "tid",
},
)
require.NoError(t, err)
b := events.NewBus(s, a)
require.NotEmpty(t, b)
require.NoError(t, b.Close())

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/stats"
@ -46,9 +47,10 @@ func NewBackupOperation(
sw *store.Wrapper,
acct account.Account,
selector selectors.Selector,
bus events.Bus,
) (BackupOperation, error) {
op := BackupOperation{
operation: newOperation(opts, kw, sw),
operation: newOperation(opts, bus, kw, sw),
Selectors: selector,
Version: "v0",
account: acct,
@ -80,12 +82,23 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
var (
opStats backupStats
backupDetails *details.Details
startTime = time.Now()
)
// TODO: persist initial state of backupOperation in modelstore
op.bus.Event(
ctx,
events.BackupStart,
map[string]any{
events.StartTime: startTime,
// TODO: initial backup ID,
// TODO: events.ExchangeResources: <count of resources>,
},
)
// persist operation results to the model store on exit
defer func() {
err = op.persistResults(time.Now(), &opStats)
err = op.persistResults(startTime, &opStats)
if err != nil {
return
}
@ -185,5 +198,20 @@ func (op *BackupOperation) createBackupModels(
op.Results.BackupID = b.ID
op.bus.Event(
ctx,
events.BackupEnd,
map[string]any{
events.BackupID: b.ID,
events.Status: op.Status,
events.StartTime: op.Results.StartedAt,
events.EndTime: op.Results.CompletedAt,
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
// TODO: events.ExchangeResources: <count of resources>,
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
// TODO: events.ExchangeDataStored: <amount of data stored>,
},
)
return nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
@ -53,7 +54,14 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
}
)
op, err := NewBackupOperation(ctx, control.Options{}, kw, sw, acct, selectors.Selector{})
op, err := NewBackupOperation(
ctx,
control.Options{},
kw,
sw,
acct,
selectors.Selector{},
events.Bus{})
require.NoError(t, err)
require.NoError(t, op.persistResults(now, &stats))
@ -119,7 +127,8 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
test.kw,
test.sw,
test.acct,
selectors.Selector{})
selectors.Selector{},
events.Bus{})
test.errCheck(t, err)
})
}
@ -193,7 +202,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
sw,
acct,
*selected,
)
events.Bus{})
require.NoError(t, err)
require.NoError(t, bo.Run(ctx))
@ -246,7 +255,8 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
kw,
sw,
acct,
sel.Selector)
sel.Selector,
events.Bus{})
require.NoError(t, err)
require.NoError(t, bo.Run(ctx))

View File

@ -5,6 +5,7 @@ import (
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/store"
@ -41,18 +42,21 @@ type operation struct {
Options control.Options `json:"options"`
Status opStatus `json:"status"`
bus events.Bus
kopia *kopia.Wrapper
store *store.Wrapper
}
func newOperation(
opts control.Options,
bus events.Bus,
kw *kopia.Wrapper,
sw *store.Wrapper,
) operation {
return operation{
CreatedAt: time.Now(),
Options: opts,
bus: bus,
kopia: kw,
store: sw,
Status: InProgress,

View File

@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/store"
@ -22,7 +23,7 @@ func TestOperationSuite(t *testing.T) {
func (suite *OperationSuite) TestNewOperation() {
t := suite.T()
op := newOperation(control.Options{}, nil, nil)
op := newOperation(control.Options{}, events.Bus{}, nil, nil)
assert.Greater(t, op.CreatedAt, time.Time{})
}
@ -42,7 +43,7 @@ func (suite *OperationSuite) TestOperation_Validate() {
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
op := newOperation(control.Options{}, test.kw, test.sw)
op := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw)
test.errCheck(t, op.validate())
})
}

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/path"
@ -48,9 +49,10 @@ func NewRestoreOperation(
acct account.Account,
backupID model.StableID,
sel selectors.Selector,
bus events.Bus,
) (RestoreOperation, error) {
op := RestoreOperation{
operation: newOperation(opts, kw, sw),
operation: newOperation(opts, bus, kw, sw),
BackupID: backupID,
Selectors: sel,
Version: "v0",
@ -80,12 +82,28 @@ type restoreStats struct {
// Run begins a synchronous restore operation.
func (op *RestoreOperation) Run(ctx context.Context) (err error) {
startTime := time.Now()
// TODO: persist initial state of restoreOperation in modelstore
op.bus.Event(
ctx,
events.RestoreStart,
map[string]any{
events.StartTime: startTime,
events.BackupID: op.BackupID,
// TODO: initial backup ID,
// TODO: events.ExchangeResources: <count of resources>,
// TODO: source backup time,
// TODO: restore options,
},
)
// persist operation results to the model store on exit
opStats := restoreStats{}
// TODO: persist results?
defer func() {
err = op.persistResults(time.Now(), &opStats)
err = op.persistResults(ctx, startTime, &opStats)
if err != nil {
return
}
@ -170,6 +188,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
// writes the restoreOperation outcome to the modelStore.
func (op *RestoreOperation) persistResults(
ctx context.Context,
started time.Time,
opStats *restoreStats,
) error {
@ -192,5 +211,22 @@ func (op *RestoreOperation) persistResults(
op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count
op.Results.ItemsWritten = opStats.gc.Successful
op.bus.Event(
ctx,
events.RestoreEnd,
map[string]any{
// TODO: RestoreID
events.BackupID: op.BackupID,
events.Status: op.Status,
events.StartTime: op.Results.StartedAt,
events.EndTime: op.Results.CompletedAt,
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
events.ItemsRead: op.Results.ItemsRead,
events.ItemsWritten: op.Results.ItemsWritten,
// TODO: events.ExchangeResources: <count of resources>,
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
},
)
return nil
}

View File

@ -13,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/tester"
@ -56,10 +57,18 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
}
)
op, err := NewRestoreOperation(ctx, control.Options{}, kw, sw, acct, "foo", selectors.Selector{})
op, err := NewRestoreOperation(
ctx,
control.Options{},
kw,
sw,
acct,
"foo",
selectors.Selector{},
events.Bus{})
require.NoError(t, err)
require.NoError(t, op.persistResults(now, &stats))
require.NoError(t, op.persistResults(ctx, now, &stats))
assert.Equal(t, op.Status.String(), Completed.String(), "status")
assert.Equal(t, op.Results.ItemsRead, len(stats.cs), "items read")
@ -138,7 +147,8 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
kw,
sw,
acct,
bsel.Selector)
bsel.Selector,
events.Bus{})
require.NoError(t, err)
require.NoError(t, bo.Run(ctx))
require.NotEmpty(t, bo.Results.BackupID)
@ -189,7 +199,8 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
test.sw,
test.acct,
"backup-id",
selectors.Selector{})
selectors.Selector{},
events.Bus{})
test.errCheck(t, err)
})
}
@ -209,7 +220,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
suite.sw,
tester.NewM365Account(t),
suite.backupID,
rsel.Selector)
rsel.Selector,
events.Bus{})
require.NoError(t, err)
require.NoError(t, ro.Run(ctx), "restoreOp.Run()")
@ -236,7 +248,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() {
suite.sw,
tester.NewM365Account(t),
suite.backupID,
rsel.Selector)
rsel.Selector,
events.Bus{})
require.NoError(t, err)
require.Error(t, ro.Run(ctx), "restoreOp.Run() should have 0 results")
}

View File

@ -5,8 +5,8 @@ import (
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/operations"
@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/storage"
"github.com/alcionai/corso/src/pkg/store"
@ -28,7 +29,7 @@ type Repository struct {
Account account.Account // the user's m365 account connection details
Storage storage.Storage // the storage provider details and configuration
// Bus events.Bus
Bus events.Bus
dataLayer *kopia.Wrapper
modelStore *kopia.ModelStore
}
@ -69,10 +70,13 @@ func Initialize(
Version: "v1",
Account: acct,
Storage: s,
Bus: events.NewBus(s, acct),
dataLayer: w,
modelStore: ms,
}
r.Bus.Event(ctx, events.RepoInit, nil)
return &r, nil
}
@ -109,6 +113,7 @@ func Connect(
Version: "v1",
Account: acct,
Storage: s,
Bus: events.NewBus(s, acct),
dataLayer: w,
modelStore: ms,
}
@ -117,23 +122,27 @@ func Connect(
}
func (r *Repository) Close(ctx context.Context) error {
if err := r.Bus.Close(); err != nil {
logger.Ctx(ctx).Debugw("closing the event bus", "err", err)
}
if r.dataLayer != nil {
err := r.dataLayer.Close(ctx)
if err := r.dataLayer.Close(ctx); err != nil {
logger.Ctx(ctx).Debugw("closing Datalayer", "err", err)
}
r.dataLayer = nil
if err != nil {
return errors.Wrap(err, "closing corso DataLayer")
}
}
if r.modelStore == nil {
return nil
if r.modelStore != nil {
if err := r.modelStore.Close(ctx); err != nil {
logger.Ctx(ctx).Debugw("closing modelStore", "err", err)
}
err := r.modelStore.Close(ctx)
r.modelStore = nil
}
return errors.Wrap(err, "closing corso ModelStore")
return nil
}
// NewBackup generates a BackupOperation runner.
@ -148,7 +157,8 @@ func (r Repository) NewBackup(
r.dataLayer,
store.NewKopiaStore(r.modelStore),
r.Account,
selector)
selector,
r.Bus)
}
// NewRestore generates a restoreOperation runner.
@ -165,7 +175,8 @@ func (r Repository) NewRestore(
store.NewKopiaStore(r.modelStore),
r.Account,
model.StableID(backupID),
sel)
sel,
r.Bus)
}
// backups lists a backup by id