diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 73350623b..417830791 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -67,11 +67,9 @@ func (eoc *Collection) PopulateCollection(newData *Stream) { eoc.data <- newData } -// NOTE: Refactor has not happened moving into folders // populateFromTaskList async call to fill DataCollection via channel implementation -func PopulateFromTaskList( +func PopulateFromCollection( ctx context.Context, - tasklist support.TaskList, service graph.Service, collections map[string]*Collection, statusChannel chan<- *support.ConnectorOperationStatus, @@ -80,18 +78,9 @@ func PopulateFromTaskList( var attemptedItems, success int objectWriter := kw.NewJsonSerializationWriter() - //Todo this has to return all the errors in the status - for aFolder, tasks := range tasklist { - // Get the same folder - edc := collections[aFolder] - if edc == nil { - for _, task := range tasks { - errs = support.WrapAndAppend(task, errors.New("unable to query: collection not found during populateFromTaskList"), errs) - } - continue - } + for _, edc := range collections { - for _, task := range tasks { + for _, task := range edc.jobs { response, err := service.Client().UsersById(edc.user).MessagesById(task).Get() if err != nil { details := support.ConnectorStackErrorTrace(err) @@ -109,10 +98,10 @@ func PopulateFromTaskList( } } close(edc.data) - attemptedItems += len(tasks) + attemptedItems += len(edc.jobs) } - status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs) + status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(collections), errs) logger.Ctx(ctx).Debug(status.String()) statusChannel <- status } diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 3b77078ee..0388e17eb 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -319,7 +319,8 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m if err != nil { return nil, err } - tasklist := support.NewTaskList() // map[folder][] messageIds + // Create collection of ExchangeDataCollection and create data Holder + collections := make(map[string]*exchange.Collection) callbackFunc := func(messageItem any) bool { message, ok := messageItem.(models.Messageable) if !ok { @@ -327,7 +328,12 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m return true } // Saving to messages to list. Indexed by folder - tasklist.AddTask(*message.GetParentFolderId(), *message.GetId()) + directory := *message.GetParentFolderId() + if _, ok = collections[directory]; !ok { + edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, directory}) + collections[directory] = &edc + } + collections[directory].AddJob(*message.GetId()) return true } iterateError := pageIterator.Iterate(callbackFunc) @@ -337,31 +343,13 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m if err != nil { return nil, err // return error if snapshot is incomplete } - // Create collection of ExchangeDataCollection and create data Holder - collections := make(map[string]*exchange.Collection) - for aFolder := range tasklist { - // prep the items for handoff to the backup consumer - edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, aFolder}) - collections[aFolder] = &edc - } - - if len(collections) == 0 { - if len(tasklist) != 0 { - // Below error message needs revising. Assumption is that it should always - // find both items to fetch and a DataCollection to put them in - return nil, support.WrapAndAppend( - user, errors.New("found items but no directories"), err) - } - // return empty collection when no items found - return nil, err - } service, err := gc.createService(gc.failFast) if err != nil { return nil, support.WrapAndAppend(user, err, err) } // async call to populate - go exchange.PopulateFromTaskList(ctx, tasklist, service, collections, gc.statusCh) + go exchange.PopulateFromCollection(ctx, service, collections, gc.statusCh) gc.incrementAwaitingMessages() return collections, err diff --git a/src/internal/connector/support/m365Support.go b/src/internal/connector/support/m365Support.go index a2403b026..f62e4d330 100644 --- a/src/internal/connector/support/m365Support.go +++ b/src/internal/connector/support/m365Support.go @@ -6,25 +6,6 @@ import ( "github.com/microsoftgraph/msgraph-sdk-go/models" ) -// TaskList is a a generic map of a list of items with a string index -type TaskList map[string][]string - -// NewTaskList constructor for TaskList -func NewTaskList() TaskList { - return make(map[string][]string, 0) -} - -// AddTask helper method to ensure that keys and items are created properly -func (tl *TaskList) AddTask(key, value string) { - aMap := *tl - _, isCreated := aMap[key] - if isCreated { - aMap[key] = append(aMap[key], value) - } else { - aMap[key] = []string{value} - } -} - // CreateFromBytes helper function to initialize m365 object form bytes. // @param bytes -> source, createFunc -> abstract function for initialization func CreateFromBytes(bytes []byte, createFunc absser.ParsableFactory) (absser.Parsable, error) { diff --git a/src/internal/connector/support/m365Support_test.go b/src/internal/connector/support/m365Support_test.go index a562a752a..9edf81694 100644 --- a/src/internal/connector/support/m365Support_test.go +++ b/src/internal/connector/support/m365Support_test.go @@ -59,13 +59,3 @@ func (suite *DataSupportSuite) TestCreateMessageFromBytes() { test.checkObject(suite.T(), result) } } - -func (suite *DataSupportSuite) TestDataSupport_TaskList() { - tasks := NewTaskList() - tasks.AddTask("person1", "Go to store") - tasks.AddTask("person1", "drop off mail") - values := tasks["person1"] - suite.Equal(len(values), 2) - nonValues := tasks["unknown"] - suite.Zero(len(nonValues)) -}