ExchangeDataCollection moved to exchange.ObjectCollection (#423)

Exchange DataCollection moved to the exchange package. Unused fields removed at this time.
This commit is contained in:
Danny 2022-07-27 17:22:27 -04:00 committed by GitHub
parent e35acb79ca
commit 665fa21b13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 120 deletions

View File

@ -0,0 +1,84 @@
// Package exchange provides support for retrieving M365 Exchange objects
// from M365 servers using the Graph API. M365 object support centers
// on the applications: Mail, Contacts, and Calendar.
package exchange
import (
"bytes"
"io"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
)
var _ data.Collection = &Collection{}
var _ data.Stream = &Stream{}
var _ data.StreamInfo = &Stream{}
const (
collectionChannelBufferSize = 1000
)
// Collection represents an compilation of M365 objects from a specific exchange application.
// Each Collection is to only hold one application type at a time. This is not enforced
type Collection struct {
// M365 user
User string // M365 user
Data chan data.Stream // represents a single M365 object from an Exchange application
// FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string
}
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
func NewCollection(aUser string, pathRepresentation []string) Collection {
collection := Collection{
User: aUser,
Data: make(chan data.Stream, collectionChannelBufferSize),
fullPath: pathRepresentation,
}
return collection
}
func (eoc *Collection) PopulateCollection(newData *Stream) {
eoc.Data <- newData
}
// FinishPopulation is used to indicate data population of the collection is complete
// TODO: This should be an internal method once we move the message retrieval logic into `ExchangeDataCollection`
func (eoc *Collection) FinishPopulation() {
if eoc.Data != nil {
close(eoc.Data)
}
}
// Items() returns the channel containing M365 Exchange objects
func (eoc *Collection) Items() <-chan data.Stream {
return eoc.Data
}
func (edc *Collection) FullPath() []string {
return append([]string{}, edc.fullPath...)
}
// Stream represents a single item retrieved from exchange
type Stream struct {
Id string
// TODO: We may need this to be a "oneOf" of `message`, `contact`, etc.
// going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`)
Message []byte
Inf *details.ExchangeInfo //temporary change to bring populate function into directory
}
func (od *Stream) UUID() string {
return od.Id
}
func (od *Stream) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(od.Message))
}
func (od *Stream) Info() details.ItemInfo {
return details.ItemInfo{Exchange: od.Inf}
}

View File

@ -1,4 +1,4 @@
package connector package exchange
import ( import (
"bytes" "bytes"
@ -12,6 +12,15 @@ type ExchangeDataCollectionSuite struct {
suite.Suite suite.Suite
} }
func contains(elems []string, value string) bool {
for _, s := range elems {
if value == s {
return true
}
}
return false
}
func TestExchangeDataCollectionSuite(t *testing.T) { func TestExchangeDataCollectionSuite(t *testing.T) {
suite.Run(t, new(ExchangeDataCollectionSuite)) suite.Run(t, new(ExchangeDataCollectionSuite))
} }
@ -19,7 +28,7 @@ func TestExchangeDataCollectionSuite(t *testing.T) {
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() {
m := []byte("test message") m := []byte("test message")
description := "aFile" description := "aFile"
ed := &ExchangeData{id: description, message: m} ed := &Stream{Id: description, Message: m}
// Read the message using the `ExchangeData` reader and validate it matches what we set // Read the message using the `ExchangeData` reader and validate it matches what we set
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
@ -32,7 +41,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() {
func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() {
var empty []byte var empty []byte
expected := int64(0) expected := int64(0)
ed := &ExchangeData{message: empty} ed := &Stream{Message: empty}
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
received, err := buf.ReadFrom(ed.ToReader()) received, err := buf.ReadFrom(ed.ToReader())
suite.Equal(expected, received) suite.Equal(expected, received)
@ -41,40 +50,40 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() {
func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() { func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() {
user := "a-user" user := "a-user"
fullPath := []string{"a-tenant", user, "emails"} fullPath := []string{"a-tenant", user, "emails"}
edc := NewExchangeDataCollection(user, fullPath) edc := NewCollection(user, fullPath)
assert.Equal(suite.T(), edc.FullPath(), fullPath) assert.Equal(suite.T(), edc.FullPath(), fullPath)
} }
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() {
name := "User" name := "User"
edc := NewExchangeDataCollection(name, []string{"Directory", "File", "task"}) edc := NewCollection(name, []string{"Directory", "File", "task"})
suite.Equal(name, edc.user) suite.Equal(name, edc.User)
suite.True(Contains(edc.FullPath(), "Directory")) suite.True(contains(edc.FullPath(), "Directory"))
suite.True(Contains(edc.FullPath(), "File")) suite.True(contains(edc.FullPath(), "File"))
suite.True(Contains(edc.FullPath(), "task")) suite.True(contains(edc.FullPath(), "task"))
} }
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pail", "of", "water"} "fetch", "a", "pail", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ { for i := 0; i < expected; i++ {
edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) edc.PopulateCollection(&Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])})
} }
suite.Equal(expected, len(edc.data)) suite.Equal(expected, len(edc.Data))
} }
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pail", "of", "water"} "fetch", "a", "pail", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ { for i := 0; i < expected; i++ {
edc.data <- &ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])} edc.Data <- &Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])}
} }
close(edc.data) close(edc.Data)
suite.Equal(expected, len(edc.data)) suite.Equal(expected, len(edc.Data))
streams := edc.Items() streams := edc.Items()
suite.Equal(expected, len(streams)) suite.Equal(expected, len(streams))
count := 0 count := 0

