diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 072fe7a3b..2f1769de3 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -5,10 +5,19 @@ package exchange import ( "bytes" + "context" "io" + kw "github.com/microsoft/kiota-serialization-json-go" + msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" + "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/pkg/errors" + + "github.com/alcionai/corso/internal/connector/graph" + "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/pkg/backup/details" + "github.com/alcionai/corso/pkg/logger" ) var _ data.Collection = &Collection{} @@ -17,14 +26,18 @@ var _ data.StreamInfo = &Stream{} const ( collectionChannelBufferSize = 1000 + numberOfRetries = 4 ) -// Collection represents an compilation of M365 objects from a specific exchange application. -// Each Collection is to only hold one application type at a time. This is not enforced +// ExchangeDataCollection represents exchange mailbox +// data for a single user. +// +// It implements the DataCollection interface type Collection struct { // M365 user - User string // M365 user - Data chan data.Stream // represents a single M365 object from an Exchange application + User string // M365 user + Data chan data.Stream + // FullPath is the slice representation of the action context passed down through the hierarchy. //The original request can be gleaned from the slice. (e.g. {, , "emails"}) fullPath []string @@ -52,7 +65,110 @@ func (eoc *Collection) FinishPopulation() { } } -// Items() returns the channel containing M365 Exchange objects +// NOTE: Refactor has not happened moving into folders +// populateFromTaskList async call to fill DataCollection via channel implementation +func PopulateFromTaskList( + ctx context.Context, + tasklist support.TaskList, + service graph.Service, + collections map[string]*Collection, + statusChannel chan<- *support.ConnectorOperationStatus, +) { + var errs error + var attemptedItems, success int + objectWriter := kw.NewJsonSerializationWriter() + + //Todo this has to return all the errors in the status + for aFolder, tasks := range tasklist { + // Get the same folder + edc := collections[aFolder] + if edc == nil { + for _, task := range tasks { + errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs) + } + continue + } + + for _, task := range tasks { + response, err := service.Client().UsersById(edc.User).MessagesById(task).Get() + if err != nil { + details := support.ConnectorStackErrorTrace(err) + errs = support.WrapAndAppend(edc.User, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) + continue + } + err = messageToDataCollection(service.Client(), ctx, objectWriter, edc.Data, response, edc.User) + success++ + if err != nil { + errs = support.WrapAndAppendf(edc.User, err, errs) + success-- + } + if errs != nil && service.ErrPolicy() { + break + } + } + + edc.FinishPopulation() + attemptedItems += len(tasks) + } + + status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs) + logger.Ctx(ctx).Debug(status.String()) + statusChannel <- status +} + +func messageToDataCollection( + client *msgraphsdk.GraphServiceClient, + ctx context.Context, + objectWriter *kw.JsonSerializationWriter, + dataChannel chan<- data.Stream, + message models.Messageable, + user string, +) error { + var err error + aMessage := message + adtl := message.GetAdditionalData() + if len(adtl) > 2 { + aMessage, err = support.ConvertFromMessageable(adtl, message) + if err != nil { + return err + } + } + if *aMessage.GetHasAttachments() { + // getting all the attachments might take a couple attempts due to filesize + var retriesErr error + for count := 0; count < numberOfRetries; count++ { + attached, err := client. + UsersById(user). + MessagesById(*aMessage.GetId()). + Attachments(). + Get() + retriesErr = err + if err == nil && attached != nil { + aMessage.SetAttachments(attached.GetValue()) + break + } + } + if retriesErr != nil { + logger.Ctx(ctx).Debug("exceeded maximum retries") + return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil) + } + } + err = objectWriter.WriteObjectValue("", aMessage) + if err != nil { + return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId())) + } + + byteArray, err := objectWriter.GetSerializedContent() + objectWriter.Close() + if err != nil { + return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) + } + if byteArray != nil { + dataChannel <- &Stream{id: *aMessage.GetId(), message: byteArray, info: MessageInfo(aMessage)} + } + return nil +} + func (eoc *Collection) Items() <-chan data.Stream { return eoc.Data } @@ -63,22 +179,31 @@ func (edc *Collection) FullPath() []string { // Stream represents a single item retrieved from exchange type Stream struct { - Id string + id string // TODO: We may need this to be a "oneOf" of `message`, `contact`, etc. // going forward. Using []byte for now but I assume we'll have // some structured type in here (serialization to []byte can be done in `Read`) - Message []byte - Inf *details.ExchangeInfo //temporary change to bring populate function into directory + message []byte + info *details.ExchangeInfo //temporary change to bring populate function into directory } func (od *Stream) UUID() string { - return od.Id + return od.id + } func (od *Stream) ToReader() io.ReadCloser { - return io.NopCloser(bytes.NewReader(od.Message)) + return io.NopCloser(bytes.NewReader(od.message)) } func (od *Stream) Info() details.ItemInfo { - return details.ItemInfo{Exchange: od.Inf} + return details.ItemInfo{Exchange: od.info} +} +func NewStream(identifier string, bytes []byte, detail details.ExchangeInfo) Stream { + return Stream{ + id: identifier, + message: bytes, + info: &detail, + } + } diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index 9053724b4..ed423e3c2 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -28,7 +28,7 @@ func TestExchangeDataCollectionSuite(t *testing.T) { func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() { m := []byte("test message") description := "aFile" - ed := &Stream{Id: description, Message: m} + ed := &Stream{id: description, message: m} // Read the message using the `ExchangeData` reader and validate it matches what we set buf := &bytes.Buffer{} @@ -41,7 +41,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() { var empty []byte expected := int64(0) - ed := &Stream{Message: empty} + ed := &Stream{message: empty} buf := &bytes.Buffer{} received, err := buf.ReadFrom(ed.ToReader()) suite.Equal(expected, received) @@ -69,7 +69,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCol expected := len(inputStrings) / 2 // We are using pairs edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { - edc.PopulateCollection(&Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])}) + edc.PopulateCollection(&Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) } suite.Equal(expected, len(edc.Data)) } @@ -80,7 +80,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() { expected := len(inputStrings) / 2 // We are using pairs edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { - edc.Data <- &Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])} + edc.Data <- &Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])} } close(edc.Data) suite.Equal(expected, len(edc.Data)) diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go new file mode 100644 index 000000000..d04aa5576 --- /dev/null +++ b/src/internal/connector/graph/service.go @@ -0,0 +1,14 @@ +package graph + +import msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" + +type Service interface { + // Client() returns msgraph Service client that can be used to process and execute + // the majority of the queries to the M365 Backstore + Client() *msgraphsdk.GraphServiceClient + // Adapter() returns GraphRequest adapter used to process large requests, create batches + // and page iterators + Adapter() *msgraphsdk.GraphRequestAdapter + // ErrPolicy returns if the service is implementing a Fast-Fail policy or not + ErrPolicy() bool +} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index a56936694..094779a25 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -10,7 +10,6 @@ import ( az "github.com/Azure/azure-sdk-for-go/sdk/azidentity" ka "github.com/microsoft/kiota-authentication-azure-go" - kw "github.com/microsoft/kiota-serialization-json-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -26,8 +25,7 @@ import ( ) const ( - numberOfRetries = 4 - mailCategory = "mail" + mailCategory = "mail" ) // GraphConnector is a struct used to wrap the GraphServiceClient and @@ -49,6 +47,18 @@ type graphService struct { failFast bool // if true service will exit sequence upon encountering an error } +func (gs *graphService) Client() *msgraphsdk.GraphServiceClient { + return &gs.client +} + +func (gs *graphService) Adapter() *msgraphsdk.GraphRequestAdapter { + return &gs.adapter +} + +func (gs *graphService) ErrPolicy() bool { + return gs.failFast +} + func NewGraphConnector(acct account.Account) (*GraphConnector, error) { m365, err := acct.M365Config() if err != nil { @@ -319,7 +329,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m if err != nil { return nil, err } - tasklist := NewTaskList() // map[folder][] messageIds + tasklist := support.NewTaskList() // map[folder][] messageIds callbackFunc := func(messageItem any) bool { message, ok := messageItem.(models.Messageable) if !ok { @@ -361,114 +371,12 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m return nil, support.WrapAndAppend(user, err, err) } // async call to populate - go service.populateFromTaskList(ctx, tasklist, collections, gc.statusCh) + go exchange.PopulateFromTaskList(ctx, tasklist, service, collections, gc.statusCh) gc.incrementAwaitingMessages() return collections, err } -// populateFromTaskList async call to fill DataCollection via channel implementation -func (sc *graphService) populateFromTaskList( - ctx context.Context, - tasklist TaskList, - collections map[string]*exchange.Collection, - statusChannel chan<- *support.ConnectorOperationStatus, -) { - var errs error - var attemptedItems, success int - objectWriter := kw.NewJsonSerializationWriter() - - //Todo this has to return all the errors in the status - for aFolder, tasks := range tasklist { - // Get the same folder - edc := collections[aFolder] - if edc == nil { - for _, task := range tasks { - errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs) - } - continue - } - - for _, task := range tasks { - response, err := sc.client.UsersById(edc.User).MessagesById(task).Get() - if err != nil { - details := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend(edc.User, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) - continue - } - err = messageToDataCollection(&sc.client, ctx, objectWriter, edc.Data, response, edc.User) - success++ - if err != nil { - errs = support.WrapAndAppendf(edc.User, err, errs) - success-- - } - if errs != nil && sc.failFast { - break - } - } - - edc.FinishPopulation() - attemptedItems += len(tasks) - } - - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs) - logger.Ctx(ctx).Debug(status.String()) - statusChannel <- status -} - -func messageToDataCollection( - client *msgraphsdk.GraphServiceClient, - ctx context.Context, - objectWriter *kw.JsonSerializationWriter, - dataChannel chan<- data.Stream, - message models.Messageable, - user string, -) error { - var err error - aMessage := message - adtl := message.GetAdditionalData() - if len(adtl) > 2 { - aMessage, err = support.ConvertFromMessageable(adtl, message) - if err != nil { - return err - } - } - if *aMessage.GetHasAttachments() { - // getting all the attachments might take a couple attempts due to filesize - var retriesErr error - for count := 0; count < numberOfRetries; count++ { - attached, err := client. - UsersById(user). - MessagesById(*aMessage.GetId()). - Attachments(). - Get() - retriesErr = err - if err == nil && attached != nil { - aMessage.SetAttachments(attached.GetValue()) - break - } - } - if retriesErr != nil { - logger.Ctx(ctx).Debug("exceeded maximum retries") - return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil) - } - } - err = objectWriter.WriteObjectValue("", aMessage) - if err != nil { - return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *aMessage.GetId())) - } - - byteArray, err := objectWriter.GetSerializedContent() - objectWriter.Close() - if err != nil { - return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) - } - if byteArray != nil { - dataChannel <- &exchange.Stream{Id: *aMessage.GetId(), Message: byteArray, Inf: exchange.MessageInfo(aMessage)} - } - return nil -} - // SetStatus helper function func (gc *GraphConnector) SetStatus(cos support.ConnectorOperationStatus) { gc.status = &cos diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index f1eb2c580..872f44b03 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/internal/data" ctesting "github.com/alcionai/corso/internal/testing" "github.com/alcionai/corso/pkg/account" + "github.com/alcionai/corso/pkg/backup/details" "github.com/alcionai/corso/pkg/credentials" "github.com/alcionai/corso/pkg/selectors" ) @@ -86,8 +87,10 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages( if err != nil { suite.T().Skipf("Support file not accessible: %v\n", err) } - ds := exchange.Stream{Id: "test", Message: bytes} + + ds := exchange.NewStream("test", bytes, details.ExchangeInfo{}) edc := exchange.NewCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) + edc.PopulateCollection(&ds) edc.FinishPopulation() err = suite.connector.RestoreMessages(context.Background(), []data.Collection{&edc}) @@ -244,16 +247,6 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_ErrorChecking() } } -func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TaskList() { - tasks := NewTaskList() - tasks.AddTask("person1", "Go to store") - tasks.AddTask("person1", "drop off mail") - values := tasks["person1"] - suite.Equal(len(values), 2) - nonValues := tasks["unknown"] - suite.Zero(len(nonValues)) -} - func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TestOptionsForMailFolders() { tests := []struct { name string diff --git a/src/internal/connector/query.go b/src/internal/connector/query.go index ff08f2e9f..94ea0a53a 100644 --- a/src/internal/connector/query.go +++ b/src/internal/connector/query.go @@ -7,8 +7,6 @@ import ( msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages" ) -// TaskList is a a generic map of a list of items with a string index -type TaskList map[string][]string type optionIdentifier int //go:generate stringer -type=optionIdentifier @@ -19,22 +17,6 @@ const ( users ) -// NewTaskList constructor for TaskList -func NewTaskList() TaskList { - return make(map[string][]string, 0) -} - -// AddTask helper method to ensure that keys and items are created properly -func (tl *TaskList) AddTask(key, value string) { - aMap := *tl - _, isCreated := aMap[key] - if isCreated { - aMap[key] = append(aMap[key], value) - } else { - aMap[key] = []string{value} - } -} - // Contains is a helper method for verifying if element // is contained within the slice func Contains(elems []string, value string) bool { diff --git a/src/internal/connector/support/m365Support.go b/src/internal/connector/support/m365Support.go index f62e4d330..a2403b026 100644 --- a/src/internal/connector/support/m365Support.go +++ b/src/internal/connector/support/m365Support.go @@ -6,6 +6,25 @@ import ( "github.com/microsoftgraph/msgraph-sdk-go/models" ) +// TaskList is a a generic map of a list of items with a string index +type TaskList map[string][]string + +// NewTaskList constructor for TaskList +func NewTaskList() TaskList { + return make(map[string][]string, 0) +} + +// AddTask helper method to ensure that keys and items are created properly +func (tl *TaskList) AddTask(key, value string) { + aMap := *tl + _, isCreated := aMap[key] + if isCreated { + aMap[key] = append(aMap[key], value) + } else { + aMap[key] = []string{value} + } +} + // CreateFromBytes helper function to initialize m365 object form bytes. // @param bytes -> source, createFunc -> abstract function for initialization func CreateFromBytes(bytes []byte, createFunc absser.ParsableFactory) (absser.Parsable, error) { diff --git a/src/internal/connector/support/m365Support_test.go b/src/internal/connector/support/m365Support_test.go index 9edf81694..a562a752a 100644 --- a/src/internal/connector/support/m365Support_test.go +++ b/src/internal/connector/support/m365Support_test.go @@ -59,3 +59,13 @@ func (suite *DataSupportSuite) TestCreateMessageFromBytes() { test.checkObject(suite.T(), result) } } + +func (suite *DataSupportSuite) TestDataSupport_TaskList() { + tasks := NewTaskList() + tasks.AddTask("person1", "Go to store") + tasks.AddTask("person1", "drop off mail") + values := tasks["person1"] + suite.Equal(len(values), 2) + nonValues := tasks["unknown"] + suite.Zero(len(nonValues)) +}