Exchange.Collection.Items() to initiate populate method (#446)

`Items()` starts populate method for collection for the `exchange.Collection` package. Test packages expanded to verify the functionality of this feature.
This commit is contained in:
Danny 2022-07-31 13:57:14 -04:00 committed by GitHub
parent babf8b8d95
commit 2f56a849c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 61 deletions

View File

@ -41,17 +41,26 @@ type Collection struct {
// is desired to be sent through the data channel for eventual storage // is desired to be sent through the data channel for eventual storage
jobs []string jobs []string
service graph.Service
statusCh chan<- *support.ConnectorOperationStatus
// FullPath is the slice representation of the action context passed down through the hierarchy. // FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"}) //The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string fullPath []string
} }
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated // NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
func NewCollection(aUser string, pathRepresentation []string) Collection { func NewCollection(
aUser string,
pathRepresentation []string,
aService graph.Service,
statusCh chan<- *support.ConnectorOperationStatus,
) Collection {
collection := Collection{ collection := Collection{
user: aUser, user: aUser,
data: make(chan data.Stream, collectionChannelBufferSize), data: make(chan data.Stream, collectionChannelBufferSize),
jobs: make([]string, 0), jobs: make([]string, 0),
service: aService,
statusCh: statusCh,
fullPath: pathRepresentation, fullPath: pathRepresentation,
} }
return collection return collection
@ -62,9 +71,15 @@ func (eoc *Collection) AddJob(objID string) {
eoc.jobs = append(eoc.jobs, objID) eoc.jobs = append(eoc.jobs, objID)
} }
// PopulateCollection TODO: remove after async functionilty completed // Items utility function to asynchronously execute process to fill data channel with
func (eoc *Collection) PopulateCollection(newData *Stream) { // M365 exchange objects and returns the data channel
eoc.data <- newData func (eoc *Collection) Items() <-chan data.Stream {
go eoc.PopulateFromCollection(context.TODO(), eoc.service, eoc.statusCh)
return eoc.data
}
func (edc *Collection) FullPath() []string {
return append([]string{}, edc.fullPath...)
} }
// populateFromTaskList async call to fill DataCollection via channel implementation // populateFromTaskList async call to fill DataCollection via channel implementation
@ -155,14 +170,6 @@ func messageToDataCollection(
return nil return nil
} }
func (eoc *Collection) Items() <-chan data.Stream {
return eoc.data
}
func (edc *Collection) FullPath() []string {
return append([]string{}, edc.fullPath...)
}
// Stream represents a single item retrieved from exchange // Stream represents a single item retrieved from exchange
type Stream struct { type Stream struct {
id string id string
@ -185,6 +192,8 @@ func (od *Stream) ToReader() io.ReadCloser {
func (od *Stream) Info() details.ItemInfo { func (od *Stream) Info() details.ItemInfo {
return details.ItemInfo{Exchange: od.info} return details.ItemInfo{Exchange: od.info}
} }
// NewStream constructor for exchange.Stream object
func NewStream(identifier string, bytes []byte, detail details.ExchangeInfo) Stream { func NewStream(identifier string, bytes []byte, detail details.ExchangeInfo) Stream {
return Stream{ return Stream{
id: identifier, id: identifier,

View File

@ -50,52 +50,30 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() {
func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() { func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() {
user := "a-user" user := "a-user"
fullPath := []string{"a-tenant", user, "emails"} fullPath := []string{"a-tenant", user, "emails"}
edc := NewCollection(user, fullPath) edc := Collection{
user: user,
fullPath: fullPath,
}
assert.Equal(suite.T(), edc.FullPath(), fullPath) assert.Equal(suite.T(), edc.FullPath(), fullPath)
} }
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() {
name := "User" name := "User"
edc := NewCollection(name, []string{"Directory", "File", "task"}) edc := Collection{
user: name,
fullPath: []string{"Directory", "File", "task"},
}
suite.Equal(name, edc.user) suite.Equal(name, edc.user)
suite.True(contains(edc.FullPath(), "Directory")) suite.True(contains(edc.FullPath(), "Directory"))
suite.True(contains(edc.FullPath(), "File")) suite.True(contains(edc.FullPath(), "File"))
suite.True(contains(edc.FullPath(), "task")) suite.True(contains(edc.FullPath(), "task"))
} }
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pail", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs
edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.PopulateCollection(&Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
}
suite.Equal(expected, len(edc.data))
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pail", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs
edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.data <- &Stream{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}
}
close(edc.data)
suite.Equal(expected, len(edc.data))
streams := edc.Items()
suite.Equal(expected, len(streams))
count := 0
for item := range streams {
assert.NotNil(suite.T(), item)
count++
}
suite.Equal(count, expected)
}
func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() { func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() {
eoc := NewCollection("Dexter", []string{"Today", "is", "was", "different"}) eoc := Collection{
user: "Dexter",
fullPath: []string{"Today", "is", "currently", "different"},
}
suite.Zero(len(eoc.jobs)) suite.Zero(len(eoc.jobs))
shopping := []string{"tomotoes", "potatoes", "pasta", "ice tea"} shopping := []string{"tomotoes", "potatoes", "pasta", "ice tea"}
for _, item := range shopping { for _, item := range shopping {

View File

@ -325,16 +325,24 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
} }
// Create collection of ExchangeDataCollection and create data Holder // Create collection of ExchangeDataCollection and create data Holder
collections := make(map[string]*exchange.Collection) collections := make(map[string]*exchange.Collection)
var errs error
// callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[]
// with corresponding messageIDs. New collections are created for each directory
callbackFunc := func(messageItem any) bool { callbackFunc := func(messageItem any) bool {
message, ok := messageItem.(models.Messageable) message, ok := messageItem.(models.Messageable)
if !ok { if !ok {
err = support.WrapAndAppendf(gc.graphService.adapter.GetBaseUrl(), errors.New("message iteration failure"), err) errs = support.WrapAndAppendf(gc.graphService.adapter.GetBaseUrl(), errors.New("message iteration failure"), err)
return true return true
} }
// Saving to messages to list. Indexed by folder // Saving to messages to list. Indexed by folder
directory := *message.GetParentFolderId() directory := *message.GetParentFolderId()
if _, ok = collections[directory]; !ok { if _, ok = collections[directory]; !ok {
edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, directory}) service, err := gc.createService(gc.failFast)
if err != nil {
errs = support.WrapAndAppend(user, err, errs)
return true
}
edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, directory}, service, gc.statusCh)
collections[directory] = &edc collections[directory] = &edc
} }
collections[directory].AddJob(*message.GetId()) collections[directory].AddJob(*message.GetId())
@ -342,22 +350,17 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
} }
iterateError := pageIterator.Iterate(callbackFunc) iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil { if iterateError != nil {
err = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, err) errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs)
} }
if err != nil { if errs != nil {
return nil, err // return error if snapshot is incomplete return nil, errs // return error if snapshot is incomplete
} }
service, err := gc.createService(gc.failFast) for range collections {
if err != nil {
return nil, support.WrapAndAppend(user, err, err)
}
for _, edc := range collections {
go edc.PopulateFromCollection(ctx, service, gc.statusCh)
gc.incrementAwaitingMessages() gc.incrementAwaitingMessages()
} }
return collections, err return collections, errs
} }
// AwaitStatus updates status field based on item within statusChannel. // AwaitStatus updates status field based on item within statusChannel.

