From 9d18d50cf87e4790e25248b197d64bd04cee8147 Mon Sep 17 00:00:00 2001 From: Danny Date: Fri, 19 Aug 2022 11:19:42 -0400 Subject: [PATCH] Populate function Refactor to single function (#585) Populate functions streamlined into one function with a few additional abstractions --- .../exchange/exchange_data_collection.go | 265 +++++++----------- .../exchange/exchange_data_collection_test.go | 44 --- .../connector/exchange/service_query.go | 29 +- 3 files changed, 129 insertions(+), 209 deletions(-) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index e6b3fc44b..c631fe54c 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -6,8 +6,10 @@ package exchange import ( "bytes" "context" + "fmt" "io" + absser "github.com/microsoft/kiota-abstractions-go/serialization" kw "github.com/microsoft/kiota-serialization-json-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -46,25 +48,14 @@ type Collection struct { jobs []string // service - client/adapter pair used to access M365 back store service graph.Service - // populate - Utility function to populate collection based on the M365 application type and granularity - populate populater - statusCh chan<- *support.ConnectorOperationStatus + + collectionType optionIdentifier + statusCh chan<- *support.ConnectorOperationStatus // FullPath is the slice representation of the action context passed down through the hierarchy. // The original request can be gleaned from the slice. (e.g. {, , "emails"}) fullPath []string } -// Populater are a class of functions that can be used to fill exchange.Collections with -// the corresponding information -type populater func( - ctx context.Context, - service graph.Service, - user string, - jobs []string, - dataChannel chan<- data.Stream, - statusChannel chan<- *support.ConnectorOperationStatus, -) - // NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated func NewCollection( user string, @@ -74,195 +65,135 @@ func NewCollection( statusCh chan<- *support.ConnectorOperationStatus, ) Collection { collection := Collection{ - user: user, - data: make(chan data.Stream, collectionChannelBufferSize), - jobs: make([]string, 0), - service: service, - statusCh: statusCh, - fullPath: fullPath, - populate: getPopulateFunction(collectionType), + user: user, + data: make(chan data.Stream, collectionChannelBufferSize), + jobs: make([]string, 0), + service: service, + statusCh: statusCh, + fullPath: fullPath, + collectionType: collectionType, } return collection } -// getPopulateFunction is a function to set populate function field -// with exchange-application specific functions -func getPopulateFunction(optID optionIdentifier) populater { - switch optID { - case messages: - return PopulateForMailCollection - case contacts: - return PopulateForContactCollection - case events: - return PopulateForEventCollection - default: - return nil - } -} - // AddJob appends additional objectID to structure's jobs field -func (eoc *Collection) AddJob(objID string) { - eoc.jobs = append(eoc.jobs, objID) +func (col *Collection) AddJob(objID string) { + col.jobs = append(col.jobs, objID) } // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel -func (eoc *Collection) Items() <-chan data.Stream { - if eoc.populate != nil { - go eoc.populate( - context.TODO(), - eoc.service, - eoc.user, - eoc.jobs, - eoc.data, - eoc.statusCh, - ) +func (col *Collection) Items() <-chan data.Stream { + go col.populateByOptionIdentifier(context.TODO()) + return col.data +} + +// GetQueryAndSerializeFunc helper function that returns the two functions functions +// required to convert M365 identifier into a byte array filled with the serialized data +func GetQueryAndSerializeFunc(optID optionIdentifier) (GraphRetrievalFunc, GraphSerializeFunc) { + switch optID { + case contacts: + return RetrieveContactDataForUser, contactToDataCollection + case events: + return RetrieveEventDataForUser, eventToDataCollection + case messages: + return RetrieveMessageDataForUser, messageToDataCollection + // Unsupported options returns nil, nil + default: + return nil, nil } - return eoc.data } // FullPath returns the Collection's fullPath []string -func (eoc *Collection) FullPath() []string { - return append([]string{}, eoc.fullPath...) +func (col *Collection) FullPath() []string { + return append([]string{}, col.fullPath...) } -func PopulateForContactCollection( +// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize +// all the M365IDs defined in the jobs field. data channel is closed by this function +func (col *Collection) populateByOptionIdentifier( ctx context.Context, - service graph.Service, - user string, - jobs []string, - dataChannel chan<- data.Stream, - statusChannel chan<- *support.ConnectorOperationStatus, ) { var ( errs error success int ) + defer func() { + col.finishPopulation(ctx, success, errs) + }() + user := col.user objectWriter := kw.NewJsonSerializationWriter() + // get QueryBasedonIdentifier + // verify that it is the correct type in called function + // serializationFunction + query, serializeFunc := GetQueryAndSerializeFunc(col.collectionType) + if query == nil { + errs = fmt.Errorf("unrecognized collection type: %s", col.collectionType.String()) + return + } - for _, task := range jobs { - response, err := service.Client().UsersById(user).ContactsById(task).Get() - if err != nil { - trace := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend( - user, - errors.Wrapf(err, "unable to retrieve item %s; details: %s", task, trace), - errs, - ) - continue - } - err = contactToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user) + for _, identifier := range col.jobs { + response, err := query(col.service, user, identifier) if err != nil { errs = support.WrapAndAppendf(user, err, errs) - if service.ErrPolicy() { + if col.service.ErrPolicy() { break } continue } - - success++ - - } - close(dataChannel) - attemptedItems := len(jobs) - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs) - logger.Ctx(ctx).Debug(status.String()) - statusChannel <- status -} - -// PopulateForMailCollection async call to fill DataCollection via channel implementation -func PopulateForMailCollection( - ctx context.Context, - service graph.Service, - user string, - jobs []string, - dataChannel chan<- data.Stream, - statusChannel chan<- *support.ConnectorOperationStatus, -) { - var errs error - var attemptedItems, success int - objectWriter := kw.NewJsonSerializationWriter() - - for _, task := range jobs { - response, err := service.Client().UsersById(user).MessagesById(task).Get() - if err != nil { - trace := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve item %s; details %s", task, trace), errs) - continue - } - err = messageToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user) + err = serializeFunc(ctx, col.service.Client(), objectWriter, col.data, response, user) if err != nil { errs = support.WrapAndAppendf(user, err, errs) - if service.ErrPolicy() { + if col.service.ErrPolicy() { break } continue } + success++ } - close(dataChannel) - attemptedItems += len(jobs) - - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs) - logger.Ctx(ctx).Debug(status.String()) - statusChannel <- status } -func PopulateForEventCollection( +// terminatePopulateSequence is a utility function used to close a Collection's data channel +// and to send the status update through the channel. +func (col *Collection) finishPopulation(ctx context.Context, success int, errs error) { + close(col.data) + attempted := len(col.jobs) + status := support.CreateStatus(ctx, support.Backup, attempted, success, 1, errs) + logger.Ctx(ctx).Debug(status.String()) + col.statusCh <- status +} + +// GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc +// responses into data.Stream items contained within the Collection +type GraphSerializeFunc func( ctx context.Context, - service graph.Service, - user string, - jobs []string, + client *msgraphsdk.GraphServiceClient, + objectWriter *kw.JsonSerializationWriter, dataChannel chan<- data.Stream, - statusChannel chan<- *support.ConnectorOperationStatus, -) { - var ( - errs error - attemptedItems, success int - ) - objectWriter := kw.NewJsonSerializationWriter() - - for _, task := range jobs { - response, err := service.Client().UsersById(user).EventsById(task).Get() - if err != nil { - trace := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend( - user, - errors.Wrapf(err, "unable to retrieve items %s; details: %s", task, trace), - errs, - ) - continue - } - err = eventToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user) - if err != nil { - errs = support.WrapAndAppend(user, err, errs) - - if service.ErrPolicy() { - break - } - continue - } - success++ - } - close(dataChannel) - attemptedItems += len(jobs) - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs) - logger.Ctx(ctx).Debug(status.String()) - statusChannel <- status -} + parsable absser.Parsable, + user string, +) error +// eventToDataCollection is a GraphSerializeFunc used to serialize models.Eventable objects into +// data.Stream objects. Returns an error the process finishes unsuccessfully. func eventToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, objectWriter *kw.JsonSerializationWriter, dataChannel chan<- data.Stream, - event models.Eventable, + parsable absser.Parsable, user string, ) error { var err error defer objectWriter.Close() + event, ok := parsable.(models.Eventable) + if !ok { + return fmt.Errorf("expected Eventable, got %T", parsable) + } + if *event.GetHasAttachments() { var retriesErr error for count := 0; count < numberOfRetries; count++ { @@ -299,15 +230,20 @@ func eventToDataCollection( return nil } +// contactToDataCollection is a GraphSerializeFunc for models.Contactable func contactToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, objectWriter *kw.JsonSerializationWriter, dataChannel chan<- data.Stream, - contact models.Contactable, + parsable absser.Parsable, user string, ) error { defer objectWriter.Close() + contact, ok := parsable.(models.Contactable) + if !ok { + return fmt.Errorf("expected Contactable, got %T", parsable) + } err := objectWriter.WriteObjectValue("", contact) if err != nil { return support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId())) @@ -317,24 +253,29 @@ func contactToDataCollection( return support.WrapAndAppend(*contact.GetId(), err, nil) } if byteArray != nil { - dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: nil} + dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: ContactInfo(contact)} } return nil } +// messageToDataCollection is the GraphSerializeFunc for models.Messageable func messageToDataCollection( ctx context.Context, client *msgraphsdk.GraphServiceClient, objectWriter *kw.JsonSerializationWriter, dataChannel chan<- data.Stream, - message models.Messageable, + parsable absser.Parsable, user string, ) error { var err error - aMessage := message - adtl := message.GetAdditionalData() + defer objectWriter.Close() + aMessage, ok := parsable.(models.Messageable) + if !ok { + return fmt.Errorf("expected Messageable, got %T", parsable) + } + adtl := aMessage.GetAdditionalData() if len(adtl) > 2 { - aMessage, err = support.ConvertFromMessageable(adtl, message) + aMessage, err = support.ConvertFromMessageable(adtl, aMessage) if err != nil { return err } @@ -349,7 +290,7 @@ func messageToDataCollection( Attachments(). Get() retriesErr = err - if err == nil && attached != nil { + if err == nil { aMessage.SetAttachments(attached.GetValue()) break } @@ -359,19 +300,19 @@ func messageToDataCollection( return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil) } } + err = objectWriter.WriteObjectValue("", aMessage) if err != nil { return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId())) } byteArray, err := objectWriter.GetSerializedContent() - objectWriter.Close() if err != nil { - return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) - } - if byteArray != nil { - dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage)} + err = support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) + return support.SetNonRecoverableError(err) } + + dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage)} return nil } diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index 08310c9be..ed864eaf4 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -2,19 +2,10 @@ package exchange import ( "bytes" - "context" "testing" - "time" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - - "github.com/alcionai/corso/internal/connector/graph" - "github.com/alcionai/corso/internal/connector/mockconnector" - "github.com/alcionai/corso/internal/connector/support" - "github.com/alcionai/corso/internal/data" - "github.com/alcionai/corso/pkg/backup/details" ) type ExchangeDataCollectionSuite struct { @@ -82,38 +73,3 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() { } suite.Equal(len(shopping), len(eoc.jobs)) } - -// TestExchangeCollection_Items() tests for the Collection.Items() ability -// to asynchronously fill `data` field with Stream objects -func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_Items() { - expected := 5 - testFunction := func(ctx context.Context, - service graph.Service, - user string, - jobs []string, - dataChannel chan<- data.Stream, - notUsed chan<- *support.ConnectorOperationStatus, - ) { - detail := &details.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()} - for i := 0; i < expected; i++ { - temp := NewStream(uuid.NewString(), mockconnector.GetMockMessageBytes("Test_Items()"), *detail) - dataChannel <- &temp - } - close(dataChannel) - } - - eoc := Collection{ - user: "Dexter", - fullPath: []string{"Today", "is", "currently", "different"}, - data: make(chan data.Stream, expected), - populate: testFunction, - } - t := suite.T() - itemsReturn := eoc.Items() - retrieved := 0 - for item := range itemsReturn { - assert.NotNil(t, item) - retrieved++ - } - suite.Equal(expected, retrieved) -} diff --git a/src/internal/connector/exchange/service_query.go b/src/internal/connector/exchange/service_query.go index d91aa4382..428fb9449 100644 --- a/src/internal/connector/exchange/service_query.go +++ b/src/internal/connector/exchange/service_query.go @@ -93,7 +93,8 @@ const ( ) // GraphQuery represents functions which perform exchange-specific queries -// into M365 backstore. +// into M365 backstore. Responses -> returned items will only contain the information +// that is included in the options // TODO: use selector or path for granularity into specific folders or specific date ranges type GraphQuery func(graph.Service, string) (absser.Parsable, error) @@ -131,10 +132,10 @@ func GetAllFolderNamesForUser(gs graph.Service, user string) (absser.Parsable, e return gs.Client().UsersById(user).MailFolders().GetWithRequestConfigurationAndResponseHandler(options, nil) } -// GetAllEvents for User. Default returns EventResponseCollection for events in the future +// GetAllEvents for User. Default returns EventResponseCollection for future events. // of the time that the call was made. There a func GetAllEventsForUser(gs graph.Service, user string) (absser.Parsable, error) { - options, err := optionsForEvents([]string{"id", "calendar"}) + options, err := optionsForEvents([]string{"id"}) if err != nil { return nil, err } @@ -142,6 +143,28 @@ func GetAllEventsForUser(gs graph.Service, user string) (absser.Parsable, error) return gs.Client().UsersById(user).Events().GetWithRequestConfigurationAndResponseHandler(options, nil) } +// GraphRetrievalFunctions are functions from the Microsoft Graph API that retrieve +// the default associated data of a M365 object. This varies by object. Additional +// Queries must be run to obtain the omitted fields. +type GraphRetrievalFunc func(gs graph.Service, user, m365ID string) (absser.Parsable, error) + +// RetrieveContactDataForUser is a GraphRetrievalFun that returns all associated fields. +func RetrieveContactDataForUser(gs graph.Service, user, m365ID string) (absser.Parsable, error) { + return gs.Client().UsersById(user).ContactsById(m365ID).Get() +} + +// RetrieveEventDataForUser is a GraphRetrievalFunc that returns event data. +// Calendarable and attachment fields are omitted due to size +func RetrieveEventDataForUser(gs graph.Service, user, m365ID string) (absser.Parsable, error) { + return gs.Client().UsersById(user).EventsById(m365ID).Get() +} + +// RetrieveMessageDataForUser is a GraphRetrievalFunc that returns message data. +// Attachment field is omitted due to size. +func RetrieveMessageDataForUser(gs graph.Service, user, m365ID string) (absser.Parsable, error) { + return gs.Client().UsersById(user).MessagesById(m365ID).Get() +} + // GraphIterateFuncs are iterate functions to be used with the M365 iterators (e.g. msgraphgocore.NewPageIterator) // @returns a callback func that works with msgraphgocore.PageIterator.Iterate function type GraphIterateFunc func(