Population Function Moved to Exchange Package (#429)

Functions and interfaces moved to interact properly with connector package. Fields are no longer exported until the required functions are moved to the exchange package.
This commit is contained in:
Danny 2022-07-27 19:18:20 -04:00 committed by GitHub
parent 665fa21b13
commit 8bfff3c88f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 202 additions and 151 deletions

View File

@ -5,10 +5,19 @@ package exchange
import (
"bytes"
"context"
"io"
kw "github.com/microsoft/kiota-serialization-json-go"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector/graph"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
"github.com/alcionai/corso/pkg/logger"
)
var _ data.Collection = &Collection{}
@ -17,14 +26,18 @@ var _ data.StreamInfo = &Stream{}
const (
collectionChannelBufferSize = 1000
numberOfRetries = 4
)
// Collection represents an compilation of M365 objects from a specific exchange application.
// Each Collection is to only hold one application type at a time. This is not enforced
// ExchangeDataCollection represents exchange mailbox
// data for a single user.
//
// It implements the DataCollection interface
type Collection struct {
// M365 user
User string // M365 user
Data chan data.Stream // represents a single M365 object from an Exchange application
Data chan data.Stream
// FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string
@ -52,7 +65,110 @@ func (eoc *Collection) FinishPopulation() {
}
}
// Items() returns the channel containing M365 Exchange objects
// NOTE: Refactor has not happened moving into folders
// populateFromTaskList async call to fill DataCollection via channel implementation
func PopulateFromTaskList(
ctx context.Context,
tasklist support.TaskList,
service graph.Service,
collections map[string]*Collection,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var errs error
var attemptedItems, success int
objectWriter := kw.NewJsonSerializationWriter()
//Todo this has to return all the errors in the status
for aFolder, tasks := range tasklist {
// Get the same folder
edc := collections[aFolder]
if edc == nil {
for _, task := range tasks {
errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs)
}
continue
}
for _, task := range tasks {
response, err := service.Client().UsersById(edc.User).MessagesById(task).Get()
if err != nil {
details := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(edc.User, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs)
continue
}
err = messageToDataCollection(service.Client(), ctx, objectWriter, edc.Data, response, edc.User)
success++
if err != nil {
errs = support.WrapAndAppendf(edc.User, err, errs)
success--
}
if errs != nil && service.ErrPolicy() {
break
}
}
edc.FinishPopulation()
attemptedItems += len(tasks)
}
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
func messageToDataCollection(
client *msgraphsdk.GraphServiceClient,
ctx context.Context,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
message models.Messageable,
user string,
) error {
var err error
aMessage := message
adtl := message.GetAdditionalData()
if len(adtl) > 2 {
aMessage, err = support.ConvertFromMessageable(adtl, message)
if err != nil {
return err
}
}
if *aMessage.GetHasAttachments() {
// getting all the attachments might take a couple attempts due to filesize
var retriesErr error
for count := 0; count < numberOfRetries; count++ {
attached, err := client.
UsersById(user).
MessagesById(*aMessage.GetId()).
Attachments().
Get()
retriesErr = err
if err == nil && attached != nil {
aMessage.SetAttachments(attached.GetValue())
break
}
}
if retriesErr != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
}
}
err = objectWriter.WriteObjectValue("", aMessage)
if err != nil {
return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
objectWriter.Close()
if err != nil {
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
}
if byteArray != nil {
dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage)}
}
return nil
}
func (eoc *Collection) Items() <-chan data.Stream {
return eoc.Data
}
@ -63,22 +179,31 @@ func (edc *Collection) FullPath() []string {
// Stream represents a single item retrieved from exchange
type Stream struct {
Id string
id string
// TODO: We may need this to be a "oneOf" of `message`, `contact`, etc.
// going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`)
Message []byte
Inf *details.ExchangeInfo //temporary change to bring populate function into directory
message []byte
info *details.ExchangeInfo //temporary change to bring populate function into directory
}
func (od *Stream) UUID() string {
return od.Id
return od.id
}
func (od *Stream) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(od.Message))
return io.NopCloser(bytes.NewReader(od.message))
}
func (od *Stream) Info() details.ItemInfo {
return details.ItemInfo{Exchange: od.Inf}
return details.ItemInfo{Exchange: od.info}
}
func NewStream(identifier string, bytes []byte, detail details.ExchangeInfo) Stream {
return Stream{
id: identifier,
message: bytes,
info: &detail,
}
}

