add bup/restore ids for events (#946)
## Description Start-end event pairs want ids as a means to match each pair. Restore operations now gen a transient id for this purpose. Backup ops pre- generate the base ID instead of waiting for the model store to generate the ID at the end of the operation. ## Type of change - [x] 🌻 Feature ## Issue(s) * #894 ## Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
1df48997ae
commit
984df8fcfb
@ -37,6 +37,7 @@ const (
|
|||||||
ItemsRead = "items-read"
|
ItemsRead = "items-read"
|
||||||
ItemsWritten = "items-written"
|
ItemsWritten = "items-written"
|
||||||
Resources = "resources"
|
Resources = "resources"
|
||||||
|
RestoreID = "restore-id"
|
||||||
Service = "service"
|
Service = "service"
|
||||||
StartTime = "start-time"
|
StartTime = "start-time"
|
||||||
Status = "status"
|
Status = "status"
|
||||||
|
|||||||
@ -107,7 +107,7 @@ func putInner(
|
|||||||
}
|
}
|
||||||
|
|
||||||
base := m.Base()
|
base := m.Base()
|
||||||
if create {
|
if create && len(base.ID) == 0 {
|
||||||
base.ID = model.StableID(uuid.NewString())
|
base.ID = model.StableID(uuid.NewString())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -260,6 +260,54 @@ func (suite *ModelStoreIntegrationSuite) TestPutGet() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *ModelStoreIntegrationSuite) TestPutGet_PreSetID() {
|
||||||
|
mdl := model.BackupOpSchema
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
baseID string
|
||||||
|
expect assert.ComparisonAssertionFunc
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "genreate new id",
|
||||||
|
baseID: "",
|
||||||
|
expect: assert.NotEqual,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "use provided id",
|
||||||
|
baseID: uuid.NewString(),
|
||||||
|
expect: assert.Equal,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
foo := &fooModel{
|
||||||
|
BaseModel: model.BaseModel{ID: model.StableID(test.baseID)},
|
||||||
|
Bar: uuid.NewString(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Avoid some silly test errors from comparing nil to empty map.
|
||||||
|
foo.Tags = map[string]string{}
|
||||||
|
|
||||||
|
err := suite.m.Put(suite.ctx, mdl, foo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
test.expect(t, model.StableID(test.baseID), foo.ID)
|
||||||
|
require.NotEmpty(t, foo.ModelStoreID)
|
||||||
|
require.NotEmpty(t, foo.ID)
|
||||||
|
|
||||||
|
returned := &fooModel{}
|
||||||
|
err = suite.m.Get(suite.ctx, mdl, foo.ID, returned)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, foo, returned)
|
||||||
|
|
||||||
|
err = suite.m.GetWithModelStoreID(suite.ctx, mdl, foo.ModelStoreID, returned)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, foo, returned)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *ModelStoreIntegrationSuite) TestPutGet_WithTags() {
|
func (suite *ModelStoreIntegrationSuite) TestPutGet_WithTags() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
theModelType := model.BackupOpSchema
|
theModelType := model.BackupOpSchema
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
@ -87,7 +88,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
backupDetails *details.Details
|
backupDetails *details.Details
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
)
|
)
|
||||||
// TODO: persist initial state of backupOperation in modelstore
|
|
||||||
|
op.Results.BackupID = model.StableID(uuid.NewString())
|
||||||
|
|
||||||
op.bus.Event(
|
op.bus.Event(
|
||||||
ctx,
|
ctx,
|
||||||
@ -95,7 +97,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
map[string]any{
|
map[string]any{
|
||||||
events.StartTime: startTime,
|
events.StartTime: startTime,
|
||||||
events.Service: op.Selectors.Service.String(),
|
events.Service: op.Selectors.Service.String(),
|
||||||
// TODO: initial backup ID,
|
events.BackupID: op.Results.BackupID,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -195,6 +197,7 @@ func (op *BackupOperation) createBackupModels(
|
|||||||
|
|
||||||
b := backup.New(
|
b := backup.New(
|
||||||
snapID, string(backupDetails.ModelStoreID), op.Status.String(),
|
snapID, string(backupDetails.ModelStoreID), op.Status.String(),
|
||||||
|
op.Results.BackupID,
|
||||||
op.Selectors,
|
op.Selectors,
|
||||||
op.Results.ReadWrites,
|
op.Results.ReadWrites,
|
||||||
op.Results.StartAndEndTime,
|
op.Results.StartAndEndTime,
|
||||||
@ -205,20 +208,18 @@ func (op *BackupOperation) createBackupModels(
|
|||||||
return errors.Wrap(err, "creating backup model")
|
return errors.Wrap(err, "creating backup model")
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Results.BackupID = b.ID
|
|
||||||
|
|
||||||
op.bus.Event(
|
op.bus.Event(
|
||||||
ctx,
|
ctx,
|
||||||
events.BackupEnd,
|
events.BackupEnd,
|
||||||
map[string]any{
|
map[string]any{
|
||||||
events.BackupID: b.ID,
|
events.BackupID: b.ID,
|
||||||
events.Service: op.Selectors.Service.String(),
|
|
||||||
events.Status: op.Status,
|
|
||||||
events.StartTime: op.Results.StartedAt,
|
|
||||||
events.EndTime: op.Results.CompletedAt,
|
|
||||||
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
|
||||||
events.DataStored: op.Results.BytesUploaded,
|
events.DataStored: op.Results.BytesUploaded,
|
||||||
|
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
||||||
|
events.EndTime: op.Results.CompletedAt,
|
||||||
events.Resources: op.Results.ResourceOwners,
|
events.Resources: op.Results.ResourceOwners,
|
||||||
|
events.Service: op.Selectors.Service.String(),
|
||||||
|
events.StartTime: op.Results.StartedAt,
|
||||||
|
events.Status: op.Status,
|
||||||
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -227,6 +227,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
|||||||
assert.Zero(t, bo.Results.WriteErrors)
|
assert.Zero(t, bo.Results.WriteErrors)
|
||||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
|
assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
|
||||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events")
|
assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events")
|
||||||
|
assert.Equal(t,
|
||||||
|
mb.CalledWith[events.BackupStart][0][events.BackupID],
|
||||||
|
bo.Results.BackupID, "backupID pre-declaration")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -287,4 +290,7 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
|
|||||||
assert.NoError(t, bo.Results.WriteErrors)
|
assert.NoError(t, bo.Results.WriteErrors)
|
||||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
|
assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
|
||||||
assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events")
|
assert.Equal(t, 1, mb.TimesCalled[events.BackupEnd], "backup-end events")
|
||||||
|
assert.Equal(t,
|
||||||
|
mb.CalledWith[events.BackupStart][0][events.BackupID],
|
||||||
|
bo.Results.BackupID, "backupID pre-declaration")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
@ -82,6 +83,9 @@ type restoreStats struct {
|
|||||||
resourceCount int
|
resourceCount int
|
||||||
started bool
|
started bool
|
||||||
readErr, writeErr error
|
readErr, writeErr error
|
||||||
|
|
||||||
|
// a transient value only used to pair up start-end events.
|
||||||
|
restoreID string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run begins a synchronous restore operation.
|
// Run begins a synchronous restore operation.
|
||||||
@ -91,8 +95,8 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
|||||||
// persist operation results to the model store on exit
|
// persist operation results to the model store on exit
|
||||||
opStats := restoreStats{
|
opStats := restoreStats{
|
||||||
bytesRead: &stats.ByteCounter{},
|
bytesRead: &stats.ByteCounter{},
|
||||||
|
restoreID: uuid.NewString(),
|
||||||
}
|
}
|
||||||
// TODO: persist results?
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
err = op.persistResults(ctx, startTime, &opStats)
|
err = op.persistResults(ctx, startTime, &opStats)
|
||||||
@ -110,7 +114,6 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: persist initial state of restoreOperation in modelstore
|
|
||||||
op.bus.Event(
|
op.bus.Event(
|
||||||
ctx,
|
ctx,
|
||||||
events.RestoreStart,
|
events.RestoreStart,
|
||||||
@ -118,6 +121,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
|||||||
events.StartTime: startTime,
|
events.StartTime: startTime,
|
||||||
events.BackupID: op.BackupID,
|
events.BackupID: op.BackupID,
|
||||||
events.BackupCreateTime: b.CreationTime,
|
events.BackupCreateTime: b.CreationTime,
|
||||||
|
events.RestoreID: opStats.restoreID,
|
||||||
// TODO: restore options,
|
// TODO: restore options,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@ -237,15 +241,17 @@ func (op *RestoreOperation) persistResults(
|
|||||||
map[string]any{
|
map[string]any{
|
||||||
// TODO: RestoreID
|
// TODO: RestoreID
|
||||||
events.BackupID: op.BackupID,
|
events.BackupID: op.BackupID,
|
||||||
events.Service: op.Selectors.Service.String(),
|
events.DataRetrieved: op.Results.BytesRead,
|
||||||
events.Status: op.Status,
|
|
||||||
events.StartTime: op.Results.StartedAt,
|
|
||||||
events.EndTime: op.Results.CompletedAt,
|
|
||||||
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
||||||
|
events.EndTime: op.Results.CompletedAt,
|
||||||
events.ItemsRead: op.Results.ItemsRead,
|
events.ItemsRead: op.Results.ItemsRead,
|
||||||
events.ItemsWritten: op.Results.ItemsWritten,
|
events.ItemsWritten: op.Results.ItemsWritten,
|
||||||
events.Resources: op.Results.ResourceOwners,
|
events.Resources: op.Results.ResourceOwners,
|
||||||
events.DataRetrieved: op.Results.BytesRead,
|
events.RestoreID: opStats.restoreID,
|
||||||
|
events.Service: op.Selectors.Service.String(),
|
||||||
|
events.StartTime: op.Results.StartedAt,
|
||||||
|
events.Status: op.Status,
|
||||||
|
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -42,11 +42,15 @@ var _ print.Printable = &Backup{}
|
|||||||
|
|
||||||
func New(
|
func New(
|
||||||
snapshotID, detailsID, status string,
|
snapshotID, detailsID, status string,
|
||||||
|
id model.StableID,
|
||||||
selector selectors.Selector,
|
selector selectors.Selector,
|
||||||
rw stats.ReadWrites,
|
rw stats.ReadWrites,
|
||||||
se stats.StartAndEndTime,
|
se stats.StartAndEndTime,
|
||||||
) *Backup {
|
) *Backup {
|
||||||
return &Backup{
|
return &Backup{
|
||||||
|
BaseModel: model.BaseModel{
|
||||||
|
ID: id,
|
||||||
|
},
|
||||||
CreationTime: time.Now(),
|
CreationTime: time.Now(),
|
||||||
SnapshotID: snapshotID,
|
SnapshotID: snapshotID,
|
||||||
DetailsID: detailsID,
|
DetailsID: detailsID,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user