use gc status to populate op results (#287)

Exports access to the graphconnector status prop
so that restore and backup ops can track those metrics.
This commit is contained in:
Keepers 2022-07-07 13:10:45 -06:00 committed by GitHub
parent 4e7c9bd1e8
commit 503fca14f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 128 additions and 88 deletions

View File

@ -1,8 +1,6 @@
package backup package backup
import ( import (
"fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@ -79,12 +77,13 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error {
return errors.Wrap(err, "Failed to initialize Exchange backup") return errors.Wrap(err, "Failed to initialize Exchange backup")
} }
result, err := bo.Run(ctx) err = bo.Run(ctx)
if err != nil { if err != nil {
return errors.Wrap(err, "Failed to run Exchange backup") return errors.Wrap(err, "Failed to run Exchange backup")
} }
fmt.Printf("Backed up restore point %s in %s for Exchange user %s.\n", result.SnapshotID, s.Provider, user) // todo: revive when restorePoints are hooked up to backupOperation results
// fmt.Printf("Backed up restore point %s in %s for Exchange user %s.\n", result.SnapshotID, s.Provider, user)
return nil return nil
} }

View File

@ -286,7 +286,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
status, err := support.CreateStatus(support.Backup, totalItems, success, len(folderList), errs) status, err := support.CreateStatus(support.Backup, totalItems, success, len(folderList), errs)
if err == nil { if err == nil {
gc.SetStatus(*status) gc.SetStatus(*status)
logger.Ctx(ctx).Debugw(gc.Status()) logger.Ctx(ctx).Debugw(gc.PrintableStatus())
} }
return collections, errs return collections, errs
} }
@ -371,7 +371,13 @@ func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) {
gc.status = &cos gc.status = &cos
} }
func (gc *GraphConnector) Status() string { // Status returns the current status of the graphConnector operaion.
func (gc *GraphConnector) Status() *support.ConnectorOperationStatus {
return gc.status
}
// PrintableStatus returns a string formatted version of the GC status.
func (gc *GraphConnector) PrintableStatus() string {
if gc.status == nil { if gc.status == nil {
return "" return ""
} }

View File

@ -170,12 +170,13 @@ func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() {
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() { func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() {
gc := GraphConnector{} gc := GraphConnector{}
suite.Equal(len(gc.Status()), 0) suite.Equal(len(gc.PrintableStatus()), 0)
status, err := support.CreateStatus(support.Restore, 12, 9, 8, status, err := support.CreateStatus(support.Restore, 12, 9, 8,
support.WrapAndAppend("tres", errors.New("three"), support.WrapAndAppend("arc376", errors.New("one"), errors.New("two")))) support.WrapAndAppend("tres", errors.New("three"), support.WrapAndAppend("arc376", errors.New("one"), errors.New("two"))))
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
gc.SetStatus(*status) gc.SetStatus(*status)
suite.Greater(len(gc.Status()), 0) suite.Greater(len(gc.PrintableStatus()), 0)
suite.Greater(gc.Status().ObjectCount, 0)
} }
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_ErrorChecking() { func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_ErrorChecking() {
tests := []struct { tests := []struct {

View File

@ -7,7 +7,7 @@ import (
type ConnectorOperationStatus struct { type ConnectorOperationStatus struct {
lastOperation Operation lastOperation Operation
objectCount int ObjectCount int
folderCount int folderCount int
successful int successful int
errorCount int errorCount int
@ -33,14 +33,14 @@ func CreateStatus(op Operation, objects, success, folders int, err error) (*Conn
} }
status := ConnectorOperationStatus{ status := ConnectorOperationStatus{
lastOperation: op, lastOperation: op,
objectCount: objects, ObjectCount: objects,
folderCount: folders, folderCount: folders,
successful: success, successful: success,
errorCount: GetNumberOfErrors(err), errorCount: GetNumberOfErrors(err),
incomplete: hasErrors, incomplete: hasErrors,
incompleteReason: reason, incompleteReason: reason,
} }
if status.objectCount != status.errorCount+status.successful { if status.ObjectCount != status.errorCount+status.successful {
return nil, errors.New("incorrect total on initialization") return nil, errors.New("incorrect total on initialization")
} }
return &status, nil return &status, nil
@ -48,7 +48,7 @@ func CreateStatus(op Operation, objects, success, folders int, err error) (*Conn
func (cos *ConnectorOperationStatus) String() string { func (cos *ConnectorOperationStatus) String() string {
message := fmt.Sprintf("Action: %s performed on %d of %d objects within %d directories.", cos.lastOperation.String(), message := fmt.Sprintf("Action: %s performed on %d of %d objects within %d directories.", cos.lastOperation.String(),
cos.successful, cos.objectCount, cos.folderCount) cos.successful, cos.ObjectCount, cos.folderCount)
if cos.incomplete { if cos.incomplete {
message += " " + cos.incompleteReason message += " " + cos.incompleteReason
} }

View File

@ -4,10 +4,10 @@ import (
"context" "context"
"time" "time"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
) )
@ -55,55 +55,68 @@ func (op BackupOperation) validate() error {
return op.operation.validate() return op.operation.validate()
} }
// aggregates stats from the backup.Run().
// primarily used so that the defer can take in a
// pointer wrapping the values, while those values
// get populated asynchronously.
type backupStats struct {
k *kopia.BackupStats
gc *support.ConnectorOperationStatus
readErr, writeErr error
}
// Run begins a synchronous backup operation. // Run begins a synchronous backup operation.
func (op *BackupOperation) Run(ctx context.Context) (*kopia.BackupStats, error) { func (op *BackupOperation) Run(ctx context.Context) error {
// TODO: persist initial state of backupOperation in modelstore // TODO: persist initial state of backupOperation in modelstore
var (
cs []connector.DataCollection
stats = &kopia.BackupStats{}
readErr, writeErr error
)
// persist operation results to the model store on exit // persist operation results to the model store on exit
defer op.persistResults(time.Now(), cs, stats, readErr, writeErr) stats := backupStats{}
defer op.persistResults(time.Now(), &stats)
// retrieve data from the producer
gc, err := connector.NewGraphConnector(op.account) gc, err := connector.NewGraphConnector(op.account)
if err != nil { if err != nil {
readErr = multierror.Append(readErr, err) stats.readErr = err
return nil, errors.Wrap(err, "connecting to graph api") return errors.Wrap(err, "connecting to graph api")
} }
var cs []connector.DataCollection
cs, err = gc.ExchangeDataCollection(ctx, op.Targets[0]) cs, err = gc.ExchangeDataCollection(ctx, op.Targets[0])
if err != nil { if err != nil {
readErr = multierror.Append(readErr, err) stats.readErr = err
return nil, errors.Wrap(err, "retrieving service data") return errors.Wrap(err, "retrieving service data")
}
stats.gc = gc.Status()
// hand the results to the consumer
stats.k, err = op.kopia.BackupCollections(ctx, cs)
if err != nil {
stats.writeErr = err
return errors.Wrap(err, "backing up service data")
} }
stats, writeErr = op.kopia.BackupCollections(ctx, cs) return nil
if writeErr != nil {
return nil, errors.Wrap(err, "backing up service data")
}
return stats, nil
} }
// writes the backupOperation outcome to the modelStore. // writes the backupOperation outcome to the modelStore.
func (op *BackupOperation) persistResults( func (op *BackupOperation) persistResults(
started time.Time, started time.Time,
cs []connector.DataCollection, stats *backupStats,
stats *kopia.BackupStats,
readErr, writeErr error,
) { ) {
op.Status = Successful op.Status = Successful
if readErr != nil || writeErr != nil { if stats.readErr != nil || stats.writeErr != nil {
op.Status = Failed op.Status = Failed
} }
op.Results.ItemsRead = len(cs) // TODO: file count, not collection count op.Results.ReadErrors = stats.readErr
op.Results.ReadErrors = readErr op.Results.WriteErrors = stats.writeErr
op.Results.ItemsWritten = stats.TotalFileCount
op.Results.WriteErrors = writeErr if stats.gc != nil {
op.Results.ItemsRead = stats.gc.ObjectCount
}
if stats.k != nil {
op.Results.ItemsWritten = stats.k.TotalFileCount
}
op.Results.StartedAt = started op.Results.StartedAt = started
op.Results.CompletedAt = time.Now() op.Results.CompletedAt = time.Now()

View File

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/internal/kopia"
ctesting "github.com/alcionai/corso/internal/testing" ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
@ -35,27 +35,31 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
ctx := context.Background() ctx := context.Background()
var ( var (
kw = &kopia.Wrapper{} kw = &kopia.Wrapper{}
acct = account.Account{} acct = account.Account{}
now = time.Now() now = time.Now()
cs = []connector.DataCollection{&connector.ExchangeDataCollection{}} stats = backupStats{
readErrs = multierror.Append(nil, assert.AnError) readErr: multierror.Append(nil, assert.AnError),
writeErrs = assert.AnError writeErr: assert.AnError,
stats = &kopia.BackupStats{ k: &kopia.BackupStats{
TotalFileCount: 1, TotalFileCount: 1,
},
gc: &support.ConnectorOperationStatus{
ObjectCount: 1,
},
} }
) )
op, err := NewBackupOperation(ctx, Options{}, kw, acct, nil) op, err := NewBackupOperation(ctx, Options{}, kw, acct, nil)
require.NoError(t, err) require.NoError(t, err)
op.persistResults(now, cs, stats, readErrs, writeErrs) op.persistResults(now, &stats)
assert.Equal(t, op.Status, Failed) assert.Equal(t, op.Status, Failed)
assert.Equal(t, op.Results.ItemsRead, len(cs)) assert.Equal(t, op.Results.ItemsRead, stats.gc.ObjectCount)
assert.Equal(t, op.Results.ReadErrors, readErrs) assert.Equal(t, op.Results.ReadErrors, stats.readErr)
assert.Equal(t, op.Results.ItemsWritten, stats.TotalFileCount) assert.Equal(t, op.Results.ItemsWritten, stats.k.TotalFileCount)
assert.Equal(t, op.Results.WriteErrors, writeErrs) assert.Equal(t, op.Results.WriteErrors, stats.writeErr)
assert.Equal(t, op.Results.StartedAt, now) assert.Equal(t, op.Results.StartedAt, now)
assert.Less(t, now, op.Results.CompletedAt) assert.Less(t, now, op.Results.CompletedAt)
} }
@ -147,10 +151,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
[]string{m365User}) []string{m365User})
require.NoError(t, err) require.NoError(t, err)
stats, err := bo.Run(ctx) require.NoError(t, bo.Run(ctx))
require.NoError(t, err) require.NotEmpty(t, bo.Results)
require.NotNil(t, stats)
assert.Equal(t, bo.Status, Successful) assert.Equal(t, bo.Status, Successful)
assert.Greater(t, stats.TotalFileCount, 0) assert.Greater(t, bo.Results.ItemsRead, 0)
assert.Zero(t, stats.ErrorCount) assert.Greater(t, bo.Results.ItemsWritten, 0)
assert.Zero(t, bo.Results.ReadErrors)
assert.Zero(t, bo.Results.WriteErrors)
} }

View File

@ -4,10 +4,10 @@ import (
"context" "context"
"time" "time"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
) )
@ -57,32 +57,40 @@ func (op RestoreOperation) validate() error {
return op.operation.validate() return op.operation.validate()
} }
// aggregates stats from the restore.Run().
// primarily used so that the defer can take in a
// pointer wrapping the values, while those values
// get populated asynchronously.
type restoreStats struct {
cs []connector.DataCollection
gc *support.ConnectorOperationStatus
readErr, writeErr error
}
// Run begins a synchronous restore operation. // Run begins a synchronous restore operation.
// todo (keepers): return stats block in first param. // todo (keepers): return stats block in first param.
func (op *RestoreOperation) Run(ctx context.Context) error { func (op *RestoreOperation) Run(ctx context.Context) error {
// TODO: persist initial state of restoreOperation in modelstore // TODO: persist initial state of restoreOperation in modelstore
var (
cs []connector.DataCollection
readErr, writeErr error
)
// persist operation results to the model store on exit // persist operation results to the model store on exit
defer op.persistResults(time.Now(), cs, readErr, writeErr) stats := restoreStats{}
defer op.persistResults(time.Now(), &stats)
dc, readErr := op.kopia.RestoreSingleItem(ctx, op.RestorePointID, op.Targets) dc, err := op.kopia.RestoreSingleItem(ctx, op.RestorePointID, op.Targets)
if readErr != nil { if err != nil {
return errors.Wrap(readErr, "retrieving service data") stats.readErr = err
return errors.Wrap(err, "retrieving service data")
} }
stats.cs = []connector.DataCollection{dc}
gc, err := connector.NewGraphConnector(op.account) gc, err := connector.NewGraphConnector(op.account)
if err != nil { if err != nil {
writeErr = multierror.Append(writeErr, err) stats.writeErr = err
return errors.Wrap(err, "connecting to graph api") return errors.Wrap(err, "connecting to graph api")
} }
if err := gc.RestoreMessages(ctx, dc); err != nil { if err := gc.RestoreMessages(ctx, dc); err != nil {
writeErr = multierror.Append(writeErr, err) stats.writeErr = err
return errors.Wrap(err, "restoring service data") return errors.Wrap(err, "restoring service data")
} }
@ -93,18 +101,20 @@ func (op *RestoreOperation) Run(ctx context.Context) error {
// writes the restoreOperation outcome to the modelStore. // writes the restoreOperation outcome to the modelStore.
func (op *RestoreOperation) persistResults( func (op *RestoreOperation) persistResults(
started time.Time, started time.Time,
cs []connector.DataCollection, stats *restoreStats,
readErr, writeErr error,
) { ) {
op.Status = Successful op.Status = Successful
if readErr != nil || writeErr != nil { if stats.readErr != nil || stats.writeErr != nil {
op.Status = Failed op.Status = Failed
} }
op.Results.ReadErrors = stats.readErr
op.Results.WriteErrors = stats.writeErr
op.Results.ItemsRead = len(cs) // TODO: file count, not collection count op.Results.ItemsRead = len(stats.cs) // TODO: file count, not collection count
op.Results.ReadErrors = readErr
op.Results.ItemsWritten = -1 // TODO: get write count from GC if stats.gc != nil {
op.Results.WriteErrors = writeErr op.Results.ItemsWritten = stats.gc.ObjectCount
}
op.Results.StartedAt = started op.Results.StartedAt = started
op.Results.CompletedAt = time.Now() op.Results.CompletedAt = time.Now()

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/internal/kopia"
ctesting "github.com/alcionai/corso/internal/testing" ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
@ -35,24 +36,29 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
ctx := context.Background() ctx := context.Background()
var ( var (
kw = &kopia.Wrapper{} kw = &kopia.Wrapper{}
acct = account.Account{} acct = account.Account{}
now = time.Now() now = time.Now()
cs = []connector.DataCollection{&connector.ExchangeDataCollection{}} stats = restoreStats{
readErrs = multierror.Append(nil, assert.AnError) readErr: multierror.Append(nil, assert.AnError),
writeErrs = assert.AnError writeErr: assert.AnError,
cs: []connector.DataCollection{&connector.ExchangeDataCollection{}},
gc: &support.ConnectorOperationStatus{
ObjectCount: 1,
},
}
) )
op, err := NewRestoreOperation(ctx, Options{}, kw, acct, "foo", nil) op, err := NewRestoreOperation(ctx, Options{}, kw, acct, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
op.persistResults(now, cs, readErrs, writeErrs) op.persistResults(now, &stats)
assert.Equal(t, op.Status, Failed) assert.Equal(t, op.Status, Failed)
assert.Equal(t, op.Results.ItemsRead, len(cs)) assert.Equal(t, op.Results.ItemsRead, len(stats.cs))
assert.Equal(t, op.Results.ReadErrors, readErrs) assert.Equal(t, op.Results.ReadErrors, stats.readErr)
assert.Equal(t, op.Results.ItemsWritten, -1) assert.Equal(t, op.Results.ItemsWritten, stats.gc.ObjectCount)
assert.Equal(t, op.Results.WriteErrors, writeErrs) assert.Equal(t, op.Results.WriteErrors, stats.writeErr)
assert.Equal(t, op.Results.StartedAt, now) assert.Equal(t, op.Results.StartedAt, now)
assert.Less(t, now, op.Results.CompletedAt) assert.Less(t, now, op.Results.CompletedAt)
} }