View File

@ -28,7 +28,7 @@ func TestExchangeDataCollectionSuite(t *testing.T) {
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() {
m := []byte("test message")
description := "aFile"
ed := &Stream{Id: description, Message: m}
ed := &Stream{id: description, message: m}
// Read the message using the `ExchangeData` reader and validate it matches what we set
buf := &bytes.Buffer{}
@ -41,7 +41,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() {
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() {
var empty []byte
expected := int64(0)
ed := &Stream{Message: empty}
ed := &Stream{message: empty}
buf := &bytes.Buffer{}
received, err := buf.ReadFrom(ed.ToReader())
suite.Equal(expected, received)
@ -69,7 +69,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCol
expected := len(inputStrings) / 2 // We are using pairs
edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.PopulateCollection(&Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])})
edc.PopulateCollection(&Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
}
suite.Equal(expected, len(edc.Data))
}
@ -80,7 +80,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() {
expected := len(inputStrings) / 2 // We are using pairs
edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.Data <- &Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])}
edc.Data <- &Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}
}
close(edc.Data)
suite.Equal(expected, len(edc.Data))

View File

@ -0,0 +1,14 @@
package graph
import msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
type Service interface {
// Client() returns msgraph Service client that can be used to process and execute
// the majority of the queries to the M365 Backstore
Client() *msgraphsdk.GraphServiceClient
// Adapter() returns GraphRequest adapter used to process large requests, create batches
// and page iterators
Adapter() *msgraphsdk.GraphRequestAdapter
// ErrPolicy returns if the service is implementing a Fast-Fail policy or not
ErrPolicy() bool
}

View File

