From eff95b7702a7cc903b91bced6aba34f3d736ff08 Mon Sep 17 00:00:00 2001 From: Danny Date: Fri, 5 Aug 2022 16:19:48 -0400 Subject: [PATCH] GC: Contact Framework added (#473) `exchange`.Contacts added to Collection retrieval option base through the use of a selector. The ExchangeDataCollection Function to be updated to make the function `Live` to be called externally --- .../exchange/exchange_data_collection.go | 129 ++++++++++++++---- .../exchange/exchange_data_collection_test.go | 8 +- .../exchange/exchange_service_test.go | 28 ++++ .../connector/exchange/service_functions.go | 19 ++- .../connector/exchange/service_query.go | 102 ++++++++++++-- src/internal/connector/graph_connector.go | 10 +- .../connector/graph_connector_test.go | 38 ++++++ 7 files changed, 287 insertions(+), 47 deletions(-) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index baa590abb..e0650dc06 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -53,7 +53,14 @@ type Collection struct { // Populater are a class of functions that can be used to fill exchange.Collections with // the corresponding information -type populater func(context.Context, graph.Service, *Collection, chan<- *support.ConnectorOperationStatus) +type populater func( + ctx context.Context, + service graph.Service, + user string, + jobs []string, + dataChannel chan<- data.Stream, + statusChannel chan<- *support.ConnectorOperationStatus, +) // NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated func NewCollection( @@ -80,7 +87,9 @@ func NewCollection( func getPopulateFunction(optID optionIdentifier) populater { switch optID { case messages: - return PopulateFromCollection + return PopulateForMailCollection + case contacts: + return PopulateForContactCollection default: return nil } @@ -95,7 +104,14 @@ func (eoc *Collection) AddJob(objID string) { // M365 exchange objects and returns the data channel func (eoc *Collection) Items() <-chan data.Stream { if eoc.populate != nil { - go eoc.populate(context.TODO(), eoc.service, eoc, eoc.statusCh) + go eoc.populate( + context.TODO(), + eoc.service, + eoc.user, + eoc.jobs, + eoc.data, + eoc.statusCh, + ) } return eoc.data } @@ -105,46 +121,113 @@ func (eoc *Collection) FullPath() []string { return append([]string{}, eoc.fullPath...) } -// PopulateFromCollection async call to fill DataCollection via channel implementation -func PopulateFromCollection( +func PopulateForContactCollection( ctx context.Context, service graph.Service, - eoc *Collection, + user string, + jobs []string, + dataChannel chan<- data.Stream, + statusChannel chan<- *support.ConnectorOperationStatus, +) { + var ( + errs error + success int + ) + objectWriter := kw.NewJsonSerializationWriter() + + for _, task := range jobs { + response, err := service.Client().UsersById(user).ContactsById(task).Get() + if err != nil { + trace := support.ConnectorStackErrorTrace(err) + errs = support.WrapAndAppend( + user, + errors.Wrapf(err, "unable to retrieve item %s; details: %s", task, trace), + errs, + ) + continue + } + err = contactToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user) + if err != nil { + errs = support.WrapAndAppendf(user, err, errs) + + if service.ErrPolicy() { + break + } + continue + } + + success++ + + } + close(dataChannel) + attemptedItems := len(jobs) + status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs) + logger.Ctx(ctx).Debug(status.String()) + statusChannel <- status +} + +// PopulateForMailCollection async call to fill DataCollection via channel implementation +func PopulateForMailCollection( + ctx context.Context, + service graph.Service, + user string, + jobs []string, + dataChannel chan<- data.Stream, statusChannel chan<- *support.ConnectorOperationStatus, ) { var errs error var attemptedItems, success int objectWriter := kw.NewJsonSerializationWriter() - for _, task := range eoc.jobs { - response, err := service.Client().UsersById(eoc.user).MessagesById(task).Get() + for _, task := range jobs { + response, err := service.Client().UsersById(user).MessagesById(task).Get() if err != nil { - errDetails := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend( - eoc.user, - errors.Wrapf(err, "unable to retrieve item %s; details %s", task, errDetails), - errs, - ) + trace := support.ConnectorStackErrorTrace(err) + errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve item %s; details %s", task, trace), errs) continue } - err = messageToDataCollection(ctx, service.Client(), objectWriter, eoc.data, response, eoc.user) - success++ + err = messageToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user) if err != nil { - errs = support.WrapAndAppendf(eoc.user, err, errs) - success-- - } - if errs != nil && service.ErrPolicy() { - break + errs = support.WrapAndAppendf(user, err, errs) + + if service.ErrPolicy() { + break + } + continue } + success++ } - close(eoc.data) - attemptedItems += len(eoc.jobs) + close(dataChannel) + attemptedItems += len(jobs) status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs) logger.Ctx(ctx).Debug(status.String()) statusChannel <- status } +func contactToDataCollection( + ctx context.Context, + client *msgraphsdk.GraphServiceClient, + objectWriter *kw.JsonSerializationWriter, + dataChannel chan<- data.Stream, + contact models.Contactable, + user string, +) error { + defer objectWriter.Close() + err := objectWriter.WriteObjectValue("", contact) + if err != nil { + return support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId())) + } + byteArray, err := objectWriter.GetSerializedContent() + if err != nil { + return support.WrapAndAppend(*contact.GetId(), err, nil) + } + if byteArray != nil { + dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: nil} + } + return nil +} + func messageToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index 93250867a..047f526b0 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -89,14 +89,16 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_Items() { expected := 5 testFunction := func(ctx context.Context, service graph.Service, - eoc *Collection, + user string, + jobs []string, + dataChannel chan<- data.Stream, notUsed chan<- *support.ConnectorOperationStatus) { detail := &details.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()} for i := 0; i < expected; i++ { temp := NewStream(uuid.NewString(), mockconnector.GetMockMessageBytes("Test_Items()"), *detail) - eoc.data <- &temp + dataChannel <- &temp } - close(eoc.data) + close(dataChannel) } eoc := Collection{ diff --git a/src/internal/connector/exchange/exchange_service_test.go b/src/internal/connector/exchange/exchange_service_test.go index c117503f7..e79b5b16b 100644 --- a/src/internal/connector/exchange/exchange_service_test.go +++ b/src/internal/connector/exchange/exchange_service_test.go @@ -4,7 +4,11 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/internal/tester" + "github.com/alcionai/corso/pkg/selectors" ) type ExchangeServiceSuite struct { @@ -91,3 +95,27 @@ func (suite *ExchangeServiceSuite) TestExchangeService_optionsForFolders() { } } + +// NOTE the requirements are in PR 475 +func (suite *ExchangeServiceSuite) TestExchangeService_SetupExchangeCollection() { + userID, err := tester.M365UserID() + require.NoError(suite.T(), err) + sel := selectors.NewExchangeBackup() + sel.Include(sel.Users([]string{userID})) + eb, err := sel.ToExchangeBackup() + require.NoError(suite.T(), err) + scopes := eb.Scopes() + + for _, test := range scopes { + suite.T().Run(test.Category().String(), func(t *testing.T) { + discriminateFunc, graphQuery, iterFunc, err := SetupExchangeCollectionVars(test) + if test.Category() == selectors.ExchangeMailFolder || + test.Category() == selectors.ExchangeContactFolder { + assert.NoError(t, err) + assert.NotNil(t, discriminateFunc) + assert.NotNil(t, graphQuery) + assert.NotNil(t, iterFunc) + } + }) + } +} diff --git a/src/internal/connector/exchange/service_functions.go b/src/internal/connector/exchange/service_functions.go index b6150598c..42ffaf795 100644 --- a/src/internal/connector/exchange/service_functions.go +++ b/src/internal/connector/exchange/service_functions.go @@ -179,22 +179,29 @@ func SetupExchangeCollectionVars(scope selectors.ExchangeScope) ( absser.ParsableFactory, GraphQuery, GraphIterateFunc, + error, ) { if scope.IncludesCategory(selectors.ExchangeMail) { - folders := scope.Get(selectors.ExchangeMailFolder) - if folders[0] == selectors.AnyTgt { + if scope.IsAny(selectors.ExchangeMailFolder) { return models.CreateMessageCollectionResponseFromDiscriminatorValue, GetAllMessagesForUser, - IterateSelectAllMessagesForCollections + IterateSelectAllMessagesForCollections, + nil } return models.CreateMessageCollectionResponseFromDiscriminatorValue, GetAllMessagesForUser, - IterateAndFilterMessagesForCollections + IterateAndFilterMessagesForCollections, + nil } - return nil, nil, nil - + if scope.IncludesCategory(selectors.ExchangeContactFolder) { + return models.CreateContactFromDiscriminatorValue, + GetAllContactsForUser, + IterateAllContactsForCollection, + nil + } + return nil, nil, nil, errors.New("exchange scope option not supported") } // GetCopyRestoreFolder utility function to create an unique folder for the restore process diff --git a/src/internal/connector/exchange/service_query.go b/src/internal/connector/exchange/service_query.go index 71deca476..39724cb46 100644 --- a/src/internal/connector/exchange/service_query.go +++ b/src/internal/connector/exchange/service_query.go @@ -6,6 +6,7 @@ import ( absser "github.com/microsoft/kiota-abstractions-go/serialization" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" "github.com/microsoftgraph/msgraph-sdk-go/models" + mscontacts "github.com/microsoftgraph/msgraph-sdk-go/users/item/contacts" msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders" msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages" msitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item" @@ -19,7 +20,10 @@ import ( type optionIdentifier int -const mailCategory = "mail" +const ( + mailCategory = "mail" + contactsCategory = "contacts" +) //go:generate stringer -type=optionIdentifier const ( @@ -27,23 +31,34 @@ const ( folders messages users + contacts ) // GraphQuery represents functions which perform exchange-specific queries // into M365 backstore. //TODO: use selector or path for granularity into specific folders or specific date ranges -type GraphQuery func(graph.Service, []string) (absser.Parsable, error) +type GraphQuery func(graph.Service, string) (absser.Parsable, error) // GetAllMessagesForUser is a GraphQuery function for receiving all messages for a single user -func GetAllMessagesForUser(gs graph.Service, identities []string) (absser.Parsable, error) { +func GetAllMessagesForUser(gs graph.Service, user string) (absser.Parsable, error) { selecting := []string{"id", "parentFolderId"} options, err := optionsForMessages(selecting) if err != nil { return nil, err } + return gs.Client().UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil) +} + +// GetAllContactsForUser is a GraphQuery function for querying all the contacts in a user's account +func GetAllContactsForUser(gs graph.Service, user string) (absser.Parsable, error) { + selecting := []string{"id", "parentFolderId"} + options, err := optionsForContacts(selecting) + if err != nil { + return nil, err + } + return gs.Client().UsersById(user).Contacts().GetWithRequestConfigurationAndResponseHandler(options, nil) - return gs.Client().UsersById(identities[0]).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil) } // GetAllFolderDisplayNamesForUser is a GraphQuery function for getting FolderId and display @@ -62,13 +77,13 @@ func GetAllFolderNamesForUser(gs graph.Service, identities []string) (absser.Par // GraphIterateFuncs are iterate functions to be used with the M365 iterators (e.g. msgraphgocore.NewPageIterator) // @returns a callback func that works with msgraphgocore.PageIterator.Iterate function type GraphIterateFunc func( - string, - selectors.ExchangeScope, - error, - bool, - account.M365Config, - map[string]*Collection, - chan<- *support.ConnectorOperationStatus, + tenant string, + scope selectors.ExchangeScope, + errs error, + failFast bool, + credentials account.M365Config, + collections map[string]*Collection, + graphStatusChannel chan<- *support.ConnectorOperationStatus, ) func(any) bool // IterateSelectAllMessageForCollection utility function for @@ -116,6 +131,48 @@ func IterateSelectAllMessagesForCollections( } } +// IterateAllContactsForCollection GraphIterateFunc for moving through +// a ContactsCollectionsResponse using the msgraphgocore paging interface. +// Contacts Ids are placed into a collection based upon the parent folder +func IterateAllContactsForCollection( + tenant string, + scope selectors.ExchangeScope, + errs error, + failFast bool, + credentials account.M365Config, + collections map[string]*Collection, + statusCh chan<- *support.ConnectorOperationStatus, +) func(any) bool { + return func(contactsItem any) bool { + + user := scope.Get(selectors.ExchangeUser)[0] + + contact, ok := contactsItem.(models.Contactable) + if !ok { + errs = support.WrapAndAppend(user, errors.New("contact iteration failure"), errs) + return true + } + directory := *contact.GetParentFolderId() + if _, ok := collections[directory]; !ok { + service, err := createService(credentials, failFast) + if err != nil { + errs = support.WrapAndAppend(user, err, errs) + return true + } + edc := NewCollection( + user, + []string{tenant, user, contactsCategory, directory}, + contacts, + service, + statusCh, + ) + collections[directory] = &edc + } + collections[directory].AddJob(*contact.GetId()) + return true + } +} + func IterateAndFilterMessagesForCollections( tenant string, scope selectors.ExchangeScope, @@ -289,6 +346,22 @@ func optionsForMailFolders(moreOps []string) (*msfolder.MailFoldersRequestBuilde return options, nil } +// optionsForContacts transforms options into select query for MailContacts +// @return is the first call in Contacts().GetWithRequestConfigurationAndResponseHandler(options, handler) +func optionsForContacts(moreOps []string) (*mscontacts.ContactsRequestBuilderGetRequestConfiguration, error) { + selecting, err := buildOptions(moreOps, contacts) + if err != nil { + return nil, err + } + requestParameters := &mscontacts.ContactsRequestBuilderGetQueryParameters{ + Select: selecting, + } + options := &mscontacts.ContactsRequestBuilderGetRequestConfiguration{ + QueryParameters: requestParameters, + } + return options, nil +} + // buildOptions - Utility Method for verifying if select options are valid for the m365 object type // @return is a pair. The first is a string literal of allowable options based on the object type, // the second is an error. An error is returned if an unsupported option or optionIdentifier was used @@ -321,9 +394,16 @@ func buildOptions(options []string, optID optionIdentifier) ([]string, error) { "webLink": 5, "id": 6, } + + fieldsForContacts := map[string]int{ + "id": 1, + "parentFolderId": 2, + } returnedOptions := []string{"id"} switch optID { + case contacts: + allowedOptions = fieldsForContacts case folders: allowedOptions = fieldsForFolders case users: diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 5591eb620..d0d5036c7 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -299,15 +299,17 @@ func (gc *GraphConnector) createCollections( ctx context.Context, scope selectors.ExchangeScope, ) (map[string]*exchange.Collection, error) { - var ( transformer absser.ParsableFactory query exchange.GraphQuery gIter exchange.GraphIterateFunc ) user := scope.Get(selectors.ExchangeUser)[0] - transformer, query, gIter = exchange.SetupExchangeCollectionVars(scope) - response, err := query(&gc.graphService, []string{user}) + transformer, query, gIter, err := exchange.SetupExchangeCollectionVars(scope) + if err != nil { + return nil, support.WrapAndAppend(user, err, nil) + } + response, err := query(&gc.graphService, user) if err != nil { return nil, err } @@ -319,7 +321,7 @@ func (gc *GraphConnector) createCollections( collections := make(map[string]*exchange.Collection) var errs error // callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[] - // with corresponding messageIDs. New collections are created for each directory + // with corresponding item IDs. New collections are created for each directory callbackFunc := gIter(gc.tenant, scope, errs, gc.failFast, gc.credentials, collections, gc.statusCh) iterateError := pageIterator.Iterate(callbackFunc) if iterateError != nil { diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 157528316..6baf0f765 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -126,6 +126,44 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_MailRegressionTe } } +// TestGraphConnector_TestContactSequence verifies retrieval sequence +func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_TestContactSequence() { + userID, err := tester.M365UserID() + require.NoError(suite.T(), err) + sel := selectors.NewExchangeBackup() + sel.Include(sel.Users([]string{userID})) + eb, err := sel.ToExchangeBackup() + require.NoError(suite.T(), err) + scopes := eb.Scopes() + var contactsOnly selectors.ExchangeScope + for _, scope := range scopes { + if scope.IncludesCategory(selectors.ExchangeContactFolder) { + contactsOnly = scope + } + } + collections, err := suite.connector.createCollections(context.Background(), contactsOnly) + assert.NoError(suite.T(), err) + number := 0 + for _, edc := range collections { + testName := fmt.Sprintf("%s_ContactFolder_%d", edc.FullPath()[1], number) + suite.T().Run(testName, func(t *testing.T) { + streamChannel := edc.Items() + for stream := range streamChannel { + buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) + suite.NoError(err) + suite.NotZero(read) + message, err := support.CreateMessageFromBytes(buf.Bytes()) + suite.NotNil(message) + suite.NoError(err) + + } + number++ + }) + } + suite.Greater(len(collections), 0) +} + //TestGraphConnector_restoreMessages uses mock data to ensure GraphConnector // is able to restore a messageable item to a Mailbox. func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages() {