GC async population of m365 objects mail (#332)

-- Changes made to incorporate async population of a DataCollection
This commit is contained in:
Danny 2022-07-18 17:01:58 -04:00 committed by GitHub
parent 5a9f2e4601
commit d00332f328
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 61 deletions

View File

@ -27,11 +27,6 @@ jobs:
with:
go-version: 1.18
- name: go-cache-paths
run: |
echo "::set-output name=go-build::$(go env GOCACHE)"
echo "::set-output name=go-mod::$(go env GOMODCACHE)"
- name: Cache Go build
uses: actions/cache@v3
id: mybuild

View File

@ -6,6 +6,7 @@ import (
"bytes"
"context"
"strings"
"sync/atomic"
az "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
ka "github.com/microsoft/kiota-authentication-azure-go"
@ -23,7 +24,7 @@ import (
)
const (
numberOfRetries = 3
numberOfRetries = 4
mailCategory = "mail"
)
@ -31,12 +32,18 @@ const (
// GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for
// bookkeeping and interfacing with other component.
type GraphConnector struct {
tenant string
adapter msgraphsdk.GraphRequestAdapter
graphService
tenant string
Users map[string]string //key<email> value<id>
status *support.ConnectorOperationStatus // contains the status of the last run status
statusCh chan *support.ConnectorOperationStatus
awaitingMessages int32
credentials account.M365Config
}
type graphService struct {
client msgraphsdk.GraphServiceClient
Users map[string]string //key<email> value<id>
Streams string //Not implemented for ease of code check-in
status *support.ConnectorOperationStatus // contains the status of the last run status
adapter msgraphsdk.GraphRequestAdapter
}
func NewGraphConnector(acct account.Account) (*GraphConnector, error) {
@ -44,8 +51,28 @@ func NewGraphConnector(acct account.Account) (*GraphConnector, error) {
if err != nil {
return nil, errors.Wrap(err, "retrieving m356 account configuration")
}
gc := GraphConnector{
tenant: m365.TenantID,
Users: make(map[string]string, 0),
status: nil,
statusCh: make(chan *support.ConnectorOperationStatus),
credentials: m365,
}
aService, err := gc.createService()
if err != nil {
return nil, err
}
gc.graphService = *aService
err = gc.setTenantUsers()
if err != nil {
return nil, err
}
return &gc, nil
}
func createAdapter(tenant, client, secret string) (*msgraphsdk.GraphRequestAdapter, error) {
// Client Provider: Uses Secret for access to tenant-level data
cred, err := az.NewClientSecretCredential(m365.TenantID, m365.ClientID, m365.ClientSecret, nil)
cred, err := az.NewClientSecretCredential(tenant, client, secret, nil)
if err != nil {
return nil, err
}
@ -54,22 +81,20 @@ func NewGraphConnector(acct account.Account) (*GraphConnector, error) {
return nil, err
}
adapter, err := msgraphsdk.NewGraphRequestAdapter(auth)
return adapter, err
}
// createSubConnector private constructor method for subConnector
func (gc *GraphConnector) createService() (*graphService, error) {
adapter, err := createAdapter(gc.credentials.TenantID, gc.credentials.ClientID, gc.credentials.ClientSecret)
if err != nil {
return nil, err
}
gc := GraphConnector{
tenant: m365.TenantID,
connector := graphService{
adapter: *adapter,
client: *msgraphsdk.NewGraphServiceClient(adapter),
Users: make(map[string]string, 0),
status: nil,
}
// TODO: Revisit Query all users.
err = gc.setTenantUsers()
if err != nil {
return nil, err
}
return &gc, nil
return &connector, err
}
// setTenantUsers queries the M365 to identify the users in the
@ -83,7 +108,7 @@ func (gc *GraphConnector) setTenantUsers() error {
options := &msuser.UsersRequestBuilderGetRequestConfiguration{
QueryParameters: requestParams,
}
response, err := gc.client.Users().GetWithRequestConfigurationAndResponseHandler(options, nil)
response, err := gc.graphService.client.Users().GetWithRequestConfigurationAndResponseHandler(options, nil)
if err != nil {
return err
}
@ -91,7 +116,7 @@ func (gc *GraphConnector) setTenantUsers() error {
err = support.WrapAndAppend("general access", errors.New("connector failed: No access"), err)
return err
}
userIterator, err := msgraphgocore.NewPageIterator(response, &gc.adapter, models.CreateUserCollectionResponseFromDiscriminatorValue)
userIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, models.CreateUserCollectionResponseFromDiscriminatorValue)
if err != nil {
return err
}
@ -99,7 +124,7 @@ func (gc *GraphConnector) setTenantUsers() error {
callbackFunc := func(userItem interface{}) bool {
user, ok := userItem.(models.Userable)
if !ok {
err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), errors.New("user iteration failure"), err)
err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), errors.New("user iteration failure"), err)
return true
}
gc.Users[*user.GetMail()] = *user.GetId()
@ -107,7 +132,7 @@ func (gc *GraphConnector) setTenantUsers() error {
}
iterateError = userIterator.Iterate(callbackFunc)
if iterateError != nil {
err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err)
err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, err)
}
return err
}
@ -171,17 +196,16 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s
}
dcs, err := gc.serializeMessages(ctx, user)
if err != nil {
errs = support.WrapAndAppend(user, err, errs)
return nil, support.WrapAndAppend(user, err, errs)
}
if len(dcs) > 0 {
collections = append(collections, dcs...)
for _, collection := range dcs {
collections = append(collections, collection)
}
}
}
}
// TODO replace with completion of Issue 124:
//TODO: Retry handler to convert return: (DataCollection, error)
return collections, errs
}
@ -235,7 +259,7 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollect
clone.SetSingleValueExtendedProperties(svlep)
draft := false
clone.SetIsDraft(&draft)
sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone)
sentMessage, err := gc.graphService.client.UsersById(user).MailFoldersById(address).Messages().Post(clone)
if err != nil {
errs = support.WrapAndAppend(
data.UUID()+": "+support.ConnectorStackErrorTrace(err),
@ -263,13 +287,13 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollect
// serializeMessages: Temp Function as place Holder until Collections have been added
// to the GraphConnector struct.
func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([]DataCollection, error) {
func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*ExchangeDataCollection, error) {
options := optionsForMessageSnapshot()
response, err := gc.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
response, err := gc.graphService.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
if err != nil {
return nil, err
}
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
if err != nil {
return nil, err
}
@ -277,7 +301,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
callbackFunc := func(messageItem any) bool {
message, ok := messageItem.(models.Messageable)
if !ok {
err = support.WrapAndAppendf(gc.adapter.GetBaseUrl(), errors.New("message iteration failure"), err)
err = support.WrapAndAppendf(gc.graphService.adapter.GetBaseUrl(), errors.New("message iteration failure"), err)
return true
}
// Saving to messages to list. Indexed by folder
@ -286,48 +310,89 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
}
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err)
err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, err)
}
if err != nil {
return nil, err // return error if snapshot is incomplete
}
// Time to create Exchange data Holder
collections := make([]DataCollection, 0)
objectWriter := kw.NewJsonSerializationWriter()
var errs error
var attemptedItems, success int
// Create collection of ExchangeDataCollection and create data Holder
collections := make(map[string]*ExchangeDataCollection)
for aFolder, tasks := range tasklist {
for aFolder := range tasklist {
// prep the items for handoff to the backup consumer
edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder})
collections[aFolder] = &edc
}
if len(collections) == 0 {
if len(tasklist) != 0 {
// Below error message needs revising. Assumption is that it should always
// find both items to fetch and a DataCollection to put them in
return nil, support.WrapAndAppend(
user, errors.New("found items but no directories"), err)
}
// return empty collection when no items found
return nil, err
}
service, err := gc.createService()
if err != nil {
return nil, support.WrapAndAppend(user, err, err)
}
// async call to populate
go service.populateFromTaskList(ctx, tasklist, collections, gc.statusCh)
gc.incrementAwaitingMessages()
return collections, err
}
// populateFromTaskList async call to fill DataCollection via channel implementation
func (sc *graphService) populateFromTaskList(
context context.Context,
tasklist TaskList,
collections map[string]*ExchangeDataCollection,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var errs error
var attemptedItems, success int
objectWriter := kw.NewJsonSerializationWriter()
//Todo this has to return all the errors in the status
for aFolder, tasks := range tasklist {
// Get the same folder
edc := collections[aFolder]
if edc == nil {
for _, task := range tasks {
errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs)
}
continue
}
for _, task := range tasks {
response, err := gc.client.UsersById(user).MessagesById(task).Get()
response, err := sc.client.UsersById(edc.user).MessagesById(task).Get()
if err != nil {
details := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs)
errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs)
continue
}
err = gc.messageToDataCollection(ctx, objectWriter, edc, response, user)
err = messageToDataCollection(&sc.client, context, objectWriter, edc.data, response, edc.user)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
errs = support.WrapAndAppendf(edc.user, err, errs)
}
}
edc.FinishPopulation()
attemptedItems += len(tasks)
success += edc.Length()
collections = append(collections, &edc)
}
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs)
gc.SetStatus(*status)
logger.Ctx(ctx).Debugw(gc.PrintableStatus())
return collections, errs
status := support.CreateStatus(context, support.Backup, attemptedItems, success, len(tasklist), errs)
logger.Ctx(context).Debug(status.String())
statusChannel <- status
}
func (gc *GraphConnector) messageToDataCollection(
func messageToDataCollection(
client *msgraphsdk.GraphServiceClient,
ctx context.Context,
objectWriter *kw.JsonSerializationWriter,
edc ExchangeDataCollection,
dataChannel chan<- DataStream,
message models.Messageable,
user string,
) error {
@ -344,7 +409,7 @@ func (gc *GraphConnector) messageToDataCollection(
// getting all the attachments might take a couple attempts due to filesize
var retriesErr error
for count := 0; count < numberOfRetries; count++ {
attached, err := gc.client.
attached, err := client.
UsersById(user).
MessagesById(*aMessage.GetId()).
Attachments().
@ -371,9 +436,8 @@ func (gc *GraphConnector) messageToDataCollection(
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
}
if byteArray != nil {
edc.PopulateCollection(&ExchangeData{id: *aMessage.GetId(), message: byteArray})
dataChannel <- &ExchangeData{id: *aMessage.GetId(), message: byteArray}
}
return nil
}
@ -382,6 +446,16 @@ func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) {
gc.status = &cos
}
// AwaitStatus updates status field based on item within statusChannel.
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
if gc.awaitingMessages > 0 {
gc.status = <-gc.statusCh
atomic.AddInt32(&gc.awaitingMessages, -1)
return gc.status
}
return nil
}
// Status returns the current status of the graphConnector operaion.
func (gc *GraphConnector) Status() *support.ConnectorOperationStatus {
return gc.status
@ -395,6 +469,10 @@ func (gc *GraphConnector) PrintableStatus() string {
return gc.status.String()
}
func (gc *GraphConnector) incrementAwaitingMessages() {
atomic.AddInt32(&gc.awaitingMessages, 1)
}
// IsRecoverableError returns true iff error is a RecoverableGCEerror
func IsRecoverableError(e error) bool {
var recoverable support.RecoverableGCError

View File

@ -59,11 +59,15 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_setTenantUsers()
func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataCollection() {
sel := selectors.NewExchangeBackup()
sel.Include(sel.Users("lidiah@8qzvrj.onmicrosoft.com"))
sel.Include(sel.Users("meganb@8qzvrj.onmicrosoft.com"))
collectionList, err := suite.connector.ExchangeDataCollection(context.Background(), sel.Selector)
assert.NotNil(suite.T(), collectionList, "collection list")
assert.Nil(suite.T(), err)
assert.NotNil(suite.T(), suite.connector.status, "connector status")
assert.True(suite.T(), suite.connector.awaitingMessages > 0)
assert.Nil(suite.T(), suite.connector.status)
status := suite.connector.AwaitStatus()
assert.NotNil(suite.T(), status, "status not blocking on async call")
exchangeData := collectionList[0]
suite.Greater(len(exchangeData.FullPath()), 2)
}

View File

@ -90,7 +90,6 @@ func (op *BackupOperation) Run(ctx context.Context) error {
stats.readErr = err
return errors.Wrap(err, "retrieving service data")
}
stats.gc = gc.Status()
// hand the results to the consumer
var details *backup.Details
@ -99,6 +98,7 @@ func (op *BackupOperation) Run(ctx context.Context) error {
stats.writeErr = err
return errors.Wrap(err, "backing up service data")
}
stats.gc = gc.AwaitStatus()
err = op.createBackupModels(ctx, stats.k.SnapshotID, details)
if err != nil {