remove readErr and writeErr from operations (#2587)
## Description Now that fault is in place, we can remove the readErrs and writeErrs from operation persistence. ## Does this PR need a docs update or release note? - [x] ⛔ No ## Type of change - [x] 🧹 Tech Debt/Cleanup ## Issue(s) * #1970 ## Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
b5c6c77b62
commit
3a2203ded5
@ -6,7 +6,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common"
|
"github.com/alcionai/corso/src/internal/common"
|
||||||
@ -93,10 +92,9 @@ func (op BackupOperation) validate() error {
|
|||||||
// pointer wrapping the values, while those values
|
// pointer wrapping the values, while those values
|
||||||
// get populated asynchronously.
|
// get populated asynchronously.
|
||||||
type backupStats struct {
|
type backupStats struct {
|
||||||
k *kopia.BackupStats
|
k *kopia.BackupStats
|
||||||
gc *support.ConnectorOperationStatus
|
gc *support.ConnectorOperationStatus
|
||||||
resourceCount int
|
resourceCount int
|
||||||
readErr, writeErr error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type detailsWriter interface {
|
type detailsWriter interface {
|
||||||
@ -166,7 +164,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
With("err", err).
|
With("err", err).
|
||||||
Errorw("doing backup", clues.InErr(err).Slice()...)
|
Errorw("doing backup", clues.InErr(err).Slice()...)
|
||||||
op.Errors.Fail(errors.Wrap(err, "doing backup"))
|
op.Errors.Fail(errors.Wrap(err, "doing backup"))
|
||||||
opStats.readErr = op.Errors.Failure()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: the consumer (sdk or cli) should run this, not operations.
|
// TODO: the consumer (sdk or cli) should run this, not operations.
|
||||||
@ -185,8 +182,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
err = op.persistResults(startTime, &opStats)
|
err = op.persistResults(startTime, &opStats)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
op.Errors.Fail(errors.Wrap(err, "persisting backup results"))
|
op.Errors.Fail(errors.Wrap(err, "persisting backup results"))
|
||||||
opStats.writeErr = op.Errors.Failure()
|
|
||||||
|
|
||||||
return op.Errors.Failure()
|
return op.Errors.Failure()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -212,8 +207,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
deets.Details())
|
deets.Details())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
op.Errors.Fail(errors.Wrap(err, "persisting backup"))
|
op.Errors.Fail(errors.Wrap(err, "persisting backup"))
|
||||||
opStats.writeErr = op.Errors.Failure()
|
|
||||||
|
|
||||||
return op.Errors.Failure()
|
return op.Errors.Failure()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -628,19 +621,11 @@ func (op *BackupOperation) persistResults(
|
|||||||
) error {
|
) error {
|
||||||
op.Results.StartedAt = started
|
op.Results.StartedAt = started
|
||||||
op.Results.CompletedAt = time.Now()
|
op.Results.CompletedAt = time.Now()
|
||||||
op.Results.ReadErrors = opStats.readErr
|
|
||||||
op.Results.WriteErrors = opStats.writeErr
|
|
||||||
|
|
||||||
op.Status = Completed
|
op.Status = Completed
|
||||||
|
|
||||||
if opStats.readErr != nil || opStats.writeErr != nil {
|
if op.Errors.Failure() != nil {
|
||||||
op.Status = Failed
|
op.Status = Failed
|
||||||
|
|
||||||
// TODO(keepers): replace with fault.Errors handling.
|
|
||||||
return multierror.Append(
|
|
||||||
errors.New("errors prevented the operation from processing"),
|
|
||||||
opStats.readErr,
|
|
||||||
opStats.writeErr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Results.BytesRead = opStats.k.TotalHashedBytes
|
op.Results.BytesRead = opStats.k.TotalHashedBytes
|
||||||
@ -653,13 +638,13 @@ func (op *BackupOperation) persistResults(
|
|||||||
return errors.New("backup population never completed")
|
return errors.New("backup population never completed")
|
||||||
}
|
}
|
||||||
|
|
||||||
if opStats.gc.Metrics.Successes == 0 {
|
if op.Status != Failed && opStats.gc.Metrics.Successes == 0 {
|
||||||
op.Status = NoData
|
op.Status = NoData
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Results.ItemsRead = opStats.gc.Metrics.Successes
|
op.Results.ItemsRead = opStats.gc.Metrics.Successes
|
||||||
|
|
||||||
return nil
|
return op.Errors.Failure()
|
||||||
}
|
}
|
||||||
|
|
||||||
// stores the operation details, results, and selectors in the backup manifest.
|
// stores the operation details, results, and selectors in the backup manifest.
|
||||||
|
|||||||
@ -379,6 +379,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
expectStatus opStatus
|
expectStatus opStatus
|
||||||
expectErr assert.ErrorAssertionFunc
|
expectErr assert.ErrorAssertionFunc
|
||||||
stats backupStats
|
stats backupStats
|
||||||
|
fail error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
expectStatus: Completed,
|
expectStatus: Completed,
|
||||||
@ -398,10 +399,10 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
{
|
{
|
||||||
expectStatus: Failed,
|
expectStatus: Failed,
|
||||||
expectErr: assert.Error,
|
expectErr: assert.Error,
|
||||||
|
fail: assert.AnError,
|
||||||
stats: backupStats{
|
stats: backupStats{
|
||||||
readErr: assert.AnError,
|
k: &kopia.BackupStats{},
|
||||||
k: &kopia.BackupStats{},
|
gc: &support.ConnectorOperationStatus{},
|
||||||
gc: &support.ConnectorOperationStatus{},
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -428,6 +429,9 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
sel,
|
sel,
|
||||||
evmock.NewBus())
|
evmock.NewBus())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
op.Errors.Fail(test.fail)
|
||||||
|
|
||||||
test.expectErr(t, op.persistResults(now, &test.stats))
|
test.expectErr(t, op.persistResults(now, &test.stats))
|
||||||
|
|
||||||
assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status")
|
assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status")
|
||||||
@ -436,8 +440,6 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
assert.Equal(t, test.stats.k.TotalHashedBytes, op.Results.BytesRead, "bytes read")
|
assert.Equal(t, test.stats.k.TotalHashedBytes, op.Results.BytesRead, "bytes read")
|
||||||
assert.Equal(t, test.stats.k.TotalUploadedBytes, op.Results.BytesUploaded, "bytes written")
|
assert.Equal(t, test.stats.k.TotalUploadedBytes, op.Results.BytesUploaded, "bytes written")
|
||||||
assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners")
|
assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners")
|
||||||
assert.Equal(t, test.stats.readErr, op.Results.ReadErrors, "read errors")
|
|
||||||
assert.Equal(t, test.stats.writeErr, op.Results.WriteErrors, "write errors")
|
|
||||||
assert.Equal(t, now, op.Results.StartedAt, "started at")
|
assert.Equal(t, now, op.Results.StartedAt, "started at")
|
||||||
assert.Less(t, now, op.Results.CompletedAt, "completed at")
|
assert.Less(t, now, op.Results.CompletedAt, "completed at")
|
||||||
})
|
})
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common"
|
"github.com/alcionai/corso/src/internal/common"
|
||||||
@ -88,11 +87,10 @@ func (op RestoreOperation) validate() error {
|
|||||||
// pointer wrapping the values, while those values
|
// pointer wrapping the values, while those values
|
||||||
// get populated asynchronously.
|
// get populated asynchronously.
|
||||||
type restoreStats struct {
|
type restoreStats struct {
|
||||||
cs []data.RestoreCollection
|
cs []data.RestoreCollection
|
||||||
gc *support.ConnectorOperationStatus
|
gc *support.ConnectorOperationStatus
|
||||||
bytesRead *stats.ByteCounter
|
bytesRead *stats.ByteCounter
|
||||||
resourceCount int
|
resourceCount int
|
||||||
readErr, writeErr error
|
|
||||||
|
|
||||||
// a transient value only used to pair up start-end events.
|
// a transient value only used to pair up start-end events.
|
||||||
restoreID string
|
restoreID string
|
||||||
@ -153,7 +151,6 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
|||||||
With("err", err).
|
With("err", err).
|
||||||
Errorw("doing restore", clues.InErr(err).Slice()...)
|
Errorw("doing restore", clues.InErr(err).Slice()...)
|
||||||
op.Errors.Fail(errors.Wrap(err, "doing restore"))
|
op.Errors.Fail(errors.Wrap(err, "doing restore"))
|
||||||
opStats.readErr = op.Errors.Failure()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: the consumer (sdk or cli) should run this, not operations.
|
// TODO: the consumer (sdk or cli) should run this, not operations.
|
||||||
@ -172,8 +169,6 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
|||||||
err = op.persistResults(ctx, start, &opStats)
|
err = op.persistResults(ctx, start, &opStats)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
op.Errors.Fail(errors.Wrap(err, "persisting restore results"))
|
op.Errors.Fail(errors.Wrap(err, "persisting restore results"))
|
||||||
opStats.writeErr = op.Errors.Failure()
|
|
||||||
|
|
||||||
return nil, op.Errors.Failure()
|
return nil, op.Errors.Failure()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,19 +272,11 @@ func (op *RestoreOperation) persistResults(
|
|||||||
) error {
|
) error {
|
||||||
op.Results.StartedAt = started
|
op.Results.StartedAt = started
|
||||||
op.Results.CompletedAt = time.Now()
|
op.Results.CompletedAt = time.Now()
|
||||||
op.Results.ReadErrors = opStats.readErr
|
|
||||||
op.Results.WriteErrors = opStats.writeErr
|
|
||||||
|
|
||||||
op.Status = Completed
|
op.Status = Completed
|
||||||
|
|
||||||
if opStats.readErr != nil || opStats.writeErr != nil {
|
if op.Errors.Failure() != nil {
|
||||||
op.Status = Failed
|
op.Status = Failed
|
||||||
|
|
||||||
// TODO(keepers): replace with fault.Errors handling.
|
|
||||||
return multierror.Append(
|
|
||||||
errors.New("errors prevented the operation from processing"),
|
|
||||||
opStats.readErr,
|
|
||||||
opStats.writeErr)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Results.BytesRead = opStats.bytesRead.NumBytes
|
op.Results.BytesRead = opStats.bytesRead.NumBytes
|
||||||
@ -301,21 +288,19 @@ func (op *RestoreOperation) persistResults(
|
|||||||
return errors.New("restoration never completed")
|
return errors.New("restoration never completed")
|
||||||
}
|
}
|
||||||
|
|
||||||
if opStats.gc.Metrics.Successes == 0 {
|
if op.Status != Failed && opStats.gc.Metrics.Successes == 0 {
|
||||||
op.Status = NoData
|
op.Status = NoData
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Results.ItemsWritten = opStats.gc.Metrics.Successes
|
op.Results.ItemsWritten = opStats.gc.Metrics.Successes
|
||||||
|
|
||||||
dur := op.Results.CompletedAt.Sub(op.Results.StartedAt)
|
|
||||||
|
|
||||||
op.bus.Event(
|
op.bus.Event(
|
||||||
ctx,
|
ctx,
|
||||||
events.RestoreEnd,
|
events.RestoreEnd,
|
||||||
map[string]any{
|
map[string]any{
|
||||||
events.BackupID: op.BackupID,
|
events.BackupID: op.BackupID,
|
||||||
events.DataRetrieved: op.Results.BytesRead,
|
events.DataRetrieved: op.Results.BytesRead,
|
||||||
events.Duration: dur,
|
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
||||||
events.EndTime: common.FormatTime(op.Results.CompletedAt),
|
events.EndTime: common.FormatTime(op.Results.CompletedAt),
|
||||||
events.ItemsRead: op.Results.ItemsRead,
|
events.ItemsRead: op.Results.ItemsRead,
|
||||||
events.ItemsWritten: op.Results.ItemsWritten,
|
events.ItemsWritten: op.Results.ItemsWritten,
|
||||||
@ -327,7 +312,7 @@ func (op *RestoreOperation) persistResults(
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
return nil
|
return op.Errors.Failure()
|
||||||
}
|
}
|
||||||
|
|
||||||
// formatDetailsForRestoration reduces the provided detail entries according to the
|
// formatDetailsForRestoration reduces the provided detail entries according to the
|
||||||
|
|||||||
@ -53,6 +53,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
expectStatus opStatus
|
expectStatus opStatus
|
||||||
expectErr assert.ErrorAssertionFunc
|
expectErr assert.ErrorAssertionFunc
|
||||||
stats restoreStats
|
stats restoreStats
|
||||||
|
fail error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
expectStatus: Completed,
|
expectStatus: Completed,
|
||||||
@ -78,8 +79,8 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
{
|
{
|
||||||
expectStatus: Failed,
|
expectStatus: Failed,
|
||||||
expectErr: assert.Error,
|
expectErr: assert.Error,
|
||||||
|
fail: assert.AnError,
|
||||||
stats: restoreStats{
|
stats: restoreStats{
|
||||||
readErr: assert.AnError,
|
|
||||||
bytesRead: &stats.ByteCounter{},
|
bytesRead: &stats.ByteCounter{},
|
||||||
gc: &support.ConnectorOperationStatus{},
|
gc: &support.ConnectorOperationStatus{},
|
||||||
},
|
},
|
||||||
@ -109,6 +110,9 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
dest,
|
dest,
|
||||||
evmock.NewBus())
|
evmock.NewBus())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
op.Errors.Fail(test.fail)
|
||||||
|
|
||||||
test.expectErr(t, op.persistResults(ctx, now, &test.stats))
|
test.expectErr(t, op.persistResults(ctx, now, &test.stats))
|
||||||
|
|
||||||
assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status")
|
assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status")
|
||||||
@ -116,8 +120,6 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
assert.Equal(t, test.stats.gc.Metrics.Successes, op.Results.ItemsWritten, "items written")
|
assert.Equal(t, test.stats.gc.Metrics.Successes, op.Results.ItemsWritten, "items written")
|
||||||
assert.Equal(t, test.stats.bytesRead.NumBytes, op.Results.BytesRead, "resource owners")
|
assert.Equal(t, test.stats.bytesRead.NumBytes, op.Results.BytesRead, "resource owners")
|
||||||
assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners")
|
assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners")
|
||||||
assert.Equal(t, test.stats.readErr, op.Results.ReadErrors, "read errors")
|
|
||||||
assert.Equal(t, test.stats.writeErr, op.Results.WriteErrors, "write errors")
|
|
||||||
assert.Equal(t, now, op.Results.StartedAt, "started at")
|
assert.Equal(t, now, op.Results.StartedAt, "started at")
|
||||||
assert.Less(t, now, op.Results.CompletedAt, "completed at")
|
assert.Less(t, now, op.Results.CompletedAt, "completed at")
|
||||||
})
|
})
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user