GraphConnector Status support module (#222)

Issue #214: Status of Connector
- Support package contains errors module
- status is updated after backup operation
- test suite added for new features.
This commit is contained in:
Danny 2022-06-22 17:17:36 -04:00 committed by GitHub
parent 756b429362
commit 128e9274e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 251 additions and 27 deletions

View File

@ -34,8 +34,9 @@ type GraphConnector struct {
tenant string
adapter msgraphsdk.GraphRequestAdapter
client msgraphsdk.GraphServiceClient
Users map[string]string //key<email> value<id>
Streams string //Not implemented for ease of code check-in
Users map[string]string //key<email> value<id>
Streams string //Not implemented for ease of code check-in
status *support.ConnectorOperationStatus // contains the status of the last run status
}
func NewGraphConnector(tenantId, clientId, secret string) (*GraphConnector, error) {
@ -57,6 +58,7 @@ func NewGraphConnector(tenantId, clientId, secret string) (*GraphConnector, erro
adapter: *adapter,
client: *msgraphsdk.NewGraphServiceClient(adapter),
Users: make(map[string]string, 0),
status: nil,
}
// TODO: Revisit Query all users.
err = gc.setTenantUsers()
@ -82,7 +84,7 @@ func (gc *GraphConnector) setTenantUsers() error {
return err
}
if response == nil {
err = WrapAndAppend("general access", errors.New("connector failed: No access"), err)
err = support.WrapAndAppend("general access", errors.New("connector failed: No access"), err)
return err
}
userIterator, err := msgraphgocore.NewPageIterator(response, &gc.adapter, models.CreateUserCollectionResponseFromDiscriminatorValue)
@ -93,7 +95,7 @@ func (gc *GraphConnector) setTenantUsers() error {
callbackFunc := func(userItem interface{}) bool {
user, ok := userItem.(models.Userable)
if !ok {
err = WrapAndAppend(gc.adapter.GetBaseUrl(), errors.New("user iteration failure"), err)
err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), errors.New("user iteration failure"), err)
return true
}
gc.Users[*user.GetMail()] = *user.GetId()
@ -101,7 +103,7 @@ func (gc *GraphConnector) setTenantUsers() error {
}
iterateError = userIterator.Iterate(callbackFunc)
if iterateError != nil {
err = WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err)
err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err)
}
return err
}
@ -170,12 +172,12 @@ func (gc *GraphConnector) restoreMessages(ctx context.Context, dc DataCollection
buf := &bytes.Buffer{}
_, err = buf.ReadFrom(data.ToReader())
if err != nil {
errs = WrapAndAppend(data.UUID(), err, errs)
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
message, err := support.CreateMessageFromBytes(buf.Bytes())
if err != nil {
errs = WrapAndAppend(data.UUID(), err, errs)
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
clone := support.ToMessage(message)
@ -191,20 +193,19 @@ func (gc *GraphConnector) restoreMessages(ctx context.Context, dc DataCollection
clone.SetIsDraft(&draft)
sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone)
if err != nil {
details := ConnectorStackErrorTrace(ctx, err)
errs = WrapAndAppend(data.UUID()+": "+details, err, errs)
errs = support.WrapAndAppend(data.UUID()+": "+
support.ConnectorStackErrorTrace(ctx, err), err, errs)
continue
// TODO: Add to retry Handler for the for failure
}
if sentMessage == nil && err == nil {
errs = WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs)
errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs)
}
// This completes the restore loop for a message..
}
return errs
}
// serializeMessages: Temp Function as place Holder until Collections have been added
@ -226,29 +227,31 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
var byteArray []byte
collections := make([]DataCollection, 0)
var errs error
var totalItems, success int
for _, aFolder := range folderList {
result, err := gc.client.UsersById(user).MailFoldersById(aFolder).Messages().Get()
if err != nil {
errs = WrapAndAppend(user, err, errs)
errs = support.WrapAndAppend(user, err, errs)
}
if result == nil {
errs = WrapAndAppend(user, fmt.Errorf("nil response on message query, folder: %s", aFolder), errs)
errs = support.WrapAndAppend(user, fmt.Errorf("nil response on message query, folder: %s", aFolder), errs)
continue
}
pageIterator, err := msgraphgocore.NewPageIterator(result, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
if err != nil {
errs = WrapAndAppend(user, fmt.Errorf("iterator failed initialization: %v", err), errs)
errs = support.WrapAndAppend(user, fmt.Errorf("iterator failed initialization: %v", err), errs)
continue
}
objectWriter := kw.NewJsonSerializationWriter()
edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder})
callbackFunc := func(messageItem interface{}) bool {
totalItems++
message, ok := messageItem.(models.Messageable)
if !ok {
errs = WrapAndAppend(user, fmt.Errorf("non-message return for user: %s", user), errs)
errs = support.WrapAndAppend(user, fmt.Errorf("non-message return for user: %s", user), errs)
return true
}
if *message.GetHasAttachments() {
@ -262,18 +265,18 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
}
if err != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
errs = WrapAndAppend(*message.GetId(), fmt.Errorf("attachment failed: %v ", err), errs)
errs = support.WrapAndAppend(*message.GetId(), fmt.Errorf("attachment failed: %v ", err), errs)
}
}
err = objectWriter.WriteObjectValue("", message)
if err != nil {
errs = WrapAndAppend(*message.GetId(), err, errs)
errs = support.WrapAndAppend(*message.GetId(), err, errs)
return true
}
byteArray, err = objectWriter.GetSerializedContent()
objectWriter.Close()
if err != nil {
errs = WrapAndAppend(*message.GetId(), err, errs)
errs = support.WrapAndAppend(*message.GetId(), err, errs)
return true
}
if byteArray != nil {
@ -284,13 +287,32 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
err = pageIterator.Iterate(callbackFunc)
if err != nil {
errs = WrapAndAppend(user, err, errs)
errs = support.WrapAndAppend(user, err, errs)
}
// Todo Retry Handler to be implemented
edc.FinishPopulation()
logger.Ctx(ctx).Debugw("finished storing ExchangeDataColection", "itemCount", edc.Length())
success += edc.Length()
collections = append(collections, &edc)
}
status, err := support.CreateStatus(support.Backup, totalItems, success, len(folderList), errs)
if err == nil {
gc.SetStatus(*status)
logger.Ctx(ctx).Debugw(gc.Status())
}
return collections, errs
}
// SetStatus helper function
func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) {
gc.status = &cos
}
func (gc *GraphConnector) Status() string {
if gc.status == nil {
return ""
}
return gc.status.String()
}

