Populate function Refactor to single function (#585)

Populate functions streamlined into one function with a few additional abstractions
This commit is contained in:
Danny 2022-08-19 11:19:42 -04:00 committed by GitHub
parent 854635ac24
commit 9d18d50cf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 129 additions and 209 deletions

View File

@ -6,8 +6,10 @@ package exchange
import (
"bytes"
"context"
"fmt"
"io"
absser "github.com/microsoft/kiota-abstractions-go/serialization"
kw "github.com/microsoft/kiota-serialization-json-go"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@ -46,25 +48,14 @@ type Collection struct {
jobs []string
// service - client/adapter pair used to access M365 back store
service graph.Service
// populate - Utility function to populate collection based on the M365 application type and granularity
populate populater
collectionType optionIdentifier
statusCh chan<- *support.ConnectorOperationStatus
// FullPath is the slice representation of the action context passed down through the hierarchy.
// The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string
}
// Populater are a class of functions that can be used to fill exchange.Collections with
// the corresponding information
type populater func(
ctx context.Context,
service graph.Service,
user string,
jobs []string,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
)
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
func NewCollection(
user string,
@ -80,189 +71,129 @@ func NewCollection(
service: service,
statusCh: statusCh,
fullPath: fullPath,
populate: getPopulateFunction(collectionType),
collectionType: collectionType,
}
return collection
}
// getPopulateFunction is a function to set populate function field
// with exchange-application specific functions
func getPopulateFunction(optID optionIdentifier) populater {
switch optID {
case messages:
return PopulateForMailCollection
case contacts:
return PopulateForContactCollection
case events:
return PopulateForEventCollection
default:
return nil
}
}
// AddJob appends additional objectID to structure's jobs field
func (eoc *Collection) AddJob(objID string) {
eoc.jobs = append(eoc.jobs, objID)
func (col *Collection) AddJob(objID string) {
col.jobs = append(col.jobs, objID)
}
// Items utility function to asynchronously execute process to fill data channel with
// M365 exchange objects and returns the data channel
func (eoc *Collection) Items() <-chan data.Stream {
if eoc.populate != nil {
go eoc.populate(
context.TODO(),
eoc.service,
eoc.user,
eoc.jobs,
eoc.data,
eoc.statusCh,
)
func (col *Collection) Items() <-chan data.Stream {
go col.populateByOptionIdentifier(context.TODO())
return col.data
}
// GetQueryAndSerializeFunc helper function that returns the two functions functions
// required to convert M365 identifier into a byte array filled with the serialized data
func GetQueryAndSerializeFunc(optID optionIdentifier) (GraphRetrievalFunc, GraphSerializeFunc) {
switch optID {
case contacts:
return RetrieveContactDataForUser, contactToDataCollection
case events:
return RetrieveEventDataForUser, eventToDataCollection
case messages:
return RetrieveMessageDataForUser, messageToDataCollection
// Unsupported options returns nil, nil
default:
return nil, nil
}
return eoc.data
}
// FullPath returns the Collection's fullPath []string
func (eoc *Collection) FullPath() []string {
return append([]string{}, eoc.fullPath...)
func (col *Collection) FullPath() []string {
return append([]string{}, col.fullPath...)
}
func PopulateForContactCollection(
// populateByOptionIdentifier is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the jobs field. data channel is closed by this function
func (col *Collection) populateByOptionIdentifier(
ctx context.Context,
service graph.Service,
user string,
jobs []string,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var (
errs error
success int
)
defer func() {
col.finishPopulation(ctx, success, errs)
}()
user := col.user
objectWriter := kw.NewJsonSerializationWriter()
for _, task := range jobs {
response, err := service.Client().UsersById(user).ContactsById(task).Get()
if err != nil {
trace := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(
user,
errors.Wrapf(err, "unable to retrieve item %s; details: %s", task, trace),
errs,
)
continue
// get QueryBasedonIdentifier
// verify that it is the correct type in called function
// serializationFunction
query, serializeFunc := GetQueryAndSerializeFunc(col.collectionType)
if query == nil {
errs = fmt.Errorf("unrecognized collection type: %s", col.collectionType.String())
return
}
err = contactToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user)
for _, identifier := range col.jobs {
response, err := query(col.service, user, identifier)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
if service.ErrPolicy() {
if col.service.ErrPolicy() {
break
}
continue
}
success++
}
close(dataChannel)
attemptedItems := len(jobs)
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
// PopulateForMailCollection async call to fill DataCollection via channel implementation
func PopulateForMailCollection(
ctx context.Context,
service graph.Service,
user string,
jobs []string,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var errs error
var attemptedItems, success int
objectWriter := kw.NewJsonSerializationWriter()
for _, task := range jobs {
response, err := service.Client().UsersById(user).MessagesById(task).Get()
if err != nil {
trace := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve item %s; details %s", task, trace), errs)
continue
}
err = messageToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user)
err = serializeFunc(ctx, col.service.Client(), objectWriter, col.data, response, user)
if err != nil {
errs = support.WrapAndAppendf(user, err, errs)
if service.ErrPolicy() {
if col.service.ErrPolicy() {
break
}
continue
}
success++
}
close(dataChannel)
attemptedItems += len(jobs)
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
func PopulateForEventCollection(
// terminatePopulateSequence is a utility function used to close a Collection's data channel
// and to send the status update through the channel.
func (col *Collection) finishPopulation(ctx context.Context, success int, errs error) {
close(col.data)
attempted := len(col.jobs)
status := support.CreateStatus(ctx, support.Backup, attempted, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
col.statusCh <- status
}
// GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc
// responses into data.Stream items contained within the Collection
type GraphSerializeFunc func(
ctx context.Context,
service graph.Service,
user string,
jobs []string,
client *msgraphsdk.GraphServiceClient,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var (
errs error
attemptedItems, success int
)
objectWriter := kw.NewJsonSerializationWriter()
for _, task := range jobs {
response, err := service.Client().UsersById(user).EventsById(task).Get()
if err != nil {
trace := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(
user,
errors.Wrapf(err, "unable to retrieve items %s; details: %s", task, trace),
errs,
)
continue
}
err = eventToDataCollection(ctx, service.Client(), objectWriter, dataChannel, response, user)
if err != nil {
errs = support.WrapAndAppend(user, err, errs)
if service.ErrPolicy() {
break
}
continue
}
success++
}
close(dataChannel)
attemptedItems += len(jobs)
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, 1, errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
parsable absser.Parsable,
user string,
) error
// eventToDataCollection is a GraphSerializeFunc used to serialize models.Eventable objects into
// data.Stream objects. Returns an error the process finishes unsuccessfully.
func eventToDataCollection(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
event models.Eventable,
parsable absser.Parsable,
user string,
) error {
var err error
defer objectWriter.Close()
event, ok := parsable.(models.Eventable)
if !ok {
return fmt.Errorf("expected Eventable, got %T", parsable)
}
if *event.GetHasAttachments() {
var retriesErr error
for count := 0; count < numberOfRetries; count++ {
@ -299,15 +230,20 @@ func eventToDataCollection(
return nil
}
// contactToDataCollection is a GraphSerializeFunc for models.Contactable
func contactToDataCollection(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
contact models.Contactable,
parsable absser.Parsable,
user string,
) error {
defer objectWriter.Close()
contact, ok := parsable.(models.Contactable)
if !ok {
return fmt.Errorf("expected Contactable, got %T", parsable)
}
err := objectWriter.WriteObjectValue("", contact)
if err != nil {
return support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
@ -317,24 +253,29 @@ func contactToDataCollection(
return support.WrapAndAppend(*contact.GetId(), err, nil)
}
if byteArray != nil {
dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: nil}
dataChannel <- &Stream{id: *contact.GetId(), message: byteArray, info: ContactInfo(contact)}
}
return nil
}
// messageToDataCollection is the GraphSerializeFunc for models.Messageable
func messageToDataCollection(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
message models.Messageable,
parsable absser.Parsable,
user string,
) error {
var err error
aMessage := message
adtl := message.GetAdditionalData()
defer objectWriter.Close()
aMessage, ok := parsable.(models.Messageable)
if !ok {
return fmt.Errorf("expected Messageable, got %T", parsable)
}
adtl := aMessage.GetAdditionalData()
if len(adtl) > 2 {
aMessage, err = support.ConvertFromMessageable(adtl, message)
aMessage, err = support.ConvertFromMessageable(adtl, aMessage)
if err != nil {
return err
}
@ -349,7 +290,7 @@ func messageToDataCollection(
Attachments().
Get()
retriesErr = err
if err == nil && attached != nil {
if err == nil {
aMessage.SetAttachments(attached.GetValue())
break
}
@ -359,19 +300,19 @@ func messageToDataCollection(
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
}
}
err = objectWriter.WriteObjectValue("", aMessage)
if err != nil {
return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
objectWriter.Close()
if err != nil {
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
err = support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
return support.SetNonRecoverableError(err)
}
if byteArray != nil {
dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage)}
}
return nil
}

View File

@ -2,19 +2,10 @@ package exchange
import (
"bytes"
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/graph"
"github.com/alcionai/corso/internal/connector/mockconnector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
)
type ExchangeDataCollectionSuite struct {
@ -82,38 +73,3 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() {
}
suite.Equal(len(shopping), len(eoc.jobs))
}
// TestExchangeCollection_Items() tests for the Collection.Items() ability
// to asynchronously fill `data` field with Stream objects
func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_Items() {
expected := 5
testFunction := func(ctx context.Context,
service graph.Service,
user string,
jobs []string,
dataChannel chan<- data.Stream,
notUsed chan<- *support.ConnectorOperationStatus,
) {
detail := &details.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()}
for i := 0; i < expected; i++ {
temp := NewStream(uuid.NewString(), mockconnector.GetMockMessageBytes("Test_Items()"), *detail)
dataChannel <- &temp
}
close(dataChannel)
}
eoc := Collection{
user: "Dexter",
fullPath: []string{"Today", "is", "currently", "different"},
data: make(chan data.Stream, expected),
populate: testFunction,
}
t := suite.T()
itemsReturn := eoc.Items()
retrieved := 0
for item := range itemsReturn {
assert.NotNil(t, item)
retrieved++
}
suite.Equal(expected, retrieved)
}

View File

@ -93,7 +93,8 @@ const (
)
// GraphQuery represents functions which perform exchange-specific queries
// into M365 backstore.
// into M365 backstore. Responses -> returned items will only contain the information
// that is included in the options
// TODO: use selector or path for granularity into specific folders or specific date ranges
type GraphQuery func(graph.Service, string) (absser.Parsable, error)
@ -131,10 +132,10 @@ func GetAllFolderNamesForUser(gs graph.Service, user string) (absser.Parsable, e
return gs.Client().UsersById(user).MailFolders().GetWithRequestConfigurationAndResponseHandler(options, nil)
}
// GetAllEvents for User. Default returns EventResponseCollection for events in the future
// GetAllEvents for User. Default returns EventResponseCollection for future events.
// of the time that the call was made. There a
func GetAllEventsForUser(gs graph.Service, user string) (absser.Parsable, error) {
options, err := optionsForEvents([]string{"id", "calendar"})
options, err := optionsForEvents([]string{"id"})
if err != nil {
return nil, err
}
@ -142,6 +143,28 @@ func GetAllEventsForUser(gs graph.Service, user string) (absser.Parsable, error)
return gs.Client().UsersById(user).Events().GetWithRequestConfigurationAndResponseHandler(options, nil)
}
// GraphRetrievalFunctions are functions from the Microsoft Graph API that retrieve
// the default associated data of a M365 object. This varies by object. Additional
// Queries must be run to obtain the omitted fields.
type GraphRetrievalFunc func(gs graph.Service, user, m365ID string) (absser.Parsable, error)
// RetrieveContactDataForUser is a GraphRetrievalFun that returns all associated fields.
func RetrieveContactDataForUser(gs graph.Service, user, m365ID string) (absser.Parsable, error) {
return gs.Client().UsersById(user).ContactsById(m365ID).Get()
}
// RetrieveEventDataForUser is a GraphRetrievalFunc that returns event data.
// Calendarable and attachment fields are omitted due to size
func RetrieveEventDataForUser(gs graph.Service, user, m365ID string) (absser.Parsable, error) {
return gs.Client().UsersById(user).EventsById(m365ID).Get()
}
// RetrieveMessageDataForUser is a GraphRetrievalFunc that returns message data.
// Attachment field is omitted due to size.
func RetrieveMessageDataForUser(gs graph.Service, user, m365ID string) (absser.Parsable, error) {
return gs.Client().UsersById(user).MessagesById(m365ID).Get()
}
// GraphIterateFuncs are iterate functions to be used with the M365 iterators (e.g. msgraphgocore.NewPageIterator)
// @returns a callback func that works with msgraphgocore.PageIterator.Iterate function
type GraphIterateFunc func(