View File

@ -1,88 +0,0 @@
package connector
import (
"bytes"
"io"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
)
var _ data.Collection = &ExchangeDataCollection{}
var _ data.Stream = &ExchangeData{}
var _ data.StreamInfo = &ExchangeData{}
const (
collectionChannelBufferSize = 120
)
// ExchangeDataCollection represents exchange mailbox
// data for a single user.
//
// It implements the DataCollection interface
type ExchangeDataCollection struct {
// M365 user
user string
data chan data.Stream
tasks []string
updateCh chan support.ConnectorOperationStatus
service graphService
populateFunc PopulateFunc
// FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string
}
// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated
func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection {
collection := ExchangeDataCollection{
user: aUser,
data: make(chan data.Stream, collectionChannelBufferSize),
fullPath: pathRepresentation,
}
return collection
}
func (edc *ExchangeDataCollection) PopulateCollection(newData *ExchangeData) {
edc.data <- newData
}
// FinishPopulation is used to indicate data population of the collection is complete
// TODO: This should be an internal method once we move the message retrieval logic into `ExchangeDataCollection`
func (edc *ExchangeDataCollection) FinishPopulation() {
if edc != nil && edc.data != nil {
close(edc.data)
}
}
func (edc *ExchangeDataCollection) Items() <-chan data.Stream {
return edc.data
}
func (edc *ExchangeDataCollection) FullPath() []string {
return append([]string{}, edc.fullPath...)
}
// ExchangeData represents a single item retrieved from exchange
type ExchangeData struct {
id string
// TODO: We may need this to be a "oneOf" of `message`, `contact`, etc.
// going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`)
message []byte
info *details.ExchangeInfo
}
func (ed *ExchangeData) UUID() string {
return ed.id
}
func (ed *ExchangeData) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(ed.message))
}
func (ed *ExchangeData) Info() details.ItemInfo {
return details.ItemInfo{Exchange: ed.info}
}

View File