View File

@ -4,10 +4,12 @@ import (
"context"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/support"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/credentials"
)
@ -44,8 +46,6 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector() {
}
// --------------------
type DiconnectedGraphConnectorSuite struct {
suite.Suite
}
@ -67,9 +67,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataColl
collectionList, err := suite.connector.ExchangeDataCollection(context.Background(), "lidiah@8qzvrj.onmicrosoft.com")
assert.NotNil(suite.T(), collectionList)
assert.Error(suite.T(), err) // TODO Remove after https://github.com/alcionai/corso/issues/140
if err != nil {
suite.T().Logf("Missing Data: %s\n", err.Error())
}
assert.NotNil(suite.T(), suite.connector.status)
suite.NotContains(err.Error(), "attachment failed") // TODO Create Retry Exceeded Error
exchangeData := collectionList[0]
suite.Greater(len(exchangeData.FullPath()), 2)
@ -152,3 +150,13 @@ func (suite *DiconnectedGraphConnectorSuite) TestInterfaceAlignment() {
assert.NotNil(suite.T(), dc)
}
func (suite *DiconnectedGraphConnectorSuite) TestGraphConnector_Status() {
gc := GraphConnector{}
suite.Equal(len(gc.Status()), 0)
status, err := support.CreateStatus(support.Restore, 12, 9, 8,
support.WrapAndAppend("tres", errors.New("three"), support.WrapAndAppend("arc376", errors.New("one"), errors.New("two"))))
assert.NoError(suite.T(), err)
gc.SetStatus(*status)
suite.Greater(len(gc.Status()), 0)
}

View File

