diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index bba1335b8..259b19b10 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -41,17 +41,26 @@ type Collection struct { // is desired to be sent through the data channel for eventual storage jobs []string + service graph.Service + statusCh chan<- *support.ConnectorOperationStatus // FullPath is the slice representation of the action context passed down through the hierarchy. //The original request can be gleaned from the slice. (e.g. {, , "emails"}) fullPath []string } // NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated -func NewCollection(aUser string, pathRepresentation []string) Collection { +func NewCollection( + aUser string, + pathRepresentation []string, + aService graph.Service, + statusCh chan<- *support.ConnectorOperationStatus, +) Collection { collection := Collection{ user: aUser, data: make(chan data.Stream, collectionChannelBufferSize), jobs: make([]string, 0), + service: aService, + statusCh: statusCh, fullPath: pathRepresentation, } return collection @@ -62,9 +71,15 @@ func (eoc *Collection) AddJob(objID string) { eoc.jobs = append(eoc.jobs, objID) } -// PopulateCollection TODO: remove after async functionilty completed -func (eoc *Collection) PopulateCollection(newData *Stream) { - eoc.data <- newData +// Items utility function to asynchronously execute process to fill data channel with +// M365 exchange objects and returns the data channel +func (eoc *Collection) Items() <-chan data.Stream { + go eoc.PopulateFromCollection(context.TODO(), eoc.service, eoc.statusCh) + return eoc.data +} + +func (edc *Collection) FullPath() []string { + return append([]string{}, edc.fullPath...) 
} // populateFromTaskList async call to fill DataCollection via channel implementation @@ -155,14 +170,6 @@ func messageToDataCollection( return nil } -func (eoc *Collection) Items() <-chan data.Stream { - return eoc.data -} - -func (edc *Collection) FullPath() []string { - return append([]string{}, edc.fullPath...) -} - // Stream represents a single item retrieved from exchange type Stream struct { id string @@ -185,6 +192,8 @@ func (od *Stream) ToReader() io.ReadCloser { func (od *Stream) Info() details.ItemInfo { return details.ItemInfo{Exchange: od.info} } + +// NewStream constructor for exchange.Stream object func NewStream(identifier string, bytes []byte, detail details.ExchangeInfo) Stream { return Stream{ id: identifier, diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index c606d0dae..2f09e3d06 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -50,52 +50,30 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() { func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() { user := "a-user" fullPath := []string{"a-tenant", user, "emails"} - edc := NewCollection(user, fullPath) + edc := Collection{ + user: user, + fullPath: fullPath, + } assert.Equal(suite.T(), edc.FullPath(), fullPath) } func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() { name := "User" - edc := NewCollection(name, []string{"Directory", "File", "task"}) + edc := Collection{ + user: name, + fullPath: []string{"Directory", "File", "task"}, + } suite.Equal(name, edc.user) suite.True(contains(edc.FullPath(), "Directory")) suite.True(contains(edc.FullPath(), "File")) suite.True(contains(edc.FullPath(), "task")) } -func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() { - inputStrings := 
[]string{"Jack", "and", "Jill", "went", "up", "the", "hill to", - "fetch", "a", "pail", "of", "water"} - expected := len(inputStrings) / 2 // We are using pairs - edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"}) - for i := 0; i < expected; i++ { - edc.PopulateCollection(&Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) - } - suite.Equal(expected, len(edc.data)) -} - -func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() { - inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", - "fetch", "a", "pail", "of", "water"} - expected := len(inputStrings) / 2 // We are using pairs - edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"}) - for i := 0; i < expected; i++ { - edc.data <- &Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])} - } - close(edc.data) - suite.Equal(expected, len(edc.data)) - streams := edc.Items() - suite.Equal(expected, len(streams)) - count := 0 - for item := range streams { - assert.NotNil(suite.T(), item) - count++ - } - suite.Equal(count, expected) -} - func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() { - eoc := NewCollection("Dexter", []string{"Today", "is", "was", "different"}) + eoc := Collection{ + user: "Dexter", + fullPath: []string{"Today", "is", "currently", "different"}, + } suite.Zero(len(eoc.jobs)) shopping := []string{"tomotoes", "potatoes", "pasta", "ice tea"} for _, item := range shopping { diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 989459dcb..4324ef2d6 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -325,16 +325,24 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m } // Create collection of ExchangeDataCollection and create data Holder collections := make(map[string]*exchange.Collection) + var errs error + // callbackFunc 
iterates through all models.Messageable and fills exchange.Collection.jobs[] + // with corresponding messageIDs. New collections are created for each directory callbackFunc := func(messageItem any) bool { message, ok := messageItem.(models.Messageable) if !ok { - err = support.WrapAndAppendf(gc.graphService.adapter.GetBaseUrl(), errors.New("message iteration failure"), err) + errs = support.WrapAndAppendf(gc.graphService.adapter.GetBaseUrl(), errors.New("message iteration failure"), errs) return true } // Saving to messages to list. Indexed by folder directory := *message.GetParentFolderId() if _, ok = collections[directory]; !ok { - edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, directory}) + service, err := gc.createService(gc.failFast) + if err != nil { + errs = support.WrapAndAppend(user, err, errs) + return true + } + edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, directory}, service, gc.statusCh) collections[directory] = &edc } collections[directory].AddJob(*message.GetId()) @@ -342,22 +350,17 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m } iterateError := pageIterator.Iterate(callbackFunc) if iterateError != nil { - err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, err) + errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs) } - if err != nil { - return nil, err // return error if snapshot is incomplete + if errs != nil { + return nil, errs // return error if snapshot is incomplete } - service, err := gc.createService(gc.failFast) - if err != nil { - return nil, support.WrapAndAppend(user, err, err) - } - for _, edc := range collections { - go edc.PopulateFromCollection(ctx, service, gc.statusCh) + for range collections { gc.incrementAwaitingMessages() } - return collections, err + return collections, errs } // AwaitStatus updates status field based on item within statusChannel. 
diff --git a/src/internal/connector/graph_connector_disconnected_test.go b/src/internal/connector/graph_connector_disconnected_test.go index 44956460f..71951847d 100644 --- a/src/internal/connector/graph_connector_disconnected_test.go +++ b/src/internal/connector/graph_connector_disconnected_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/internal/connector/exchange" + "github.com/alcionai/corso/internal/connector/mockconnector" "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/pkg/account" @@ -86,8 +86,8 @@ func (suite *DisconnectedGraphConnectorSuite) TestBuild() { func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() { var dc data.Collection - concrete := exchange.NewCollection("Check", []string{"interface", "works"}) - dc = &concrete + concrete := mockconnector.NewMockExchangeCollection([]string{"a", "path"}, 1) + dc = concrete assert.NotNil(suite.T(), dc) } diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index e32e09c2b..6b7a65c63 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -1,6 +1,7 @@ package connector import ( + "bytes" "context" "testing" @@ -65,6 +66,13 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataColl assert.Nil(suite.T(), err) assert.True(suite.T(), suite.connector.awaitingMessages > 0) assert.Nil(suite.T(), suite.connector.status) + // Verify Items() call returns an iterable channel(e.g. 
a channel that has been closed) + channel := collectionList[0].Items() + for object := range channel { + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(object.ToReader()) + assert.Nil(suite.T(), err, "received a buf.Read error") + } status := suite.connector.AwaitStatus() assert.NotNil(suite.T(), status, "status not blocking on async call")