Implement ExchangeDataCollection iteration (#162)

DataCollection Feature added
In this merge, the DataCollection interface is defined for the GraphConnector. ExchangeDataCollection implements this interface. The subsequent definitions and test suites are included.
This commit is contained in:
Vaibhav Kamra 2022-06-08 14:59:03 -07:00 committed by GitHub
parent a91a739d0e
commit 9043351716
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 19 deletions

View File

@ -5,6 +5,12 @@ import (
"io"
)
const (
// TODO: Reduce this when https://github.com/alcionai/corso/issues/124 is closed
// and we make channel population async (decouple from collection initialization)
collectionChannelBufferSize = 1000
)
// A DataCollection represents a collection of data of the
// same type (e.g. mail)
type DataCollection interface {
@ -36,40 +42,50 @@ type ExchangeDataCollection struct {
user string
// TODO: We would want to replace this with a channel so that we
// don't need to wait for all data to be retrieved before reading it out
data []ExchangeData
// fullPath is the slice representation of the action context passed down through the hierarchy.
data chan ExchangeData
// FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string
}
// NewExchangeDataCollection creates an ExchangeDataCollection where
// the FullPath is confgured
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection {
collection := ExchangeDataCollection{
user: aUser,
data: make([]ExchangeData, 0),
data: make(chan ExchangeData, collectionChannelBufferSize),
fullPath: pathRepresentation,
}
return collection
}
func (ec *ExchangeDataCollection) PopulateCollection(newData ExchangeData) {
ec.data = append(ec.data, newData)
func (edc *ExchangeDataCollection) PopulateCollection(newData ExchangeData) {
edc.data <- newData
}
func (ec *ExchangeDataCollection) Length() int {
return len(ec.data)
// FinishPopulation is used to indicate data population of the collection is complete
// TODO: This should be an internal method once we move the message retrieval logic into `ExchangeDataCollection`
func (edc *ExchangeDataCollection) FinishPopulation() {
close(edc.data)
}
func (edc *ExchangeDataCollection) Length() int {
return len(edc.data)
}
// NextItem returns either the next item in the collection or an error if one occurred.
// If not more items are available in the collection, returns (nil, nil).
func (*ExchangeDataCollection) NextItem() (DataStream, error) {
// TODO: Return the next "to be read" item in the collection as a
// DataStream
return nil, nil
func (edc *ExchangeDataCollection) NextItem() (DataStream, error) {
item, ok := <-edc.data
if !ok {
return nil, io.EOF
}
return &item, nil
}
func (ec *ExchangeDataCollection) FullPath() []string {
return append([]string{}, ec.fullPath...)
func (edc *ExchangeDataCollection) FullPath() []string {
return append([]string{}, edc.fullPath...)
}
// ExchangeData represents a single item retrieved from exchange

View File

@ -16,20 +16,72 @@ func TestExchangeDataCollectionSuite(t *testing.T) {
suite.Run(t, new(ExchangeDataCollectionSuite))
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader() {
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() {
m := []byte("test message")
ed := &ExchangeData{message: m}
description := "aFile"
ed := &ExchangeData{id: description, message: m}
// Read the message using the `ExchangeData` reader and validate it matches what we set
buf := &bytes.Buffer{}
buf.ReadFrom(ed.ToReader())
_, err := buf.ReadFrom(ed.ToReader())
assert.Nil(suite.T(), err, "received a buf.Read error")
assert.Equal(suite.T(), buf.Bytes(), m)
assert.Equal(suite.T(), description, ed.UUID())
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() {
var empty []byte
expected := int64(0)
ed := &ExchangeData{message: empty}
buf := &bytes.Buffer{}
received, err := buf.ReadFrom(ed.ToReader())
suite.Equal(expected, received)
assert.Nil(suite.T(), err, "received buf.Readfrom error ")
}
func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() {
user := "a-user"
fullPath := []string{"a-tenant", user, "emails"}
edc := NewExchangeDataCollection(user, fullPath)
assert.Equal(suite.T(), edc.FullPath(), fullPath)
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() {
name := "User"
edc := NewExchangeDataCollection(name, []string{"Directory", "File", "task"})
suite.Equal(name, edc.user)
suite.True(Contains(edc.FullPath(), "Directory"))
suite.True(Contains(edc.FullPath(), "File"))
suite.True(Contains(edc.FullPath(), "task"))
suite.Zero(edc.Length())
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pale", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
}
suite.Equal(expected, edc.Length())
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NextItem() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pale", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
}
edc.FinishPopulation() // finished writing
for i := 0; i < 6; i++ {
data, err := edc.NextItem()
assert.Nil(suite.T(), err)
assert.NotNil(suite.T(), data)
}
// Need that EOF
data, err := edc.NextItem()
assert.Nil(suite.T(), data)
assert.NotNil(suite.T(), err)
}

View File

@ -230,6 +230,7 @@ func (gc *GraphConnector) serializeMessages(user string, dc ExchangeDataCollecti
}
fmt.Printf("Returning ExchangeDataColection with %d items\n", dc.Length())
fmt.Printf("Errors: \n%s\n", ConvertErrorList(errorList))
dc.FinishPopulation()
var errs error
if len(errorList) > 0 {
errs = errors.New(ConvertErrorList(errorList))