diff --git a/src/internal/connector/exchange_data_collection.go b/src/internal/connector/exchange_data_collection.go index 7ce5170bf..155920381 100644 --- a/src/internal/connector/exchange_data_collection.go +++ b/src/internal/connector/exchange_data_collection.go @@ -5,6 +5,12 @@ import ( "io" ) +const ( + // TODO: Reduce this when https://github.com/alcionai/corso/issues/124 is closed + // and we make channel population async (decouple from collection initialization) + collectionChannelBufferSize = 1000 +) + // A DataCollection represents a collection of data of the // same type (e.g. mail) type DataCollection interface { @@ -36,40 +42,50 @@ type ExchangeDataCollection struct { user string // TODO: We would want to replace this with a channel so that we // don't need to wait for all data to be retrieved before reading it out - data []ExchangeData - // fullPath is the slice representation of the action context passed down through the hierarchy. + + data chan ExchangeData + // FullPath is the slice representation of the action context passed down through the hierarchy. + //The original request can be gleaned from the slice. (e.g. {, , "emails"}) fullPath []string } -// NewExchangeDataCollection creates an ExchangeDataCollection where -// the FullPath is confgured +// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection { collection := ExchangeDataCollection{ user: aUser, - data: make([]ExchangeData, 0), + data: make(chan ExchangeData, collectionChannelBufferSize), fullPath: pathRepresentation, } return collection } -func (ec *ExchangeDataCollection) PopulateCollection(newData ExchangeData) { - ec.data = append(ec.data, newData) +func (edc *ExchangeDataCollection) PopulateCollection(newData ExchangeData) { + edc.data <- newData } -func (ec *ExchangeDataCollection) Length() int { - return len(ec.data) + +// FinishPopulation is used to indicate data population of the collection is complete +// TODO: This should be an internal method once we move the message retrieval logic into `ExchangeDataCollection` +func (edc *ExchangeDataCollection) FinishPopulation() { + close(edc.data) +} + +func (edc *ExchangeDataCollection) Length() int { + return len(edc.data) } // NextItem returns either the next item in the collection or an error if one occurred. // If not more items are available in the collection, returns (nil, nil). -func (*ExchangeDataCollection) NextItem() (DataStream, error) { - // TODO: Return the next "to be read" item in the collection as a - // DataStream - return nil, nil +func (edc *ExchangeDataCollection) NextItem() (DataStream, error) { + item, ok := <-edc.data + if !ok { + return nil, io.EOF + } + return &item, nil } -func (ec *ExchangeDataCollection) FullPath() []string { - return append([]string{}, ec.fullPath...) +func (edc *ExchangeDataCollection) FullPath() []string { + return append([]string{}, edc.fullPath...) } // ExchangeData represents a single item retrieved from exchange diff --git a/src/internal/connector/exchange_data_collection_test.go b/src/internal/connector/exchange_data_collection_test.go index c8d22ac76..b0730f273 100644 --- a/src/internal/connector/exchange_data_collection_test.go +++ b/src/internal/connector/exchange_data_collection_test.go @@ -16,20 +16,72 @@ func TestExchangeDataCollectionSuite(t *testing.T) { suite.Run(t, new(ExchangeDataCollectionSuite)) } -func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader() { +func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() { m := []byte("test message") - ed := &ExchangeData{message: m} + description := "aFile" + ed := &ExchangeData{id: description, message: m} // Read the message using the `ExchangeData` reader and validate it matches what we set buf := &bytes.Buffer{} - buf.ReadFrom(ed.ToReader()) + _, err := buf.ReadFrom(ed.ToReader()) + assert.Nil(suite.T(), err, "received a buf.Read error") assert.Equal(suite.T(), buf.Bytes(), m) + assert.Equal(suite.T(), description, ed.UUID()) } +func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() { + var empty []byte + expected := int64(0) + ed := &ExchangeData{message: empty} + buf := &bytes.Buffer{} + received, err := buf.ReadFrom(ed.ToReader()) + suite.Equal(expected, received) + assert.Nil(suite.T(), err, "received buf.Readfrom error ") +} func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() { user := "a-user" fullPath := []string{"a-tenant", user, "emails"} edc := NewExchangeDataCollection(user, fullPath) - assert.Equal(suite.T(), edc.FullPath(), fullPath) } + +func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() { + name := "User" + edc := NewExchangeDataCollection(name, []string{"Directory", "File", "task"}) + suite.Equal(name, edc.user) + suite.True(Contains(edc.FullPath(), "Directory")) + suite.True(Contains(edc.FullPath(), "File")) + suite.True(Contains(edc.FullPath(), "task")) + suite.Zero(edc.Length()) +} + +func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() { + inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", + "fetch", "a", "pale", "of", "water"} + expected := len(inputStrings) / 2 // We are using pairs + edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) + for i := 0; i < expected; i++ { + edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) + } + suite.Equal(expected, edc.Length()) +} +func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NextItem() { + inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", + "fetch", "a", "pale", "of", "water"} + expected := len(inputStrings) / 2 // We are using pairs + edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) + for i := 0; i < expected; i++ { + edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) + } + edc.FinishPopulation() // finished writing + for i := 0; i < 6; i++ { + data, err := edc.NextItem() + assert.Nil(suite.T(), err) + assert.NotNil(suite.T(), data) + } + // Need that EOF + data, err := edc.NextItem() + assert.Nil(suite.T(), data) + assert.NotNil(suite.T(), err) + +} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 92940189f..a6291bc11 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -230,6 +230,7 @@ func (gc *GraphConnector) serializeMessages(user string, dc ExchangeDataCollecti } fmt.Printf("Returning ExchangeDataColection with %d items\n", dc.Length()) fmt.Printf("Errors: \n%s\n", ConvertErrorList(errorList)) + dc.FinishPopulation() var errs error if len(errorList) > 0 { errs = errors.New(ConvertErrorList(errorList))