Gc serialize to collection (#467)

Abstractions: Exchange selector function introduced, GraphIterateFuncs and service functions extended
This commit is contained in:
Danny 2022-08-03 14:26:06 -04:00 committed by GitHub
parent 186c2d9dc4
commit 2b95553355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 13 deletions

View File

@ -55,6 +55,7 @@ type Collection struct {
func NewCollection(
aUser string,
pathRepresentation []string,
collectionType optionIdentifier,
aService graph.Service,
statusCh chan<- *support.ConnectorOperationStatus,
) Collection {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"time"
absser "github.com/microsoft/kiota-abstractions-go/serialization"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@ -16,6 +17,7 @@ import (
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/control"
"github.com/alcionai/corso/pkg/logger"
"github.com/alcionai/corso/pkg/selectors"
)
var ErrFolderNotFound = errors.New("folder not found")
@ -117,6 +119,23 @@ func GetMailFolderID(service graph.Service, folderName, user string) (*string, e
}
// SetupExchangeCollectionVars is a helper function returns a sets
// Exchange.Type specific functions based on scope
func SetupExchangeCollectionVars(scope selectors.ExchangeScope) (
absser.ParsableFactory,
GraphQuery,
GraphIterateFunc,
) {
if scope.IncludesCategory(selectors.ExchangeMail) {
return models.CreateMessageCollectionResponseFromDiscriminatorValue,
GetAllMessagesForUser,
IterateSelectAllMessagesForCollections
}
return nil, nil, nil
}
// GetCopyRestoreFolder utility function to create an unique folder for the restore process
func GetCopyRestoreFolder(service graph.Service, user string) (*string, error) {
now := time.Now().UTC()

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/internal/connector/graph"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/selectors"
)
type optionIdentifier int
@ -40,10 +41,25 @@ func GetAllMessagesForUser(gs graph.Service, identities []string) (absser.Parsab
return gs.Client().UsersById(identities[0]).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
}
// IterateMessageCollection utility function for Iterating through MessagesCollectionResponse
// During iteration, Collections are added to the map based on the parent folder
func IterateMessagesCollection(
tenant, user string,
// GraphIterateFuncs are iterate functions to be used with the M365 iterators (e.g. msgraphgocore.NewPageIterator)
// @returns a callback func that works with msgraphgocore.PageIterator.Iterate function
type GraphIterateFunc func(
string,
selectors.ExchangeScope,
error,
bool,
account.M365Config,
map[string]*Collection,
chan<- *support.ConnectorOperationStatus,
) func(any) bool
// IterateSelectAllMessageForCollection utility function for
// Iterating through MessagesCollectionResponse
// During iteration, messages belonging to any folder are
// placed into a Collection based on the parent folder
func IterateSelectAllMessagesForCollections(
tenant string,
scope selectors.ExchangeScope,
errs error,
failFast bool,
credentials account.M365Config,
@ -51,6 +67,10 @@ func IterateMessagesCollection(
statusCh chan<- *support.ConnectorOperationStatus,
) func(any) bool {
return func(messageItem any) bool {
// Defines the type of collection being created within the function
collection_type := messages
user := scope.Get(selectors.ExchangeUser)[0]
message, ok := messageItem.(models.Messageable)
if !ok {
errs = support.WrapAndAppendf(user, errors.New("message iteration failure"), errs)
@ -64,7 +84,13 @@ func IterateMessagesCollection(
errs = support.WrapAndAppend(user, err, errs)
return true
}
edc := NewCollection(user, []string{tenant, user, mailCategory, directory}, service, statusCh)
edc := NewCollection(
user,
[]string{tenant, user, mailCategory, directory},
collection_type,
service,
statusCh,
)
collections[directory] = &edc
}
collections[directory].AddJob(*message.GetId())

View File

@ -203,8 +203,8 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s
errs)
continue
}
// TODO: Creates a map of collections based on scope
dcs, err := gc.serializeMessages(ctx, user)
// Creates a map of collections based on scope
dcs, err := gc.createCollections(ctx, scope)
if err != nil {
return nil, support.WrapAndAppend(user, err, errs)
}
@ -284,15 +284,26 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collec
return errs
}
// serializeMessages: Temp Function as place Holder until Collections have been added
// createCollection - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are stored.
// to the GraphConnector struct.
func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*exchange.Collection, error) {
var transformer absser.ParsableFactory
response, err := exchange.GetAllMessagesForUser(&gc.graphService, []string{user}) //TODO: Selector to be used for exchange.query
func (gc *GraphConnector) createCollections(
ctx context.Context,
scope selectors.ExchangeScope,
) (map[string]*exchange.Collection, error) {
var (
transformer absser.ParsableFactory
query exchange.GraphQuery
gIter exchange.GraphIterateFunc
)
user := scope.Get(selectors.ExchangeUser)[0]
transformer, query, gIter = exchange.SetupExchangeCollectionVars(scope)
response, err := query(&gc.graphService, []string{user})
if err != nil {
return nil, err
}
transformer = models.CreateMessageCollectionResponseFromDiscriminatorValue
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, transformer)
if err != nil {
return nil, err
@ -302,7 +313,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
var errs error
// callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[]
// with corresponding messageIDs. New collections are created for each directory
callbackFunc := exchange.IterateMessagesCollection(gc.tenant, user, errs, gc.failFast, gc.credentials, collections, gc.statusCh)
callbackFunc := gIter(gc.tenant, scope, errs, gc.failFast, gc.credentials, collections, gc.statusCh)
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs)