GC Status Progress Detail Extension (#1001)

## Description
Feature to show the number of bytes and additional details for backup / restore progress from GraphConnector
Full description [here](https://www.notion.so/alcion/Corso-Testing-Notes-b9867ac719d8459d8b46dbc7b07b33e0#da3869278b434b0398e7d245554b609b)
<!-- Insert PR description-->

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature

## Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* closes #559

## Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual

Feature Output changed to:

```go
graph_connector_test.go:137: Action: Backup performed on 5 of 5 objects (227.63KB) within 1 directories. Downloaded from Inbox
        
graph_connector_test.go:137: Action: Backup performed on 8 of 8 objects (231.28KB) within 2 directories. Downloaded from Inbox, Contacts

graph_connector_test.go:137: Action: Backup performed on 23 of 23 objects (309.36KB) within 3 directories. Downloaded from Inbox, Contacts, Calendar
```
This commit is contained in:
Danny 2022-10-03 19:25:26 -04:00 committed by GitHub
parent ae2f4e6712
commit c6a9d2feb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 215 additions and 115 deletions

View File

@ -60,6 +60,7 @@ require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.11 // indirect

View File

@ -189,6 +189,8 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=

View File

@ -115,10 +115,11 @@ func (col *Collection) populateByOptionIdentifier(
var (
errs error
success int
totalBytes int64
)
defer func() {
col.finishPopulation(ctx, success, errs)
col.finishPopulation(ctx, success, totalBytes, errs)
}()
user := col.user
@ -144,7 +145,7 @@ func (col *Collection) populateByOptionIdentifier(
continue
}
err = serializeFunc(ctx, col.service.Client(), objectWriter, col.data, response, user)
byteCount, err := serializeFunc(ctx, col.service.Client(), objectWriter, col.data, response, user)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
@ -156,15 +157,26 @@ func (col *Collection) populateByOptionIdentifier(
}
success++
totalBytes += int64(byteCount)
}
}
// finishPopulation is a utility function used to close a Collection's data channel
// and to send the status update through the channel.
func (col *Collection) finishPopulation(ctx context.Context, success int, errs error) {
func (col *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) {
close(col.data)
attempted := len(col.jobs)
status := support.CreateStatus(ctx, support.Backup, attempted, success, 1, errs)
status := support.CreateStatus(ctx,
support.Backup,
1,
support.CollectionMetrics{
Objects: attempted,
Successes: success,
TotalBytes: totalBytes,
},
errs,
col.fullPath.Folder())
logger.Ctx(ctx).Debug(status.String())
col.statusUpdater(status)
}
@ -178,7 +190,7 @@ type GraphSerializeFunc func(
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) error
) (int, error)
// eventToDataCollection is a GraphSerializeFunc used to serialize models.Eventable objects into
// data.Stream objects. Returns an error if the process finishes unsuccessfully.
@ -189,14 +201,14 @@ func eventToDataCollection(
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) error {
) (int, error) {
var err error
defer objectWriter.Close()
event, ok := parsable.(models.Eventable)
if !ok {
return fmt.Errorf("expected Eventable, got %T", parsable)
return 0, fmt.Errorf("expected Eventable, got %T", parsable)
}
if *event.GetHasAttachments() {
@ -219,7 +231,7 @@ func eventToDataCollection(
if retriesErr != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
return support.WrapAndAppend(
return 0, support.WrapAndAppend(
*event.GetId(),
errors.Wrap(retriesErr, "attachment failed"),
nil)
@ -228,19 +240,19 @@ func eventToDataCollection(
err = objectWriter.WriteObjectValue("", event)
if err != nil {
return support.SetNonRecoverableError(errors.Wrap(err, *event.GetId()))
return 0, support.SetNonRecoverableError(errors.Wrap(err, *event.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
if err != nil {
return support.WrapAndAppend(*event.GetId(), errors.Wrap(err, "serializing content"), nil)
return 0, support.WrapAndAppend(*event.GetId(), errors.Wrap(err, "serializing content"), nil)
}
if byteArray != nil {
if len(byteArray) > 0 {
dataChannel <- &Stream{id: *event.GetId(), message: byteArray, info: EventInfo(event)}
}
return nil
return len(byteArray), nil
}
// contactToDataCollection is a GraphSerializeFunc for models.Contactable
@ -251,29 +263,29 @@ func contactToDataCollection(
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) error {
) (int, error) {
defer objectWriter.Close()
contact, ok := parsable.(models.Contactable)
if !ok {
return fmt.Errorf("expected Contactable, got %T", parsable)
return 0, fmt.Errorf("expected Contactable, got %T", parsable)
}
err := objectWriter.WriteObjectValue("", contact)
if err != nil {
return support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
return 0, support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
if err != nil {
return support.WrapAndAppend(*contact.GetId(), err, nil)
return 0, support.WrapAndAppend(*contact.GetId(), err, nil)
}
if byteArray != nil {
if len(byteArray) > 0 {
dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: ContactInfo(contact)}
}
return nil
return len(byteArray), nil
}
// messageToDataCollection is the GraphSerializeFunc for models.Messageable
@ -284,21 +296,21 @@ func messageToDataCollection(
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) error {
) (int, error) {
var err error
defer objectWriter.Close()
aMessage, ok := parsable.(models.Messageable)
if !ok {
return fmt.Errorf("expected Messageable, got %T", parsable)
return 0, fmt.Errorf("expected Messageable, got %T", parsable)
}
adtl := aMessage.GetAdditionalData()
if len(adtl) > 2 {
aMessage, err = support.ConvertFromMessageable(adtl, aMessage)
if err != nil {
return err
return 0, err
}
}
@ -322,24 +334,24 @@ func messageToDataCollection(
if retriesErr != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
return 0, support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
}
}
err = objectWriter.WriteObjectValue("", aMessage)
if err != nil {
return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId()))
return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
if err != nil {
err = support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
return support.SetNonRecoverableError(err)
return 0, support.SetNonRecoverableError(err)
}
dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage)}
return nil
return len(byteArray), nil
}
// Stream represents a single item retrieved from exchange

View File

@ -292,7 +292,7 @@ func RestoreExchangeDataCollections(
var (
pathCounter = map[string]bool{}
rootFolder string
attempts, successes int
metrics support.CollectionMetrics
errs error
// TODO policy to be updated from external source after completion of refactoring
policy = control.Copy
@ -303,9 +303,10 @@ func RestoreExchangeDataCollections(
}
for _, dc := range dcs {
a, s, root, canceled := restoreCollection(ctx, gs, dc, rootFolder, pathCounter, dest, policy, errUpdater)
attempts += a
successes += s
temp, root, canceled := restoreCollection(ctx, gs, dc, rootFolder, pathCounter, dest, policy, errUpdater)
metrics.Combine(temp)
rootFolder = root
if canceled {
@ -313,7 +314,12 @@ func RestoreExchangeDataCollections(
}
}
status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs)
status := support.CreateStatus(ctx,
support.Restore,
len(pathCounter),
metrics,
errs,
dest.ContainerName)
return status, errs
}
@ -328,12 +334,12 @@ func restoreCollection(
dest control.RestoreDestination,
policy control.CollisionPolicy,
errUpdater func(string, error),
) (int, int, string, bool) {
) (support.CollectionMetrics, string, bool) {
defer trace.StartRegion(ctx, "gc:exchange:restoreCollection").End()
trace.Log(ctx, "gc:exchange:restoreCollection", dc.FullPath().String())
var (
attempts, successes int
metrics support.CollectionMetrics
folderID string
err error
items = dc.Items()
@ -347,20 +353,20 @@ func restoreCollection(
folderID, root, err := directoryCheckFunc(ctx, err, directory.String(), rootFolder, pathCounter)
if err != nil { // assuming FailFast
errUpdater(directory.String(), err)
return 0, 0, rootFolder, false
return metrics, rootFolder, false
}
for {
select {
case <-ctx.Done():
errUpdater("context cancelled", ctx.Err())
return attempts, successes, root, true
return metrics, root, true
case itemData, ok := <-items:
if !ok {
return attempts, successes, root, false
return metrics, root, false
}
attempts++
metrics.Objects++
trace.Log(ctx, "gc:exchange:restoreCollection:item", itemData.UUID())
@ -372,7 +378,9 @@ func restoreCollection(
continue
}
err = RestoreExchangeObject(ctx, buf.Bytes(), category, policy, gs, folderID, user)
byteArray := buf.Bytes()
err = RestoreExchangeObject(ctx, byteArray, category, policy, gs, folderID, user)
if err != nil {
// More information to be here
errUpdater(
@ -381,7 +389,9 @@ func restoreCollection(
continue
}
successes++
metrics.TotalBytes += int64(len(byteArray))
metrics.Successes++
}
}
}

View File

@ -92,13 +92,18 @@ func (suite *DisconnectedGraphConnectorSuite) TestBuild() {
func statusTestTask(gc *GraphConnector, objects, success, folder int) {
status := support.CreateStatus(
context.Background(),
support.Restore,
objects, success, folder,
support.Restore, folder,
support.CollectionMetrics{
Objects: objects,
Successes: success,
TotalBytes: 0,
},
support.WrapAndAppend(
"tres",
errors.New("three"),
support.WrapAndAppend("arc376", errors.New("one"), errors.New("two")),
),
"statusTestTask",
)
gc.UpdateStatus(status)
}

View File

@ -108,6 +108,7 @@ func (od *Item) Info() details.ItemInfo {
func (oc *Collection) populateItems(ctx context.Context) {
var (
errs error
byteCount int64
itemsRead = 0
)
@ -115,7 +116,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
// `details.OneDriveInfo`
parentPathString, err := getDriveFolderPath(oc.folderPath)
if err != nil {
oc.reportAsCompleted(ctx, 0, err)
oc.reportAsCompleted(ctx, 0, 0, err)
return
}
@ -133,6 +134,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
}
// Item read successfully, add to collection
itemsRead++
// byteCount iteration
byteCount += itemInfo.Size
itemInfo.ParentPath = parentPathString
@ -143,17 +146,22 @@ func (oc *Collection) populateItems(ctx context.Context) {
}
}
oc.reportAsCompleted(ctx, itemsRead, errs)
oc.reportAsCompleted(ctx, itemsRead, byteCount, errs)
}
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, errs error) {
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byteCount int64, errs error) {
close(oc.data)
status := support.CreateStatus(ctx, support.Backup,
len(oc.driveItemIDs), // items to read
itemsRead, // items read successfully
1, // num folders (always 1)
errs)
support.CollectionMetrics{
Objects: len(oc.driveItemIDs), // items to read,
Successes: itemsRead, // items read successfully,
TotalBytes: byteCount, // Number of bytes read in the operation,
},
errs,
oc.folderPath.Folder(), // Additional details
)
logger.Ctx(ctx).Debug(status.String())
oc.statusUpdater(status)
}

View File

@ -52,7 +52,7 @@ func RestoreCollections(
dcs []data.Collection,
) (*support.ConnectorOperationStatus, error) {
var (
total, restored int
restoreMetrics support.CollectionMetrics
restoreErrors error
)
@ -62,30 +62,39 @@ func RestoreCollections(
// Iterate through the data collections and restore the contents of each
for _, dc := range dcs {
t, r, canceled := restoreCollection(ctx, service, dc, dest.ContainerName, errUpdater)
total += t
restored += r
temp, canceled := restoreCollection(ctx, service, dc, dest.ContainerName, errUpdater)
restoreMetrics.Combine(temp)
if canceled {
break
}
}
return support.CreateStatus(ctx, support.Restore, total, restored, 0, restoreErrors), nil
return support.CreateStatus(
ctx,
support.Restore,
len(dcs),
restoreMetrics,
restoreErrors,
dest.ContainerName),
nil
}
// restoreCollection handles restoration of an individual collection.
// @returns Integer representing totalItems, restoredItems, and the
// amount of bytes restored. The bool represents whether the context was cancelled
func restoreCollection(
ctx context.Context,
service graph.Service,
dc data.Collection,
restoreContainerName string,
errUpdater func(string, error),
) (int, int, bool) {
) (support.CollectionMetrics, bool) {
defer trace.StartRegion(ctx, "gc:oneDrive:restoreCollection").End()
var (
total, restored int
metrics = support.CollectionMetrics{}
copyBuffer = make([]byte, copyBufferSize)
directory = dc.FullPath()
)
@ -93,7 +102,7 @@ func restoreCollection(
drivePath, err := toOneDrivePath(directory)
if err != nil {
errUpdater(directory.String(), err)
return 0, 0, false
return metrics, false
}
// Assemble folder hierarchy we're going to restore into (we recreate the folder hierarchy
@ -110,7 +119,7 @@ func restoreCollection(
restoreFolderID, err := createRestoreFolders(ctx, service, drivePath.driveID, restoreFolderElements)
if err != nil {
errUpdater(directory.String(), errors.Wrapf(err, "failed to create folders %v", restoreFolderElements))
return 0, 0, false
return metrics, false
}
// Restore items from the collection
@ -120,13 +129,15 @@ func restoreCollection(
select {
case <-ctx.Done():
errUpdater("context canceled", ctx.Err())
return total, restored, true
return metrics, true
case itemData, ok := <-items:
if !ok {
return total, restored, false
return metrics, false
}
total++
metrics.Objects++
metrics.TotalBytes += int64(len(copyBuffer))
err := restoreItem(ctx, service, itemData, drivePath.driveID, restoreFolderID, copyBuffer)
if err != nil {
@ -134,7 +145,7 @@ func restoreCollection(
continue
}
restored++
metrics.Successes++
}
}
}

View File

@ -4,9 +4,17 @@ import (
"context"
"fmt"
bytesize "github.com/inhies/go-bytesize"
"github.com/alcionai/corso/src/pkg/logger"
)
// ConnectorOperationStatus is a data type used to describe the state of
// the sequence of operations.
// @param ObjectCount integer representation of how many objects have been downloaded or uploaded.
// @param Successful: Number of objects that are sent through the connector without incident.
// @param incomplete: Bool representation of whether all intended items were downloaded or uploaded.
// @param bytes: represents the total number of bytes that have been downloaded or uploaded.
type ConnectorOperationStatus struct {
lastOperation Operation
ObjectCount int
@ -15,6 +23,19 @@ type ConnectorOperationStatus struct {
errorCount int
incomplete bool
incompleteReason string
additionalDetails string
bytes int64
}
// CollectionMetrics groups the counters reported for a backup or restore
// pass over one or more collections.
//
// Field order is significant: callers construct this type with unkeyed
// literals, so new fields must only be appended.
type CollectionMetrics struct {
	// Objects is the number of items the operation attempted to process.
	Objects int
	// Successes is the number of items processed without error.
	Successes int
	// TotalBytes is the accumulated size, in bytes, of the processed items.
	TotalBytes int64
}

// Combine folds the counters of additional into cm, field by field.
// The receiver is updated in place; additional is left untouched.
func (cm *CollectionMetrics) Combine(additional CollectionMetrics) {
	*cm = CollectionMetrics{
		Objects:    cm.Objects + additional.Objects,
		Successes:  cm.Successes + additional.Successes,
		TotalBytes: cm.TotalBytes + additional.TotalBytes,
	}
}
type Operation int
@ -30,8 +51,10 @@ const (
func CreateStatus(
ctx context.Context,
op Operation,
objects, success, folders int,
folders int,
cm CollectionMetrics,
err error,
details string,
) *ConnectorOperationStatus {
var reason string
@ -43,19 +66,21 @@ func CreateStatus(
numErr := GetNumberOfErrors(err)
status := ConnectorOperationStatus{
lastOperation: op,
ObjectCount: objects,
ObjectCount: cm.Objects,
FolderCount: folders,
Successful: success,
Successful: cm.Successes,
errorCount: numErr,
incomplete: hasErrors,
incompleteReason: reason,
bytes: cm.TotalBytes,
additionalDetails: details,
}
if status.ObjectCount != status.errorCount+status.Successful {
logger.Ctx(ctx).DPanicw(
"status object count does not match errors + successes",
"objects", objects,
"successes", success,
"objects", cm.Objects,
"successes", cm.Successes,
"numErrors", numErr,
"errors", err.Error())
}
@ -90,21 +115,38 @@ func MergeStatus(one, two ConnectorOperationStatus) ConnectorOperationStatus {
FolderCount: one.FolderCount + two.FolderCount,
Successful: one.Successful + two.Successful,
errorCount: one.errorCount + two.errorCount,
bytes: one.bytes + two.bytes,
incomplete: hasErrors,
incompleteReason: one.incompleteReason + " " + two.incompleteReason,
incompleteReason: one.incompleteReason + ", " + two.incompleteReason,
additionalDetails: one.additionalDetails + ", " + two.additionalDetails,
}
return status
}
func (cos *ConnectorOperationStatus) String() string {
message := fmt.Sprintf("Action: %s performed on %d of %d objects within %d directories.", cos.lastOperation.String(),
cos.Successful, cos.ObjectCount, cos.FolderCount)
var operationStatement string
switch cos.lastOperation {
case Backup:
operationStatement = "Downloaded from "
case Restore:
operationStatement = "Restored content to "
}
message := fmt.Sprintf("Action: %s performed on %d of %d objects (%s) within %d directories.",
cos.lastOperation.String(),
cos.Successful,
cos.ObjectCount,
bytesize.New(float64(cos.bytes)),
cos.FolderCount,
)
if cos.incomplete {
message += " " + cos.incompleteReason
}
message = message + "\n"
message += " " + operationStatement + cos.additionalDetails + "\n"
return message
}

View File

@ -54,10 +54,11 @@ func (suite *GCStatusTestSuite) TestCreateStatus() {
result := CreateStatus(
context.Background(),
test.params.operationType,
test.params.objects,
test.params.success,
test.params.folders,
test.params.err)
CollectionMetrics{test.params.objects, test.params.success, 0},
test.params.err,
"",
)
test.expect(t, result.incomplete, "status is incomplete")
})
}
@ -71,10 +72,14 @@ func (suite *GCStatusTestSuite) TestCreateStatus_InvalidStatus() {
CreateStatus(
context.Background(),
params.operationType,
params.folders,
CollectionMetrics{
params.objects,
params.success,
params.folders,
0,
},
params.err,
"",
)
})
}
@ -90,7 +95,7 @@ func (suite *GCStatusTestSuite) TestMergeStatus() {
}{
{
name: "Test: Status + unknown",
one: *CreateStatus(simpleContext, Backup, 1, 1, 1, nil),
one: *CreateStatus(simpleContext, Backup, 1, CollectionMetrics{1, 1, 0}, nil, ""),
two: ConnectorOperationStatus{},
expected: statusParams{Backup, 1, 1, 1, nil},
isIncomplete: assert.False,
@ -98,27 +103,31 @@ func (suite *GCStatusTestSuite) TestMergeStatus() {
{
name: "Test: unknown + Status",
one: ConnectorOperationStatus{},
two: *CreateStatus(simpleContext, Backup, 1, 1, 1, nil),
two: *CreateStatus(simpleContext, Backup, 1, CollectionMetrics{1, 1, 0}, nil, ""),
expected: statusParams{Backup, 1, 1, 1, nil},
isIncomplete: assert.False,
},
{
name: "Test: Successful + Successful",
one: *CreateStatus(simpleContext, Backup, 1, 1, 1, nil),
two: *CreateStatus(simpleContext, Backup, 3, 3, 3, nil),
one: *CreateStatus(simpleContext, Backup, 1, CollectionMetrics{1, 1, 0}, nil, ""),
two: *CreateStatus(simpleContext, Backup, 3, CollectionMetrics{3, 3, 0}, nil, ""),
expected: statusParams{Backup, 4, 4, 4, nil},
isIncomplete: assert.False,
},
{
name: "Test: Successful + Unsuccessful",
one: *CreateStatus(simpleContext, Backup, 17, 17, 13, nil),
one: *CreateStatus(simpleContext, Backup, 13, CollectionMetrics{17, 17, 0}, nil, ""),
two: *CreateStatus(
simpleContext,
Backup,
8,
CollectionMetrics{
12,
9,
8,
0,
},
WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two"))),
"",
),
expected: statusParams{Backup, 29, 26, 21, nil},
isIncomplete: assert.True,