133 implement exchangedatacollection populator (#139)

* Issue 133: Tests Failing.
Unable to troubleshoot the iteration error. Reflecting on an object that
is nil. Fundamentally, the code works in so far as the workflow. Initial
query finds the amount of emails, returns... This is where
DataCollection would be created, then the goroutine would have the
information it would need to run asynchronously.

* Issue #133: Populate DataCollection
Bug fix on NewPageIterator: Incorrect MailDiscriminatorValue.
DataCollection implemented. Brute force. However, this does not use the
populate method. The populate method to be added in the next step.

Test Suite: TestMailCount is an end-to-end test. Tests to see if the
amount of values in a mailbox are the amount of messages that are
retrieved. This is false at present. There are several errors from
ParseNodePackage: `unsupported AdditionalData type:
map[string]*jsonserialization.JsonParseNode`. Will have to determine how
errors are to be sent to the DataCollection. Issue #137. Upon
inspection, there are 59 messages on the test account. 7 of them receive
a parsing error and 2 are unaccounted for. Issue #138

* Issue #133: Collection passed into Serialize Message method.

* Issue #133: Changes for PR
Comments have been updated as requested. Additionally, input params have
been updated for clarity.

* Issue #133: Input structure for Options changed to string literal.

* Update src/internal/connector/graph_connector.go

Co-authored-by: Vaibhav Kamra <vkamra@alcion.ai>

* Update src/internal/connector/graph_connector.go

Co-authored-by: Vaibhav Kamra <vkamra@alcion.ai>

* Update src/internal/connector/graph_connector.go

Internal helper function populateCollection() removed as it was commented out anyway.

Co-authored-by: Vaibhav Kamra <vkamra@alcion.ai>

* Update src/internal/connector/graph_connector.go according to suggestion

Consolidate line 193 response.GetValue() into to for loop over folders

Co-authored-by: Vaibhav Kamra <vkamra@alcion.ai>

* Issue #133: Expected item removed
Expected Item can could change between the time that the DataCollection is created. Therefore, the channel will stay open until the object is filled. Definition updated.

* Issue #133: Updates made to graph_connector packages
- Test package updated to test private functions. Results in import
  changes and calls to package level items. All tests verified as
  operational prior to upload.
- GraphConnector removal of GetCount() for Mail objects.
  ExchangeDataCollection call updated in response to definition change.
  Small changes to call stack to remove troubleshooting stubs.

* Gofmt applied to exchange_data_collection.go

* Issue #133: Merge resolution
These changese resolves the issues with the merge that caused the testing to fail. Tests all passing and now merging back into trunk.

* PR Suggestion Get Length -> Length()

Co-authored-by: Vaibhav Kamra <vkamra@alcion.ai>
This commit is contained in:
Danny 2022-06-07 16:04:47 -04:00 committed by GitHub
parent 21bf1d8b39
commit ed1f444b89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 199 additions and 24 deletions

View File

@ -23,10 +23,32 @@ type DataStream interface {
//
// It implements the DataCollection interface
type ExchangeDataCollection struct {
// M365 user
user string
// TODO: We would want to replace this with a channel so that we
// don't need to wait for all data to be retrieved before reading it out
data []ExchangeData
// FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
FullPath []string
}
// NewExchangeDataCollection creates an ExchangeDataCollection where
// the FullPath is confgured
func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection {
collection := ExchangeDataCollection{
user: aUser,
data: make([]ExchangeData, 0),
FullPath: pathRepresentation,
}
return collection
}
func (ec *ExchangeDataCollection) PopulateCollection(newData ExchangeData) {
ec.data = append(ec.data, newData)
}
func (ec *ExchangeDataCollection) Length() int {
return len(ec.data)
}
// NextItem returns either the next item in the collection or an error if one occurred.
@ -37,12 +59,6 @@ func (*ExchangeDataCollection) NextItem() (DataStream, error) {
return nil, nil
}
// Internal Helper that is invoked when the data collection is created to populate it
func (ed *ExchangeDataCollection) populateCollection() error {
// TODO: Read data for `ed.user` and add to collection
return nil
}
// ExchangeData represents a single item retrieved from exchange
type ExchangeData struct {
id string

View File

@ -8,10 +8,12 @@ import (
az "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
ka "github.com/microsoft/kiota-authentication-azure-go"
kw "github.com/microsoft/kiota-serialization-json-go"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
)
// GraphConnector is a struct used to wrap the GraphServiceClient and
@ -80,7 +82,7 @@ func (gc *GraphConnector) setTenantUsers() error {
callbackFunc := func(userItem interface{}) bool {
user, ok := userItem.(models.Userable)
if !ok {
errorList = append(errorList, errors.New("Unable to iterable to user"))
errorList = append(errorList, errors.New("unable to iterable to user"))
return true
}
gc.Users[*user.GetMail()] = *user.GetId()
@ -95,6 +97,7 @@ func (gc *GraphConnector) setTenantUsers() error {
// ConvertsErrorList takes a list of errors and converts returns
// a string
// TODO: Place in error package after merged
func ConvertErrorList(errorList []error) string {
errorLog := ""
for idx, err := range errorList {
@ -105,23 +108,131 @@ func ConvertErrorList(errorList []error) string {
// GetUsers returns the email address of users within tenant.
func (gc *GraphConnector) GetUsers() []string {
keys := make([]string, 0)
for k := range gc.Users {
keys = append(keys, k)
}
return keys
return buildFromMap(true, gc.Users)
}
func (gc *GraphConnector) GetUsersIds() []string {
values := make([]string, 0)
for _, v := range gc.Users {
values = append(values, v)
return buildFromMap(false, gc.Users)
}
func buildFromMap(isKey bool, mapping map[string]string) []string {
returnString := make([]string, 0)
if isKey {
for k := range mapping {
returnString = append(returnString, k)
}
} else {
for _, v := range mapping {
returnString = append(returnString, v)
}
}
return values
return returnString
}
// ExchangeDataStream returns a DataCollection that the caller can
// ExchangeDataStream returns a DataCollection which the caller can
// use to read mailbox data out for the specified user
func (gc *GraphConnector) ExchangeDataCollection(user string) DataCollection {
return &ExchangeDataCollection{user: user}
// Assumption: User exists
// TODO: https://github.com/alcionai/corso/issues/135
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(user string) (DataCollection, error) {
// TODO replace with completion of Issue 124:
collection := NewExchangeDataCollection(user, []string{gc.tenant, user})
return gc.serializeMessages(user, collection)
}
// optionsForMailFolders creates transforms the 'select' into a more dynamic call for MailFolders.
// var moreOps is a comma separated string of options(e.g. "displayName, isHidden")
// return is first call in MailFolders().GetWithRequestConfigurationAndResponseHandler(options, handler)
func optionsForMailFolders(moreOps []string) *msfolder.MailFoldersRequestBuilderGetRequestConfiguration {
selecting := append(moreOps, "id")
requestParameters := &msfolder.MailFoldersRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &msfolder.MailFoldersRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
}
return options
}
// serializeMessages: Temp Function as place Holder until Collections have been added
// to the GraphConnector struct.
func (gc *GraphConnector) serializeMessages(user string, dc ExchangeDataCollection) (DataCollection, error) {
options := optionsForMailFolders([]string{})
response, err := gc.client.UsersById(user).MailFolders().GetWithRequestConfigurationAndResponseHandler(options, nil)
if err != nil {
return nil, err
}
if response == nil {
return nil, fmt.Errorf("unable to access folders for %s", user)
}
folderList := make([]string, 0)
errorList := make([]error, 0)
for _, folderable := range response.GetValue() {
folderList = append(folderList, *folderable.GetId())
}
fmt.Printf("Folder List: %v\n", folderList)
// Time to create Exchange data Holder
var byteArray []byte
var iterateError error
for _, aFolder := range folderList {
result, err := gc.client.UsersById(user).MailFoldersById(aFolder).Messages().Get()
if err != nil {
errorList = append(errorList, err)
}
if result == nil {
fmt.Println("Cannot Get result")
}
pageIterator, err := msgraphgocore.NewPageIterator(result, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
if err != nil {
errorList = append(errorList, err)
}
objectWriter := kw.NewJsonSerializationWriter()
callbackFunc := func(messageItem interface{}) bool {
message, ok := messageItem.(models.Messageable)
if !ok {
errorList = append(errorList, fmt.Errorf("unable to iterate on message for user: %s", user))
return true
}
if *message.GetHasAttachments() {
attached, err := gc.client.UsersById(user).MessagesById(*message.GetId()).Attachments().Get()
if err == nil && attached != nil {
message.SetAttachments(attached.GetValue())
}
if err != nil {
err = fmt.Errorf("Attachment Error: " + err.Error())
errorList = append(errorList, err)
}
}
err = objectWriter.WriteObjectValue("", message)
if err != nil {
errorList = append(errorList, err)
return true
}
byteArray, err = objectWriter.GetSerializedContent()
objectWriter.Close()
if err != nil {
errorList = append(errorList, err)
return true
}
if byteArray != nil {
dc.PopulateCollection(ExchangeData{id: *message.GetId(), message: byteArray})
}
return true
}
iterateError = pageIterator.Iterate(callbackFunc)
if iterateError != nil {
errorList = append(errorList, err)
}
}
fmt.Printf("Returning ExchangeDataColection with %d items\n", dc.Length())
fmt.Printf("Errors: \n%s\n", ConvertErrorList(errorList))
var errs error
if len(errorList) > 0 {
errs = errors.New(ConvertErrorList(errorList))
}
return &dc, errs
}

View File

@ -1,19 +1,19 @@
package connector_test
package connector
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
graph "github.com/alcionai/corso/internal/connector"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/credentials"
)
type GraphConnectorIntegrationSuite struct {
suite.Suite
connector *graph.GraphConnector
connector *GraphConnector
}
func TestGraphConnectorSuite(t *testing.T) {
@ -36,7 +36,7 @@ func (suite *GraphConnectorIntegrationSuite) SetupSuite() {
if err != nil {
suite.T().Fatal(err)
}
suite.connector, err = graph.NewGraphConnector(
suite.connector, err = NewGraphConnector(
evs[credentials.TenantID],
evs[credentials.ClientID],
evs[credentials.ClientSecret])
@ -46,6 +46,7 @@ func (suite *GraphConnectorIntegrationSuite) SetupSuite() {
func (suite *GraphConnectorIntegrationSuite) TestGraphConnector() {
ctesting.LogTimeOfTest(suite.T())
suite.NotNil(suite.connector)
}
// --------------------
@ -58,7 +59,19 @@ func TestDisconnectedGraphSuite(t *testing.T) {
suite.Run(t, new(DiconnectedGraphConnectorSuite))
}
// TestExchangeDataCollection is a call to the M365 backstore to very
func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() {
if os.Getenv("INTEGRATION_TESTING") != "" {
suite.T().Skip("Environmental Variables not set")
}
exchangeData, err := suite.connector.ExchangeDataCollection("dustina@8qzvrj.onmicrosoft.com")
assert.NotNil(suite.T(), exchangeData)
assert.Error(suite.T(), err) // TODO Remove after https://github.com/alcionai/corso/issues/140
suite.T().Logf("Missing Data: %s\n", err.Error())
}
func (suite *DiconnectedGraphConnectorSuite) TestBadConnection() {
table := []struct {
name string
params []string
@ -74,9 +87,44 @@ func (suite *DiconnectedGraphConnectorSuite) TestBadConnection() {
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
gc, err := graph.NewGraphConnector(test.params[0], test.params[1], test.params[2])
gc, err := NewGraphConnector(test.params[0], test.params[1], test.params[2])
assert.Nil(t, gc, test.name+" failed")
assert.NotNil(t, err, test.name+"failed")
})
}
}
// Contains is a helper method for verifying if element
// is contained within the slice
func Contains(elems []string, value string) bool {
for _, s := range elems {
if value == s {
return true
}
}
return false
}
func (suite *DiconnectedGraphConnectorSuite) TestBuild() {
names := make(map[string]string)
names["Al"] = "Bundy"
names["Ellen"] = "Ripley"
names["Axel"] = "Foley"
first := buildFromMap(true, names)
last := buildFromMap(false, names)
suite.True(Contains(first, "Al"))
suite.True(Contains(first, "Ellen"))
suite.True(Contains(first, "Axel"))
suite.True(Contains(last, "Bundy"))
suite.True(Contains(last, "Ripley"))
suite.True(Contains(last, "Foley"))
}
func (suite *DiconnectedGraphConnectorSuite) TestInterfaceAlignment() {
var dc DataCollection
concrete := NewExchangeDataCollection("Check", []string{"interface", "works"})
dc = &concrete
assert.NotNil(suite.T(), dc)
}