@ -10,7 +10,6 @@ import (
az "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
ka "github.com/microsoft/kiota-authentication-azure-go"
kw "github.com/microsoft/kiota-serialization-json-go"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@ -26,7 +25,6 @@ import (
)
const (
numberOfRetries = 4
mailCategory = "mail"
)
@ -49,6 +47,18 @@ type graphService struct {
failFast bool // if true service will exit sequence upon encountering an error
}
func (gs *graphService) Client() *msgraphsdk.GraphServiceClient {
return &gs.client
}
func (gs *graphService) Adapter() *msgraphsdk.GraphRequestAdapter {
return &gs.adapter
}
func (gs *graphService) ErrPolicy() bool {
return gs.failFast
}
func NewGraphConnector(acct account.Account) (*GraphConnector, error) {
m365, err := acct.M365Config()
if err != nil {
@ -319,7 +329,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
if err != nil {
return nil, err
}
tasklist := NewTaskList() // map[folder][] messageIds
tasklist := support.NewTaskList() // map[folder][] messageIds
callbackFunc := func(messageItem any) bool {
message, ok := messageItem.(models.Messageable)
if !ok {
@ -361,114 +371,12 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
return nil, support.WrapAndAppend(user, err, err)
}
// async call to populate
go service.populateFromTaskList(ctx, tasklist, collections, gc.statusCh)
go exchange.PopulateFromTaskList(ctx, tasklist, service, collections, gc.statusCh)
gc.incrementAwaitingMessages()
return collections, err
}
// populateFromTaskList async call to fill DataCollection via channel implementation
func (sc *graphService) populateFromTaskList(
ctx context.Context,
tasklist TaskList,
collections map[string]*exchange.Collection,
statusChannel chan<- *support.ConnectorOperationStatus,
) {
var errs error
var attemptedItems, success int
objectWriter := kw.NewJsonSerializationWriter()
//Todo this has to return all the errors in the status
for aFolder, tasks := range tasklist {
// Get the same folder
edc := collections[aFolder]
if edc == nil {
for _, task := range tasks {
errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs)
}
continue
}
for _, task := range tasks {
response, err := sc.client.UsersById(edc.User).MessagesById(task).Get()
if err != nil {
details := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(edc.User, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs)
continue
}
err = messageToDataCollection(&sc.client, ctx, objectWriter, edc.Data, response, edc.User)
success++
if err != nil {
errs = support.WrapAndAppendf(edc.User, err, errs)
success--
}
if errs != nil && sc.failFast {
break
}
}
edc.FinishPopulation()
attemptedItems += len(tasks)
}
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs)
logger.Ctx(ctx).Debug(status.String())
statusChannel <- status
}
func messageToDataCollection(
client *msgraphsdk.GraphServiceClient,
ctx context.Context,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- data.Stream,
message models.Messageable,
user string,
) error {
var err error
aMessage := message
adtl := message.GetAdditionalData()
if len(adtl) > 2 {
aMessage, err = support.ConvertFromMessageable(adtl, message)
if err != nil {
return err
}
}
if *aMessage.GetHasAttachments() {
// getting all the attachments might take a couple attempts due to filesize
var retriesErr error
for count := 0; count < numberOfRetries; count++ {
attached, err := client.
UsersById(user).
MessagesById(*aMessage.GetId()).
Attachments().
Get()
retriesErr = err
if err == nil && attached != nil {
aMessage.SetAttachments(attached.GetValue())
break
}
}
if retriesErr != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
}
}
err = objectWriter.WriteObjectValue("", aMessage)
if err != nil {
return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
objectWriter.Close()
if err != nil {
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
}
if byteArray != nil {
dataChannel <- &exchange.Stream{Id: *aMessage.GetId(), Message: byteArray, Inf: exchange.MessageInfo(aMessage)}
}
return nil
}
// SetStatus helper function
func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) {
gc.status = &cos

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/internal/data"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/backup/details"
"github.com/alcionai/corso/pkg/credentials"
"github.com/alcionai/corso/pkg/selectors"
)
@ -86,8 +87,10 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages(
if err != nil {
suite.T().Skipf("Support file not accessible: %v\n", err)
}
ds := exchange.Stream{Id: "test", Message: bytes}
ds := exchange.NewStream("test", bytes, details.ExchangeInfo{})
edc := exchange.NewCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"})
edc.PopulateCollection(&ds)
edc.FinishPopulation()
err = suite.connector.RestoreMessages(context.Background(), []data.Collection{&edc})
@ -244,16 +247,6 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_ErrorChecking()
}
}
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TaskList() {
tasks := NewTaskList()
tasks.AddTask("person1", "Go to store")
tasks.AddTask("person1", "drop off mail")
values := tasks["person1"]
suite.Equal(len(values), 2)
nonValues := tasks["unknown"]
suite.Zero(len(nonValues))
}
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TestOptionsForMailFolders() {
tests := []struct {
name string

View File

@ -7,8 +7,6 @@ import (
msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages"
)
// TaskList is a a generic map of a list of items with a string index
type TaskList map[string][]string
type optionIdentifier int
//go:generate stringer -type=optionIdentifier
@ -19,22 +17,6 @@ const (
users
)
// NewTaskList constructor for TaskList
func NewTaskList() TaskList {
return make(map[string][]string, 0)
}
// AddTask helper method to ensure that keys and items are created properly
func (tl *TaskList) AddTask(key, value string) {
aMap := *tl
_, isCreated := aMap[key]
if isCreated {
aMap[key] = append(aMap[key], value)
} else {
aMap[key] = []string{value}
}
}
// Contains is a helper method for verifying if element
// is contained within the slice
func Contains(elems []string, value string) bool {

View File

@ -6,6 +6,25 @@ import (
"github.com/microsoftgraph/msgraph-sdk-go/models"
)
// TaskList is a a generic map of a list of items with a string index
type TaskList map[string][]string
// NewTaskList constructor for TaskList
func NewTaskList() TaskList {
return make(map[string][]string, 0)
}
// AddTask helper method to ensure that keys and items are created properly
func (tl *TaskList) AddTask(key, value string) {
aMap := *tl
_, isCreated := aMap[key]
if isCreated {
aMap[key] = append(aMap[key], value)
} else {
aMap[key] = []string{value}
}
}
// CreateFromBytes helper function to initialize m365 object form bytes.
// @param bytes -> source, createFunc -> abstract function for initialization
func CreateFromBytes(bytes []byte, createFunc absser.ParsableFactory) (absser.Parsable, error) {

View File

@ -59,3 +59,13 @@ func (suite *DataSupportSuite) TestCreateMessageFromBytes() {
test.checkObject(suite.T(), result)
}
}
func (suite *DataSupportSuite) TestDataSupport_TaskList() {
tasks := NewTaskList()
tasks.AddTask("person1", "Go to store")
tasks.AddTask("person1", "drop off mail")
values := tasks["person1"]
suite.Equal(len(values), 2)
nonValues := tasks["unknown"]
suite.Zero(len(nonValues))
}