View File

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/exchange" "github.com/alcionai/corso/internal/connector/mockconnector"
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
@ -86,8 +86,8 @@ func (suite *DisconnectedGraphConnectorSuite) TestBuild() {
func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() { func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() {
var dc data.Collection var dc data.Collection
concrete := exchange.NewCollection("Check", []string{"interface", "works"}) concrete := mockconnector.NewMockExchangeCollection([]string{"a", "path"}, 1)
dc = &concrete dc = concrete
assert.NotNil(suite.T(), dc) assert.NotNil(suite.T(), dc)
} }

View File

@ -1,6 +1,7 @@
package connector package connector
import ( import (
"bytes"
"context" "context"
"testing" "testing"
@ -65,6 +66,13 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_ExchangeDataColl
assert.Nil(suite.T(), err) assert.Nil(suite.T(), err)
assert.True(suite.T(), suite.connector.awaitingMessages > 0) assert.True(suite.T(), suite.connector.awaitingMessages > 0)
assert.Nil(suite.T(), suite.connector.status) assert.Nil(suite.T(), suite.connector.status)
// Verify Items() call returns an iterable channel(e.g. a channel that has been closed)
channel := collectionList[0].Items()
for object := range channel {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.Nil(suite.T(), err, "received a buf.Read error")
}
status := suite.connector.AwaitStatus() status := suite.connector.AwaitStatus()
assert.NotNil(suite.T(), status, "status not blocking on async call") assert.NotNil(suite.T(), status, "status not blocking on async call")