@ -1,8 +1,9 @@
package connector
package support
import (
"context"
"fmt"
"strconv"
"strings"
multierror "github.com/hashicorp/go-multierror"
@ -23,6 +24,21 @@ func WrapAndAppendf(identifier interface{}, e error, previous error) error {
return multierror.Append(previous, errors.Wrapf(e, "%v", identifier))
}
// GetErrors Helper method to return the integer amount of errors in multi error
func GetNumberOfErrors(err error) int {
if err == nil {
return 0
}
result, _, wasFound := strings.Cut(err.Error(), " ")
if wasFound {
aNum, err := strconv.Atoi(result)
if err == nil {
return aNum
}
}
return 1
}
// ListErrors is a helper method used to return the string of errors when
// the multiError library is used.
// depends on ConnectorStackErrorTrace

View File

@ -1,4 +1,4 @@
package connector
package support
import (
"context"
@ -66,3 +66,33 @@ func (suite *GraphConnectorErrorSuite) TestConcatenateStringFromPointers() {
suite.True(strings.Contains(outString, v1))
suite.True(strings.Contains(outString, v3))
}
func (suite *GraphConnectorErrorSuite) TestGetNumberOfErrors() {
table := []struct {
name string
errs error
expected int
}{
{
name: "No error",
errs: nil,
expected: 0,
},
{
name: "Not an ErrorList",
errs: errors.New("network error"),
expected: 1,
},
{
name: "Three Errors",
errs: WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two"))),
expected: 3,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
result := GetNumberOfErrors(test.errs)
suite.Equal(result, test.expected)
})
}
}

View File

@ -0,0 +1,25 @@
// Code generated by "stringer -type=Operation"; DO NOT EDIT.
package support
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[OpUnknown-0]
_ = x[Backup-1]
_ = x[Restore-2]
}
const _Operation_name = "OpUnknownBackupRestore"
var _Operation_index = [...]uint8{0, 9, 15, 22}
func (i Operation) String() string {
if i < 0 || i >= Operation(len(_Operation_index)-1) {
return "Operation(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Operation_name[_Operation_index[i]:_Operation_index[i+1]]
}

View File

@ -0,0 +1,57 @@
package support
import (
"errors"
"fmt"
)
type ConnectorOperationStatus struct {
lastOperation Operation
objectCount int
folderCount int
successful int
errorCount int
incomplete bool
incompleteReason string
}
type Operation int
//go:generate stringer -type=Operation
const (
OpUnknown Operation = iota
Backup
Restore
)
// Constructor for ConnectorOperationStatus. If the counts do not agree, an error is returned.
func CreateStatus(op Operation, objects, success, folders int, err error) (*ConnectorOperationStatus, error) {
hasErrors := err != nil
var reason string
if err != nil {
reason = err.Error()
}
status := ConnectorOperationStatus{
lastOperation: op,
objectCount: objects,
folderCount: folders,
successful: success,
errorCount: GetNumberOfErrors(err),
incomplete: hasErrors,
incompleteReason: reason,
}
if status.objectCount != status.errorCount+status.successful {
return nil, errors.New("incorrect total on initialization")
}
return &status, nil
}
func (cos *ConnectorOperationStatus) String() string {
message := fmt.Sprintf("Action: %s performed on %d of %d objects within %d directories.", cos.lastOperation.String(),
cos.successful, cos.objectCount, cos.folderCount)
if cos.incomplete {
message += " " + cos.incompleteReason
}
message = message + "\n"
return message
}

View File

@ -0,0 +1,66 @@
package support
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
type GCStatusTestSuite struct {
suite.Suite
}
func TestGraphConnectorStatus(t *testing.T) {
suite.Run(t, &GCStatusTestSuite{})
}
// operationType, objects, success, folders, errCount int, errStatus string
type statusParams struct {
operationType Operation
objects int
success int
folders int
err error
}
func (suite *GCStatusTestSuite) TestCreateStatus() {
table := []struct {
name string
params statusParams
expected bool
checkError assert.ValueAssertionFunc
}{
{
name: "Test: Status Success",
params: statusParams{Backup, 12, 12, 3, nil},
expected: false,
checkError: assert.Nil,
},
{
name: "Test: Status Failed",
params: statusParams{Restore, 12, 9, 8, WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two")))},
expected: true,
checkError: assert.Nil,
},
{
name: "Invalid status",
params: statusParams{Backup, 9, 3, 12, errors.New("invalidcl")},
expected: false,
checkError: assert.NotNil,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
result, err := CreateStatus(test.params.operationType, test.params.objects,
test.params.success, test.params.folders, test.params.err)
test.checkError(t, err)
if err == nil {
suite.Equal(result.incomplete, test.expected)
}
})
}
}