GC: Contact Framework added (#473)

`exchange`.Contacts added to Collection retrieval option base through the use of a selector. 

The ExchangeDataCollection Function to be updated to make the function `Live` to be called externally
This commit is contained in:
Danny 2022-08-05 16:19:48 -04:00 committed by GitHub
parent 5070296e18
commit eff95b7702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 287 additions and 47 deletions

View File

@ -53,7 +53,14 @@ type Collection struct {
// Populater are a class of functions that can be used to fill exchange.Collections with
// the corresponding information
type populater func(context.Context, graph.Service, *Collection, chan<- *support.ConnectorOperationStatus)
type populater func(
ctx context.Context,
service graph.Service,
user string,
jobs []string,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
)
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
func NewCollection(
@ -80,7 +87,9 @@ func NewCollection(
func getPopulateFunction(optID optionIdentifier) populater {
switch optID {
case messages:
return PopulateFromCollection
return PopulateForMailCollection
case contacts:
return PopulateForContactCollection
default:
return nil
}
@ -95,7 +104,14 @@ func (eoc *Collection) AddJob(objID string) {
// M365 exchange objects and returns the data channel
func (eoc *Collection) Items() <-chan data.Stream {
if eoc.populate != nil {
go eoc.populate(context.TODO(), eoc.service, eoc, eoc.statusCh)
go eoc.populate(
context.TODO(),
eoc.service,
eoc.user,
eoc.jobs,
eoc.data,
eoc.statusCh,
)
}
return eoc.data
}
@ -105,46 +121,113 @@ func (eoc *Collection) FullPath() []string {
return append([]string{}, eoc.fullPath...)
}
// PopulateFromCollection async call to fill DataCollection via channel implementation
func PopulateFromCollection(
func PopulateForContactCollection(
ctx context.Context,
service graph.Service,
eoc *Collection,
user string,
jobs []string,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var (
errs error
success int
)
objectWriter := kw.NewJsonSerializationWriter()
for _, task := range jobs {
response, err := service.Client().UsersById(user).ContactsById(task).Get()
if err != nil {
trace := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(
user,
errors.Wrapf(err, "unable to retrieve item %s; details: %s", task, trace),
errs,
)
continue
}
err = contactToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
if service.ErrPolicy() {
break
}
continue
}
success++
}
close(dataChannel)
attemptedItems := len(jobs)
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
// PopulateForMailCollection async call to fill DataCollection via channel implementation
func PopulateForMailCollection(
ctx context.Context,
service graph.Service,
user string,
jobs []string,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var errs error
var attemptedItems, success int
objectWriter := kw.NewJsonSerializationWriter()
for _, task := range eoc.jobs {
response, err := service.Client().UsersById(eoc.user).MessagesById(task).Get()
for _, task := range jobs {
response, err := service.Client().UsersById(user).MessagesById(task).Get()
if err != nil {
errDetails := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(
eoc.user,
errors.Wrapf(err, "unable to retrieve item %s; details %s", task, errDetails),
errs,
)
trace := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve item %s; details %s", task, trace), errs)
continue
}
err = messageToDataCollection(ctx, service.Client(), objectWriter, eoc.data, response, eoc.user)
success++
err = messageToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user)
if err != nil {
errs = support.WrapAndAppendf(eoc.user, err, errs)
success--
}
if errs != nil && service.ErrPolicy() {
break
errs = support.WrapAndAppendf(user, err, errs)
if service.ErrPolicy() {
break
}
continue
}
success++
}
close(eoc.data)
attemptedItems += len(eoc.jobs)
close(dataChannel)
attemptedItems += len(jobs)
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
func contactToDataCollection(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
contact models.Contactable,
user string,
) error {
defer objectWriter.Close()
err := objectWriter.WriteObjectValue("", contact)
if err != nil {
return support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
if err != nil {
return support.WrapAndAppend(*contact.GetId(), err, nil)
}
if byteArray != nil {
dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: nil}
}
return nil
}
func messageToDataCollection(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,

View File

@ -89,14 +89,16 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_Items() {
expected := 5
testFunction := func(ctx context.Context,
service graph.Service,
eoc *Collection,
user string,
jobs []string,
dataChannel chan<- data.Stream,
notUsed chan<- *support.ConnectorOperationStatus) {
detail := &details.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()}
for i := 0; i < expected; i++ {
temp := NewStream(uuid.NewString(), mockconnector.GetMockMessageBytes("Test_Items()"), *detail)
eoc.data <- &temp
dataChannel <- &temp
}
close(eoc.data)
close(dataChannel)
}
eoc := Collection{

View File

@ -4,7 +4,11 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/tester"
"github.com/alcionai/corso/pkg/selectors"
)
type ExchangeServiceSuite struct {
@ -91,3 +95,27 @@ func (suite *ExchangeServiceSuite) TestExchangeService_optionsForFolders() {
}
}
// NOTE the requirements are in PR 475
func (suite *ExchangeServiceSuite) TestExchangeService_SetupExchangeCollection() {
userID, err := tester.M365UserID()
require.NoError(suite.T(), err)
sel := selectors.NewExchangeBackup()
sel.Include(sel.Users([]string{userID}))
eb, err := sel.ToExchangeBackup()
require.NoError(suite.T(), err)
scopes := eb.Scopes()
for _, test := range scopes {
suite.T().Run(test.Category().String(), func(t *testing.T) {
discriminateFunc, graphQuery, iterFunc, err := SetupExchangeCollectionVars(test)
if test.Category() == selectors.ExchangeMailFolder ||
test.Category() == selectors.ExchangeContactFolder {
assert.NoError(t, err)
assert.NotNil(t, discriminateFunc)
assert.NotNil(t, graphQuery)
assert.NotNil(t, iterFunc)
}
})
}
}

View File

@ -179,22 +179,29 @@ func SetupExchangeCollectionVars(scope selectors.ExchangeScope) (
absser.ParsableFactory,
GraphQuery,
GraphIterateFunc,
error,
) {
if scope.IncludesCategory(selectors.ExchangeMail) {
folders := scope.Get(selectors.ExchangeMailFolder)
if folders[0] == selectors.AnyTgt {
if scope.IsAny(selectors.ExchangeMailFolder) {
return models.CreateMessageCollectionResponseFromDiscriminatorValue,
GetAllMessagesForUser,
IterateSelectAllMessagesForCollections
IterateSelectAllMessagesForCollections,
nil
}
return models.CreateMessageCollectionResponseFromDiscriminatorValue,
GetAllMessagesForUser,
IterateAndFilterMessagesForCollections
IterateAndFilterMessagesForCollections,
nil
}
return nil, nil, nil
if scope.IncludesCategory(selectors.ExchangeContactFolder) {
return models.CreateContactFromDiscriminatorValue,
GetAllContactsForUser,
IterateAllContactsForCollection,
nil
}
return nil, nil, nil, errors.New("exchange scope option not supported")
}
// GetCopyRestoreFolder utility function to create an unique folder for the restore process

View File

@ -6,6 +6,7 @@ import (
absser "github.com/microsoft/kiota-abstractions-go/serialization"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
mscontacts "github.com/microsoftgraph/msgraph-sdk-go/users/item/contacts"
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages"
msitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item"
@ -19,7 +20,10 @@ import (
type optionIdentifier int
const mailCategory = "mail"
const (
mailCategory = "mail"
contactsCategory = "contacts"
)
//go:generate stringer -type=optionIdentifier
const (
@ -27,23 +31,34 @@ const (
folders
messages
users
contacts
)
// GraphQuery represents functions which perform exchange-specific queries
// into M365 backstore.
//TODO: use selector or path for granularity into specific folders or specific date ranges
type GraphQuery func(graph.Service, []string) (absser.Parsable, error)
type GraphQuery func(graph.Service, string) (absser.Parsable, error)
// GetAllMessagesForUser is a GraphQuery function for receiving all messages for a single user
func GetAllMessagesForUser(gs graph.Service, identities []string) (absser.Parsable, error) {
func GetAllMessagesForUser(gs graph.Service, user string) (absser.Parsable, error) {
selecting := []string{"id", "parentFolderId"}
options, err := optionsForMessages(selecting)
if err != nil {
return nil, err
}
return gs.Client().UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
}
// GetAllContactsForUser is a GraphQuery function for querying all the contacts in a user's account
func GetAllContactsForUser(gs graph.Service, user string) (absser.Parsable, error) {
selecting := []string{"id", "parentFolderId"}
options, err := optionsForContacts(selecting)
if err != nil {
return nil, err
}
return gs.Client().UsersById(user).Contacts().GetWithRequestConfigurationAndResponseHandler(options, nil)
return gs.Client().UsersById(identities[0]).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
}
// GetAllFolderDisplayNamesForUser is a GraphQuery function for getting FolderId and display
@ -62,13 +77,13 @@ func GetAllFolderNamesForUser(gs graph.Service, identities []string) (absser.Par
// GraphIterateFuncs are iterate functions to be used with the M365 iterators (e.g. msgraphgocore.NewPageIterator)
// @returns a callback func that works with msgraphgocore.PageIterator.Iterate function
type GraphIterateFunc func(
string,
selectors.ExchangeScope,
error,
bool,
account.M365Config,
map[string]*Collection,
chan<- *support.ConnectorOperationStatus,
tenant string,
scope selectors.ExchangeScope,
errs error,
failFast bool,
credentials account.M365Config,
collections map[string]*Collection,
graphStatusChannel chan<- *support.ConnectorOperationStatus,
) func(any) bool
// IterateSelectAllMessageForCollection utility function for
@ -116,6 +131,48 @@ func IterateSelectAllMessagesForCollections(
}
}
// IterateAllContactsForCollection GraphIterateFunc for moving through
// a ContactsCollectionsResponse using the msgraphgocore paging interface.
// Contacts Ids are placed into a collection based upon the parent folder
func IterateAllContactsForCollection(
tenant string,
scope selectors.ExchangeScope,
errs error,
failFast bool,
credentials account.M365Config,
collections map[string]*Collection,
statusCh chan<- *support.ConnectorOperationStatus,
) func(any) bool {
return func(contactsItem any) bool {
user := scope.Get(selectors.ExchangeUser)[0]
contact, ok := contactsItem.(models.Contactable)
if !ok {
errs = support.WrapAndAppend(user, errors.New("contact iteration failure"), errs)
return true
}
directory := *contact.GetParentFolderId()
if _, ok := collections[directory]; !ok {
service, err := createService(credentials, failFast)
if err != nil {
errs = support.WrapAndAppend(user, err, errs)
return true
}
edc := NewCollection(
user,
[]string{tenant, user, contactsCategory, directory},
contacts,
service,
statusCh,
)
collections[directory] = &edc
}
collections[directory].AddJob(*contact.GetId())
return true
}
}
func IterateAndFilterMessagesForCollections(
tenant string,
scope selectors.ExchangeScope,
@ -289,6 +346,22 @@ func optionsForMailFolders(moreOps []string) (*msfolder.MailFoldersRequestBuilde
return options, nil
}
// optionsForContacts transforms options into select query for MailContacts
// @return is the first call in Contacts().GetWithRequestConfigurationAndResponseHandler(options, handler)
func optionsForContacts(moreOps []string) (*mscontacts.ContactsRequestBuilderGetRequestConfiguration, error) {
selecting, err := buildOptions(moreOps, contacts)
if err != nil {
return nil, err
}
requestParameters := &mscontacts.ContactsRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &mscontacts.ContactsRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
}
return options, nil
}
// buildOptions - Utility Method for verifying if select options are valid for the m365 object type
// @return is a pair. The first is a string literal of allowable options based on the object type,
// the second is an error. An error is returned if an unsupported option or optionIdentifier was used
@ -321,9 +394,16 @@ func buildOptions(options []string, optID optionIdentifier) ([]string, error) {
"webLink": 5,
"id": 6,
}
fieldsForContacts := map[string]int{
"id": 1,
"parentFolderId": 2,
}
returnedOptions := []string{"id"}
switch optID {
case contacts:
allowedOptions = fieldsForContacts
case folders:
allowedOptions = fieldsForFolders
case users:

View File

@ -299,15 +299,17 @@ func (gc *GraphConnector) createCollections(
ctx context.Context,
scope selectors.ExchangeScope,
) (map[string]*exchange.Collection, error) {
var (
transformer absser.ParsableFactory
query exchange.GraphQuery
gIter exchange.GraphIterateFunc
)
user := scope.Get(selectors.ExchangeUser)[0]
transformer, query, gIter = exchange.SetupExchangeCollectionVars(scope)
response, err := query(&gc.graphService, []string{user})
transformer, query, gIter, err := exchange.SetupExchangeCollectionVars(scope)
if err != nil {
return nil, support.WrapAndAppend(user, err, nil)
}
response, err := query(&gc.graphService, user)
if err != nil {
return nil, err
}
@ -319,7 +321,7 @@ func (gc *GraphConnector) createCollections(
collections := make(map[string]*exchange.Collection)
var errs error
// callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[]
// with corresponding messageIDs. New collections are created for each directory
// with corresponding item IDs. New collections are created for each directory
callbackFunc := gIter(gc.tenant, scope, errs, gc.failFast, gc.credentials, collections, gc.statusCh)
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {

View File

@ -126,6 +126,44 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_MailRegressionTe
}
}
// TestGraphConnector_TestContactSequence verifies retrieval sequence
func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_TestContactSequence() {
userID, err := tester.M365UserID()
require.NoError(suite.T(), err)
sel := selectors.NewExchangeBackup()
sel.Include(sel.Users([]string{userID}))
eb, err := sel.ToExchangeBackup()
require.NoError(suite.T(), err)
scopes := eb.Scopes()
var contactsOnly selectors.ExchangeScope
for _, scope := range scopes {
if scope.IncludesCategory(selectors.ExchangeContactFolder) {
contactsOnly = scope
}
}
collections, err := suite.connector.createCollections(context.Background(), contactsOnly)
assert.NoError(suite.T(), err)
number := 0
for _, edc := range collections {
testName := fmt.Sprintf("%s_ContactFolder_%d", edc.FullPath()[1], number)
suite.T().Run(testName, func(t *testing.T) {
streamChannel := edc.Items()
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
suite.NoError(err)
suite.NotZero(read)
message, err := support.CreateMessageFromBytes(buf.Bytes())
suite.NotNil(message)
suite.NoError(err)
}
number++
})
}
suite.Greater(len(collections), 0)
}
//TestGraphConnector_restoreMessages uses mock data to ensure GraphConnector
// is able to restore a messageable item to a Mailbox.
func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages() {