exchange.Collection to fill in key information from selectors (#457)
Selectors and scope information are used to create the correct populate methods. Populate methods changed to fit type abstractions definitions for iteration, querying, and collection population.
This commit is contained in:
parent
88a8e992fd
commit
bb7531d95c
@ -32,10 +32,8 @@ const (
|
||||
RestoreCanonicalEnableValue = "4"
|
||||
)
|
||||
|
||||
// ExchangeDataCollection represents exchange mailbox
|
||||
// data for a single user.
|
||||
//
|
||||
// It implements the DataCollection interface
|
||||
// Collection implements the interface from data.Collection
|
||||
// Structure holds data for an Exchange application for a single user
|
||||
type Collection struct {
|
||||
// M365 user
|
||||
user string // M365 user
|
||||
@ -43,34 +41,52 @@ type Collection struct {
|
||||
// jobs represents items from the inventory of M365 objectIds whose information
|
||||
// is desired to be sent through the data channel for eventual storage
|
||||
jobs []string
|
||||
|
||||
service graph.Service
|
||||
// service - client/adapter pair used to access M365 back store
|
||||
service graph.Service
|
||||
// populate - Utility function to populate collection based on the M365 application type and granularity
|
||||
populate populater
|
||||
statusCh chan<- *support.ConnectorOperationStatus
|
||||
// FullPath is the slice representation of the action context passed down through the hierarchy.
|
||||
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
|
||||
fullPath []string
|
||||
}
|
||||
|
||||
// Populater are a class of functions that can be used to fill exchange.Collections with
|
||||
// the corresponding information
|
||||
type populater func(context.Context, graph.Service, *Collection, chan<- *support.ConnectorOperationStatus)
|
||||
|
||||
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
|
||||
func NewCollection(
|
||||
aUser string,
|
||||
pathRepresentation []string,
|
||||
user string,
|
||||
fullPath []string,
|
||||
collectionType optionIdentifier,
|
||||
aService graph.Service,
|
||||
service graph.Service,
|
||||
statusCh chan<- *support.ConnectorOperationStatus,
|
||||
) Collection {
|
||||
collection := Collection{
|
||||
user: aUser,
|
||||
user: user,
|
||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
||||
jobs: make([]string, 0),
|
||||
service: aService,
|
||||
service: service,
|
||||
statusCh: statusCh,
|
||||
fullPath: pathRepresentation,
|
||||
fullPath: fullPath,
|
||||
populate: getPopulateFunction(collectionType),
|
||||
}
|
||||
return collection
|
||||
}
|
||||
|
||||
// AddJob appends additional objectID to job field job
|
||||
// getPopulateFunction is a function to set populate function field
|
||||
// with exchange-application specific functions
|
||||
func getPopulateFunction(optId optionIdentifier) populater {
|
||||
switch optId {
|
||||
case messages:
|
||||
return PopulateFromCollection
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// AddJob appends additional objectID to structure's jobs field
|
||||
func (eoc *Collection) AddJob(objID string) {
|
||||
eoc.jobs = append(eoc.jobs, objID)
|
||||
}
|
||||
@ -78,18 +94,22 @@ func (eoc *Collection) AddJob(objID string) {
|
||||
// Items utility function to asynchronously execute process to fill data channel with
|
||||
// M365 exchange objects and returns the data channel
|
||||
func (eoc *Collection) Items() <-chan data.Stream {
|
||||
go eoc.PopulateFromCollection(context.TODO(), eoc.service, eoc.statusCh)
|
||||
if eoc.populate != nil {
|
||||
go eoc.populate(context.TODO(), eoc.service, eoc, eoc.statusCh)
|
||||
}
|
||||
return eoc.data
|
||||
}
|
||||
|
||||
// FullPath returns the Collection's fullPath []string
|
||||
func (edc *Collection) FullPath() []string {
|
||||
return append([]string{}, edc.fullPath...)
|
||||
}
|
||||
|
||||
// populateFromTaskList async call to fill DataCollection via channel implementation
|
||||
func (edc *Collection) PopulateFromCollection(
|
||||
// PopulateFromCollection async call to fill DataCollection via channel implementation
|
||||
func PopulateFromCollection(
|
||||
ctx context.Context,
|
||||
service graph.Service,
|
||||
edc *Collection,
|
||||
statusChannel chan<- *support.ConnectorOperationStatus,
|
||||
) {
|
||||
var errs error
|
||||
|
||||
@ -2,10 +2,19 @@ package exchange
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector/graph"
|
||||
"github.com/alcionai/corso/internal/connector/mockconnector"
|
||||
"github.com/alcionai/corso/internal/connector/support"
|
||||
"github.com/alcionai/corso/internal/data"
|
||||
"github.com/alcionai/corso/pkg/backup/details"
|
||||
)
|
||||
|
||||
type ExchangeDataCollectionSuite struct {
|
||||
@ -73,3 +82,35 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_AddJob() {
|
||||
suite.Equal(len(shopping), len(eoc.jobs))
|
||||
|
||||
}
|
||||
|
||||
// TestExchangeCollection_Items() tests for the Collection.Items() ability
|
||||
// to asynchronously fill `data` field with Stream objects
|
||||
func (suite *ExchangeDataCollectionSuite) TestExchangeCollection_Items() {
|
||||
expected := 5
|
||||
testFunction := func(ctx context.Context,
|
||||
service graph.Service,
|
||||
eoc *Collection,
|
||||
notUsed chan<- *support.ConnectorOperationStatus) {
|
||||
detail := &details.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()}
|
||||
for i := 0; i < expected; i++ {
|
||||
temp := NewStream(uuid.NewString(), mockconnector.GetMockMessageBytes("Test_Items()"), *detail)
|
||||
eoc.data <- &temp
|
||||
}
|
||||
close(eoc.data)
|
||||
}
|
||||
|
||||
eoc := Collection{
|
||||
user: "Dexter",
|
||||
fullPath: []string{"Today", "is", "currently", "different"},
|
||||
data: make(chan data.Stream, expected),
|
||||
populate: testFunction,
|
||||
}
|
||||
t := suite.T()
|
||||
itemsReturn := eoc.Items()
|
||||
retrieved := 0
|
||||
for item := range itemsReturn {
|
||||
assert.NotNil(t, item)
|
||||
retrieved++
|
||||
}
|
||||
suite.Equal(expected, retrieved)
|
||||
}
|
||||
|
||||
@ -39,7 +39,7 @@ func NewMockExchangeCollection(pathRepresentation []string, numMessagesToReturn
|
||||
|
||||
for i := 0; i < c.messageCount; i++ {
|
||||
// We can plug in whatever data we want here (can be an io.Reader to a test data file if needed)
|
||||
c.Data = append(c.Data, getMockMessageBytes("From: NewMockExchangeCollection"))
|
||||
c.Data = append(c.Data, GetMockMessageBytes("From: NewMockExchangeCollection"))
|
||||
c.Names = append(c.Names, uuid.NewString())
|
||||
}
|
||||
return c
|
||||
@ -85,9 +85,9 @@ func (med *MockExchangeData) Info() details.ItemInfo {
|
||||
return details.ItemInfo{Exchange: &details.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()}}
|
||||
}
|
||||
|
||||
// getMockMessageBytes returns bytes for Messageable item.
|
||||
// GetMockMessageBytes returns bytes for Messageable item.
|
||||
// Contents verified as working with sample data from kiota-serialization-json-go v0.5.5
|
||||
func getMockMessageBytes(subject string) []byte {
|
||||
func GetMockMessageBytes(subject string) []byte {
|
||||
|
||||
userID, err := tester.M365UserID()
|
||||
if err != nil {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user