prevent panics in failed operation runs (#524)
This commit is contained in:
parent
9f1c8aa64c
commit
ceec4dfb45
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/internal/connector"
|
"github.com/alcionai/corso/internal/connector"
|
||||||
@ -71,6 +72,7 @@ func (op BackupOperation) validate() error {
|
|||||||
type backupStats struct {
|
type backupStats struct {
|
||||||
k *kopia.BackupStats
|
k *kopia.BackupStats
|
||||||
gc *support.ConnectorOperationStatus
|
gc *support.ConnectorOperationStatus
|
||||||
|
started bool
|
||||||
readErr, writeErr error
|
readErr, writeErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,7 +86,10 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
backupDetails *details.Details
|
backupDetails *details.Details
|
||||||
)
|
)
|
||||||
defer func() {
|
defer func() {
|
||||||
op.persistResults(time.Now(), &opStats)
|
err = op.persistResults(time.Now(), &opStats)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
err = op.createBackupModels(ctx, opStats.k.SnapshotID, backupDetails)
|
err = op.createBackupModels(ctx, opStats.k.SnapshotID, backupDetails)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -96,23 +101,27 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
// retrieve data from the producer
|
// retrieve data from the producer
|
||||||
gc, err := connector.NewGraphConnector(op.account)
|
gc, err := connector.NewGraphConnector(op.account)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "connecting to graph api")
|
||||||
opStats.readErr = err
|
opStats.readErr = err
|
||||||
return errors.Wrap(err, "connecting to graph api")
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var cs []data.Collection
|
var cs []data.Collection
|
||||||
cs, err = gc.ExchangeDataCollection(ctx, op.Selectors)
|
cs, err = gc.ExchangeDataCollection(ctx, op.Selectors)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "retrieving service data")
|
||||||
opStats.readErr = err
|
opStats.readErr = err
|
||||||
return errors.Wrap(err, "retrieving service data")
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// hand the results to the consumer
|
// hand the results to the consumer
|
||||||
opStats.k, backupDetails, err = op.kopia.BackupCollections(ctx, cs)
|
opStats.k, backupDetails, err = op.kopia.BackupCollections(ctx, cs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "backing up service data")
|
||||||
opStats.writeErr = err
|
opStats.writeErr = err
|
||||||
return errors.Wrap(err, "backing up service data")
|
return err
|
||||||
}
|
}
|
||||||
|
opStats.started = true
|
||||||
opStats.gc = gc.AwaitStatus()
|
opStats.gc = gc.AwaitStatus()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -123,24 +132,25 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
func (op *BackupOperation) persistResults(
|
func (op *BackupOperation) persistResults(
|
||||||
started time.Time,
|
started time.Time,
|
||||||
opStats *backupStats,
|
opStats *backupStats,
|
||||||
) {
|
) error {
|
||||||
|
op.Results.StartedAt = started
|
||||||
|
op.Results.CompletedAt = time.Now()
|
||||||
|
|
||||||
op.Status = Completed
|
op.Status = Completed
|
||||||
if opStats.k.TotalFileCount == 0 && (opStats.readErr != nil || opStats.writeErr != nil) {
|
if !opStats.started {
|
||||||
op.Status = Failed
|
op.Status = Failed
|
||||||
|
return multierror.Append(
|
||||||
|
errors.New("errors prevented the operation from processing"),
|
||||||
|
opStats.readErr,
|
||||||
|
opStats.writeErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Results.ReadErrors = opStats.readErr
|
op.Results.ReadErrors = opStats.readErr
|
||||||
op.Results.WriteErrors = opStats.writeErr
|
op.Results.WriteErrors = opStats.writeErr
|
||||||
|
op.Results.ItemsRead = opStats.gc.Successful
|
||||||
|
op.Results.ItemsWritten = opStats.k.TotalFileCount
|
||||||
|
|
||||||
if opStats.gc != nil {
|
return nil
|
||||||
op.Results.ItemsRead = opStats.gc.Successful
|
|
||||||
}
|
|
||||||
if opStats.k != nil {
|
|
||||||
op.Results.ItemsWritten = opStats.k.TotalFileCount
|
|
||||||
}
|
|
||||||
|
|
||||||
op.Results.StartedAt = started
|
|
||||||
op.Results.CompletedAt = time.Now()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// stores the operation details, results, and selectors in the backup manifest.
|
// stores the operation details, results, and selectors in the backup manifest.
|
||||||
@ -149,6 +159,10 @@ func (op *BackupOperation) createBackupModels(
|
|||||||
snapID string,
|
snapID string,
|
||||||
backupDetails *details.Details,
|
backupDetails *details.Details,
|
||||||
) error {
|
) error {
|
||||||
|
if backupDetails == nil {
|
||||||
|
return errors.New("no backup details to record")
|
||||||
|
}
|
||||||
|
|
||||||
err := op.store.Put(ctx, model.BackupDetailsSchema, &backupDetails.DetailsModel)
|
err := op.store.Put(ctx, model.BackupDetailsSchema, &backupDetails.DetailsModel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "creating backupdetails model")
|
return errors.Wrap(err, "creating backupdetails model")
|
||||||
|
|||||||
@ -41,6 +41,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
acct = account.Account{}
|
acct = account.Account{}
|
||||||
now = time.Now()
|
now = time.Now()
|
||||||
stats = backupStats{
|
stats = backupStats{
|
||||||
|
started: true,
|
||||||
readErr: multierror.Append(nil, assert.AnError),
|
readErr: multierror.Append(nil, assert.AnError),
|
||||||
writeErr: assert.AnError,
|
writeErr: assert.AnError,
|
||||||
k: &kopia.BackupStats{
|
k: &kopia.BackupStats{
|
||||||
@ -55,9 +56,9 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
op, err := NewBackupOperation(ctx, control.Options{}, kw, sw, acct, selectors.Selector{})
|
op, err := NewBackupOperation(ctx, control.Options{}, kw, sw, acct, selectors.Selector{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
op.persistResults(now, &stats)
|
require.NoError(t, op.persistResults(now, &stats))
|
||||||
|
|
||||||
assert.Equal(t, op.Status, Completed, "status")
|
assert.Equal(t, op.Status.String(), Completed.String(), "status")
|
||||||
assert.Equal(t, op.Results.ItemsRead, stats.gc.Successful, "items read")
|
assert.Equal(t, op.Results.ItemsRead, stats.gc.Successful, "items read")
|
||||||
assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
|
assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
|
||||||
assert.Equal(t, op.Results.ItemsWritten, stats.k.TotalFileCount, "items written")
|
assert.Equal(t, op.Results.ItemsWritten, stats.k.TotalFileCount, "items written")
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/internal/connector"
|
"github.com/alcionai/corso/internal/connector"
|
||||||
@ -72,23 +73,32 @@ func (op RestoreOperation) validate() error {
|
|||||||
type restoreStats struct {
|
type restoreStats struct {
|
||||||
cs []data.Collection
|
cs []data.Collection
|
||||||
gc *support.ConnectorOperationStatus
|
gc *support.ConnectorOperationStatus
|
||||||
|
started bool
|
||||||
readErr, writeErr error
|
readErr, writeErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run begins a synchronous restore operation.
|
// Run begins a synchronous restore operation.
|
||||||
// todo (keepers): return stats block in first param.
|
// todo (keepers): return stats block in first param.
|
||||||
func (op *RestoreOperation) Run(ctx context.Context) error {
|
func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
||||||
// TODO: persist initial state of restoreOperation in modelstore
|
// TODO: persist initial state of restoreOperation in modelstore
|
||||||
|
|
||||||
// persist operation results to the model store on exit
|
// persist operation results to the model store on exit
|
||||||
opStats := restoreStats{}
|
opStats := restoreStats{}
|
||||||
defer op.persistResults(time.Now(), &opStats)
|
defer func() {
|
||||||
|
err = op.persistResults(time.Now(), &opStats)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: persist results?
|
||||||
|
}()
|
||||||
|
|
||||||
// retrieve the restore point details
|
// retrieve the restore point details
|
||||||
d, b, err := op.store.GetDetailsFromBackupID(ctx, op.BackupID)
|
d, b, err := op.store.GetDetailsFromBackupID(ctx, op.BackupID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
opStats.readErr = errors.Wrap(err, "getting backup details for restore")
|
err = errors.Wrap(err, "getting backup details for restore")
|
||||||
return opStats.readErr
|
opStats.readErr = err
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
er, err := op.Selectors.ToExchangeRestore()
|
er, err := op.Selectors.ToExchangeRestore()
|
||||||
@ -111,22 +121,27 @@ func (op *RestoreOperation) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
dcs, err := op.kopia.RestoreMultipleItems(ctx, b.SnapshotID, paths)
|
dcs, err := op.kopia.RestoreMultipleItems(ctx, b.SnapshotID, paths)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
opStats.readErr = errors.Wrap(err, "retrieving service data")
|
err = errors.Wrap(err, "retrieving service data")
|
||||||
return opStats.readErr
|
opStats.readErr = err
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
opStats.cs = dcs
|
opStats.cs = dcs
|
||||||
|
|
||||||
// restore those collections using graph
|
// restore those collections using graph
|
||||||
gc, err := connector.NewGraphConnector(op.account)
|
gc, err := connector.NewGraphConnector(op.account)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
opStats.writeErr = errors.Wrap(err, "connecting to graph api")
|
err = errors.Wrap(err, "connecting to graph api")
|
||||||
return opStats.writeErr
|
opStats.writeErr = err
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := gc.RestoreMessages(ctx, dcs); err != nil {
|
err = gc.RestoreMessages(ctx, dcs)
|
||||||
opStats.writeErr = errors.Wrap(err, "restoring service data")
|
if err != nil {
|
||||||
return opStats.writeErr
|
err = errors.Wrap(err, "restoring service data")
|
||||||
|
opStats.writeErr = err
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
opStats.started = true
|
||||||
opStats.gc = gc.AwaitStatus()
|
opStats.gc = gc.AwaitStatus()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -136,23 +151,24 @@ func (op *RestoreOperation) Run(ctx context.Context) error {
|
|||||||
func (op *RestoreOperation) persistResults(
|
func (op *RestoreOperation) persistResults(
|
||||||
started time.Time,
|
started time.Time,
|
||||||
opStats *restoreStats,
|
opStats *restoreStats,
|
||||||
) {
|
) error {
|
||||||
op.Status = Completed
|
|
||||||
if (opStats.readErr != nil || opStats.writeErr != nil) &&
|
|
||||||
(opStats.gc == nil || opStats.gc.Successful == 0) {
|
|
||||||
op.Status = Failed
|
|
||||||
}
|
|
||||||
op.Results.ReadErrors = opStats.readErr
|
|
||||||
op.Results.WriteErrors = opStats.writeErr
|
|
||||||
|
|
||||||
op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count
|
|
||||||
|
|
||||||
if opStats.gc != nil {
|
|
||||||
op.Results.ItemsWritten = opStats.gc.Successful
|
|
||||||
}
|
|
||||||
|
|
||||||
op.Results.StartedAt = started
|
op.Results.StartedAt = started
|
||||||
op.Results.CompletedAt = time.Now()
|
op.Results.CompletedAt = time.Now()
|
||||||
|
|
||||||
// TODO: persist operation to modelstore
|
op.Status = Completed
|
||||||
|
if !opStats.started {
|
||||||
|
op.Status = Failed
|
||||||
|
op.Status = Failed
|
||||||
|
return multierror.Append(
|
||||||
|
errors.New("errors prevented the operation from processing"),
|
||||||
|
opStats.readErr,
|
||||||
|
opStats.writeErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
op.Results.ReadErrors = opStats.readErr
|
||||||
|
op.Results.WriteErrors = opStats.writeErr
|
||||||
|
op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count
|
||||||
|
op.Results.ItemsWritten = opStats.gc.Successful
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,6 +46,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
acct = account.Account{}
|
acct = account.Account{}
|
||||||
now = time.Now()
|
now = time.Now()
|
||||||
stats = restoreStats{
|
stats = restoreStats{
|
||||||
|
started: true,
|
||||||
readErr: multierror.Append(nil, assert.AnError),
|
readErr: multierror.Append(nil, assert.AnError),
|
||||||
writeErr: assert.AnError,
|
writeErr: assert.AnError,
|
||||||
cs: []data.Collection{&exchange.Collection{}},
|
cs: []data.Collection{&exchange.Collection{}},
|
||||||
@ -58,9 +59,9 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
op, err := NewRestoreOperation(ctx, control.Options{}, kw, sw, acct, "foo", selectors.Selector{})
|
op, err := NewRestoreOperation(ctx, control.Options{}, kw, sw, acct, "foo", selectors.Selector{})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
op.persistResults(now, &stats)
|
require.NoError(t, op.persistResults(now, &stats))
|
||||||
|
|
||||||
assert.Equal(t, op.Status, Failed, "status")
|
assert.Equal(t, op.Status.String(), Completed.String(), "status")
|
||||||
assert.Equal(t, op.Results.ItemsRead, len(stats.cs), "items read")
|
assert.Equal(t, op.Results.ItemsRead, len(stats.cs), "items read")
|
||||||
assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
|
assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
|
||||||
assert.Equal(t, op.Results.ItemsWritten, stats.gc.Successful, "items written")
|
assert.Equal(t, op.Results.ItemsWritten, stats.gc.Successful, "items written")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user