add restore, backup result persistence skeleton (#275)
e2e wiring of persistence is not yet complete. Will need modelstore integration, and additional information about file and error counts from kw and gc.
This commit is contained in:
parent
26fc4c9075
commit
8725cacc22
@ -2,7 +2,9 @@ package operations
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/internal/connector"
|
"github.com/alcionai/corso/internal/connector"
|
||||||
@ -55,22 +57,56 @@ func (op BackupOperation) validate() error {
|
|||||||
|
|
||||||
// Run begins a synchronous backup operation.
|
// Run begins a synchronous backup operation.
|
||||||
func (op *BackupOperation) Run(ctx context.Context) (*kopia.BackupStats, error) {
|
func (op *BackupOperation) Run(ctx context.Context) (*kopia.BackupStats, error) {
|
||||||
|
// TODO: persist initial state of backupOperation in modelstore
|
||||||
|
|
||||||
|
var (
|
||||||
|
cs []connector.DataCollection
|
||||||
|
stats = &kopia.BackupStats{}
|
||||||
|
readErr, writeErr error
|
||||||
|
)
|
||||||
|
|
||||||
|
// persist operation results to the model store on exit
|
||||||
|
defer op.persistResults(time.Now(), cs, stats, readErr, writeErr)
|
||||||
|
|
||||||
gc, err := connector.NewGraphConnector(op.account)
|
gc, err := connector.NewGraphConnector(op.account)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
readErr = multierror.Append(readErr, err)
|
||||||
return nil, errors.Wrap(err, "connecting to graph api")
|
return nil, errors.Wrap(err, "connecting to graph api")
|
||||||
}
|
}
|
||||||
|
|
||||||
cs, err := gc.ExchangeDataCollection(ctx, op.Targets[0])
|
cs, err = gc.ExchangeDataCollection(ctx, op.Targets[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
readErr = multierror.Append(readErr, err)
|
||||||
return nil, errors.Wrap(err, "retrieving service data")
|
return nil, errors.Wrap(err, "retrieving service data")
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: utilize stats
|
stats, writeErr = op.kopia.BackupCollections(ctx, cs)
|
||||||
stats, err := op.kopia.BackupCollections(ctx, cs)
|
if writeErr != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "backing up service data")
|
return nil, errors.Wrap(err, "backing up service data")
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Status = Successful
|
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writes the backupOperation outcome to the modelStore.
|
||||||
|
func (op *BackupOperation) persistResults(
|
||||||
|
started time.Time,
|
||||||
|
cs []connector.DataCollection,
|
||||||
|
stats *kopia.BackupStats,
|
||||||
|
readErr, writeErr error,
|
||||||
|
) {
|
||||||
|
op.Status = Successful
|
||||||
|
if readErr != nil || writeErr != nil {
|
||||||
|
op.Status = Failed
|
||||||
|
}
|
||||||
|
|
||||||
|
op.Results.ItemsRead = len(cs) // TODO: file count, not collection count
|
||||||
|
op.Results.ReadErrors = readErr
|
||||||
|
op.Results.ItemsWritten = stats.TotalFileCount
|
||||||
|
op.Results.WriteErrors = writeErr
|
||||||
|
|
||||||
|
op.Results.StartedAt = started
|
||||||
|
op.Results.CompletedAt = time.Now()
|
||||||
|
|
||||||
|
// TODO: persist operation to modelstore
|
||||||
|
}
|
||||||
|
|||||||
@ -1,20 +1,69 @@
|
|||||||
package operations_test
|
package operations
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/internal/connector"
|
||||||
"github.com/alcionai/corso/internal/kopia"
|
"github.com/alcionai/corso/internal/kopia"
|
||||||
"github.com/alcionai/corso/internal/operations"
|
|
||||||
ctesting "github.com/alcionai/corso/internal/testing"
|
ctesting "github.com/alcionai/corso/internal/testing"
|
||||||
"github.com/alcionai/corso/pkg/account"
|
"github.com/alcionai/corso/pkg/account"
|
||||||
"github.com/alcionai/corso/pkg/repository"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// unit
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type BackupOpSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackupOpSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(BackupOpSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: after modelStore integration is added, mock the store and/or
|
||||||
|
// move this to an integration test.
|
||||||
|
func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
||||||
|
t := suite.T()
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
var (
|
||||||
|
kw = &kopia.KopiaWrapper{}
|
||||||
|
acct = account.Account{}
|
||||||
|
now = time.Now()
|
||||||
|
cs = []connector.DataCollection{&connector.ExchangeDataCollection{}}
|
||||||
|
readErrs = multierror.Append(nil, assert.AnError)
|
||||||
|
writeErrs = assert.AnError
|
||||||
|
stats = &kopia.BackupStats{
|
||||||
|
TotalFileCount: 1,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
op, err := NewBackupOperation(ctx, Options{}, kw, acct, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
op.persistResults(now, cs, stats, readErrs, writeErrs)
|
||||||
|
|
||||||
|
assert.Equal(t, op.Status, Failed)
|
||||||
|
assert.Equal(t, op.Results.ItemsRead, len(cs))
|
||||||
|
assert.Equal(t, op.Results.ReadErrors, readErrs)
|
||||||
|
assert.Equal(t, op.Results.ItemsWritten, stats.TotalFileCount)
|
||||||
|
assert.Equal(t, op.Results.WriteErrors, writeErrs)
|
||||||
|
assert.Equal(t, op.Results.StartedAt, now)
|
||||||
|
assert.Less(t, now, op.Results.CompletedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// integration
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
type BackupOpIntegrationSuite struct {
|
type BackupOpIntegrationSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
}
|
}
|
||||||
@ -43,20 +92,20 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
|||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
opts operations.Options
|
opts Options
|
||||||
kw *kopia.KopiaWrapper
|
kw *kopia.KopiaWrapper
|
||||||
acct account.Account
|
acct account.Account
|
||||||
targets []string
|
targets []string
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
}{
|
}{
|
||||||
{"good", operations.Options{}, kw, acct, nil, assert.NoError},
|
{"good", Options{}, kw, acct, nil, assert.NoError},
|
||||||
{"missing kopia", operations.Options{}, nil, acct, nil, assert.Error},
|
{"missing kopia", Options{}, nil, acct, nil, assert.Error},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
_, err := operations.NewBackupOperation(
|
_, err := NewBackupOperation(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
operations.Options{},
|
Options{},
|
||||||
test.kw,
|
test.kw,
|
||||||
test.acct,
|
test.acct,
|
||||||
nil)
|
nil)
|
||||||
@ -80,16 +129,21 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
|||||||
st, err := ctesting.NewPrefixedS3Storage(t)
|
st, err := ctesting.NewPrefixedS3Storage(t)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
r, err := repository.Initialize(ctx, acct, st)
|
k := kopia.New(st)
|
||||||
require.NoError(t, err)
|
require.NoError(t, k.Initialize(ctx))
|
||||||
|
|
||||||
bo, err := r.NewBackup(ctx, []string{m365User})
|
bo, err := NewBackupOperation(
|
||||||
|
ctx,
|
||||||
|
Options{},
|
||||||
|
k,
|
||||||
|
acct,
|
||||||
|
[]string{m365User})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
stats, err := bo.Run(ctx)
|
stats, err := bo.Run(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, stats)
|
require.NotNil(t, stats)
|
||||||
assert.Equal(t, bo.Status, operations.Successful)
|
assert.Equal(t, bo.Status, Successful)
|
||||||
assert.Greater(t, stats.TotalFileCount, 0)
|
assert.Greater(t, stats.TotalFileCount, 0)
|
||||||
assert.Zero(t, stats.ErrorCount)
|
assert.Zero(t, stats.ErrorCount)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,7 +4,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
multierror "github.com/hashicorp/go-multierror"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/internal/kopia"
|
"github.com/alcionai/corso/internal/kopia"
|
||||||
@ -68,10 +67,10 @@ func (op operation) validate() error {
|
|||||||
// Summary tracks the total files touched and errors produced
|
// Summary tracks the total files touched and errors produced
|
||||||
// during an operation.
|
// during an operation.
|
||||||
type summary struct {
|
type summary struct {
|
||||||
ItemsRead int `json:"itemsRead,omitempty"`
|
ItemsRead int `json:"itemsRead,omitempty"`
|
||||||
ItemsWritten int `json:"itemsWritten,omitempty"`
|
ItemsWritten int `json:"itemsWritten,omitempty"`
|
||||||
ReadErrors multierror.Error `json:"readErrors,omitempty"`
|
ReadErrors error `json:"readErrors,omitempty"`
|
||||||
WriteErrors multierror.Error `json:"writeErrors,omitempty"`
|
WriteErrors error `json:"writeErrors,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Metrics tracks performance details such as timing, throughput, etc.
|
// Metrics tracks performance details such as timing, throughput, etc.
|
||||||
|
|||||||
@ -2,7 +2,9 @@ package operations
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/internal/connector"
|
"github.com/alcionai/corso/internal/connector"
|
||||||
@ -58,20 +60,54 @@ func (op RestoreOperation) validate() error {
|
|||||||
// Run begins a synchronous restore operation.
|
// Run begins a synchronous restore operation.
|
||||||
// todo (keepers): return stats block in first param.
|
// todo (keepers): return stats block in first param.
|
||||||
func (op *RestoreOperation) Run(ctx context.Context) error {
|
func (op *RestoreOperation) Run(ctx context.Context) error {
|
||||||
dc, err := op.kopia.RestoreSingleItem(ctx, op.RestorePointID, op.Targets)
|
// TODO: persist initial state of restoreOperation in modelstore
|
||||||
if err != nil {
|
|
||||||
return errors.Wrap(err, "retrieving service data")
|
var (
|
||||||
|
cs []connector.DataCollection
|
||||||
|
readErr, writeErr error
|
||||||
|
)
|
||||||
|
|
||||||
|
// persist operation results to the model store on exit
|
||||||
|
defer op.persistResults(time.Now(), cs, readErr, writeErr)
|
||||||
|
|
||||||
|
dc, readErr := op.kopia.RestoreSingleItem(ctx, op.RestorePointID, op.Targets)
|
||||||
|
if readErr != nil {
|
||||||
|
return errors.Wrap(readErr, "retrieving service data")
|
||||||
}
|
}
|
||||||
|
|
||||||
gc, err := connector.NewGraphConnector(op.account)
|
gc, err := connector.NewGraphConnector(op.account)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
writeErr = multierror.Append(writeErr, err)
|
||||||
return errors.Wrap(err, "connecting to graph api")
|
return errors.Wrap(err, "connecting to graph api")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := gc.RestoreMessages(ctx, dc); err != nil {
|
if err := gc.RestoreMessages(ctx, dc); err != nil {
|
||||||
|
writeErr = multierror.Append(writeErr, err)
|
||||||
return errors.Wrap(err, "restoring service data")
|
return errors.Wrap(err, "restoring service data")
|
||||||
}
|
}
|
||||||
|
|
||||||
op.Status = Successful
|
op.Status = Successful
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writes the restoreOperation outcome to the modelStore.
|
||||||
|
func (op *RestoreOperation) persistResults(
|
||||||
|
started time.Time,
|
||||||
|
cs []connector.DataCollection,
|
||||||
|
readErr, writeErr error,
|
||||||
|
) {
|
||||||
|
op.Status = Successful
|
||||||
|
if readErr != nil || writeErr != nil {
|
||||||
|
op.Status = Failed
|
||||||
|
}
|
||||||
|
|
||||||
|
op.Results.ItemsRead = len(cs) // TODO: file count, not collection count
|
||||||
|
op.Results.ReadErrors = readErr
|
||||||
|
op.Results.ItemsWritten = -1 // TODO: get write count from GC
|
||||||
|
op.Results.WriteErrors = writeErr
|
||||||
|
|
||||||
|
op.Results.StartedAt = started
|
||||||
|
op.Results.CompletedAt = time.Now()
|
||||||
|
|
||||||
|
// TODO: persist operation to modelstore
|
||||||
|
}
|
||||||
|
|||||||
@ -1,19 +1,66 @@
|
|||||||
package operations_test
|
package operations
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/internal/connector"
|
||||||
"github.com/alcionai/corso/internal/kopia"
|
"github.com/alcionai/corso/internal/kopia"
|
||||||
"github.com/alcionai/corso/internal/operations"
|
|
||||||
ctesting "github.com/alcionai/corso/internal/testing"
|
ctesting "github.com/alcionai/corso/internal/testing"
|
||||||
"github.com/alcionai/corso/pkg/account"
|
"github.com/alcionai/corso/pkg/account"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// unit
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type RestoreOpSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRestoreOpSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(RestoreOpSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: after modelStore integration is added, mock the store and/or
|
||||||
|
// move this to an integration test.
|
||||||
|
func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
||||||
|
t := suite.T()
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
var (
|
||||||
|
kw = &kopia.KopiaWrapper{}
|
||||||
|
acct = account.Account{}
|
||||||
|
now = time.Now()
|
||||||
|
cs = []connector.DataCollection{&connector.ExchangeDataCollection{}}
|
||||||
|
readErrs = multierror.Append(nil, assert.AnError)
|
||||||
|
writeErrs = assert.AnError
|
||||||
|
)
|
||||||
|
|
||||||
|
op, err := NewRestoreOperation(ctx, Options{}, kw, acct, "foo", nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
op.persistResults(now, cs, readErrs, writeErrs)
|
||||||
|
|
||||||
|
assert.Equal(t, op.Status, Failed)
|
||||||
|
assert.Equal(t, op.Results.ItemsRead, len(cs))
|
||||||
|
assert.Equal(t, op.Results.ReadErrors, readErrs)
|
||||||
|
assert.Equal(t, op.Results.ItemsWritten, -1)
|
||||||
|
assert.Equal(t, op.Results.WriteErrors, writeErrs)
|
||||||
|
assert.Equal(t, op.Results.StartedAt, now)
|
||||||
|
assert.Less(t, now, op.Results.CompletedAt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// integration
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
type RestoreOpIntegrationSuite struct {
|
type RestoreOpIntegrationSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
}
|
}
|
||||||
@ -37,20 +84,20 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
|||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
opts operations.Options
|
opts Options
|
||||||
kw *kopia.KopiaWrapper
|
kw *kopia.KopiaWrapper
|
||||||
acct account.Account
|
acct account.Account
|
||||||
targets []string
|
targets []string
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
}{
|
}{
|
||||||
{"good", operations.Options{}, kw, acct, nil, assert.NoError},
|
{"good", Options{}, kw, acct, nil, assert.NoError},
|
||||||
{"missing kopia", operations.Options{}, nil, acct, nil, assert.Error},
|
{"missing kopia", Options{}, nil, acct, nil, assert.Error},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
_, err := operations.NewRestoreOperation(
|
_, err := NewRestoreOperation(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
operations.Options{},
|
Options{},
|
||||||
test.kw,
|
test.kw,
|
||||||
test.acct,
|
test.acct,
|
||||||
"restore-point-id",
|
"restore-point-id",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user