@ -49,8 +49,6 @@ type graphService struct {
failFast bool // if true service will exit sequence upon encountering an error failFast bool // if true service will exit sequence upon encountering an error
} }
type PopulateFunc func(context.Context, graphService, ExchangeDataCollection, chan *support.ConnectorOperationStatus)
func NewGraphConnector(acct account.Account) (*GraphConnector, error) { func NewGraphConnector(acct account.Account) (*GraphConnector, error) {
m365, err := acct.M365Config() m365, err := acct.M365Config()
if err != nil { if err != nil {
@ -311,7 +309,7 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collec
// serializeMessages: Temp Function as place Holder until Collections have been added // serializeMessages: Temp Function as place Holder until Collections have been added
// to the GraphConnector struct. // to the GraphConnector struct.
func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*ExchangeDataCollection, error) { func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*exchange.Collection, error) {
options := optionsForMessageSnapshot() options := optionsForMessageSnapshot()
response, err := gc.graphService.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil) response, err := gc.graphService.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
if err != nil { if err != nil {
@ -340,11 +338,11 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
return nil, err // return error if snapshot is incomplete return nil, err // return error if snapshot is incomplete
} }
// Create collection of ExchangeDataCollection and create data Holder // Create collection of ExchangeDataCollection and create data Holder
collections := make(map[string]*ExchangeDataCollection) collections := make(map[string]*exchange.Collection)
for aFolder := range tasklist { for aFolder := range tasklist {
// prep the items for handoff to the backup consumer // prep the items for handoff to the backup consumer
edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder}) edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, aFolder})
collections[aFolder] = &edc collections[aFolder] = &edc
} }
@ -373,7 +371,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m
func (sc *graphService) populateFromTaskList( func (sc *graphService) populateFromTaskList(
ctx context.Context, ctx context.Context,
tasklist TaskList, tasklist TaskList,
collections map[string]*ExchangeDataCollection, collections map[string]*exchange.Collection,
statusChannel chan<- *support.ConnectorOperationStatus, statusChannel chan<- *support.ConnectorOperationStatus,
) { ) {
var errs error var errs error
@ -392,16 +390,16 @@ func (sc *graphService) populateFromTaskList(
} }
for _, task := range tasks { for _, task := range tasks {
response, err := sc.client.UsersById(edc.user).MessagesById(task).Get() response, err := sc.client.UsersById(edc.User).MessagesById(task).Get()
if err != nil { if err != nil {
details := support.ConnectorStackErrorTrace(err) details := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) errs = support.WrapAndAppend(edc.User, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs)
continue continue
} }
err = messageToDataCollection(&sc.client, ctx, objectWriter, edc.data, response, edc.user) err = messageToDataCollection(&sc.client, ctx, objectWriter, edc.Data, response, edc.User)
success++ success++
if err != nil { if err != nil {
errs = support.WrapAndAppendf(edc.user, err, errs) errs = support.WrapAndAppendf(edc.User, err, errs)
success-- success--
} }
if errs != nil && sc.failFast { if errs != nil && sc.failFast {
@ -466,7 +464,7 @@ func messageToDataCollection(
return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil)
} }
if byteArray != nil { if byteArray != nil {
dataChannel <- &ExchangeData{id: *aMessage.GetId(), message: byteArray, info: exchange.MessageInfo(aMessage)} dataChannel <- &exchange.Stream{Id: *aMessage.GetId(), Message: byteArray, Inf: exchange.MessageInfo(aMessage)}
} }
return nil return nil
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/exchange"
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/internal/data"
ctesting "github.com/alcionai/corso/internal/testing" ctesting "github.com/alcionai/corso/internal/testing"
@ -85,8 +86,8 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages(
if err != nil { if err != nil {
suite.T().Skipf("Support file not accessible: %v\n", err) suite.T().Skipf("Support file not accessible: %v\n", err)
} }
ds := ExchangeData{id: "test", message: bytes} ds := exchange.Stream{Id: "test", Message: bytes}
edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) edc := exchange.NewCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"})
edc.PopulateCollection(&ds) edc.PopulateCollection(&ds)
edc.FinishPopulation() edc.FinishPopulation()
err = suite.connector.RestoreMessages(context.Background(), []data.Collection{&edc}) err = suite.connector.RestoreMessages(context.Background(), []data.Collection{&edc})
@ -174,7 +175,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestBuild() {
func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() { func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() {
var dc data.Collection var dc data.Collection
concrete := NewExchangeDataCollection("Check", []string{"interface", "works"}) concrete := exchange.NewCollection("Check", []string{"interface", "works"})
dc = &concrete dc = &concrete
assert.NotNil(suite.T(), dc) assert.NotNil(suite.T(), dc)

View File

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector/exchange"
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/internal/kopia"
@ -46,7 +46,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
stats = restoreStats{ stats = restoreStats{
readErr: multierror.Append(nil, assert.AnError), readErr: multierror.Append(nil, assert.AnError),
writeErr: assert.AnError, writeErr: assert.AnError,
cs: []data.Collection{&connector.ExchangeDataCollection{}}, cs: []data.Collection{&exchange.Collection{}},
gc: &support.ConnectorOperationStatus{ gc: &support.ConnectorOperationStatus{
ObjectCount: 1, ObjectCount: 1,
}, },