diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go new file mode 100644 index 000000000..072fe7a3b --- /dev/null +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -0,0 +1,84 @@ +// Package exchange provides support for retrieving M365 Exchange objects +// from M365 servers using the Graph API. M365 object support centers +// on the applications: Mail, Contacts, and Calendar. +package exchange + +import ( + "bytes" + "io" + + "github.com/alcionai/corso/internal/data" + "github.com/alcionai/corso/pkg/backup/details" +) + +var _ data.Collection = &Collection{} +var _ data.Stream = &Stream{} +var _ data.StreamInfo = &Stream{} + +const ( + collectionChannelBufferSize = 1000 +) + +// Collection represents a compilation of M365 objects from a specific exchange application. +// Each Collection is to only hold one application type at a time. This is not enforced +type Collection struct { + // M365 user + User string // M365 user + Data chan data.Stream // represents a single M365 object from an Exchange application + // FullPath is the slice representation of the action context passed down through the hierarchy. + // The original request can be gleaned from the slice. (e.g. 
{<tenant ID>, <user ID>, "emails"}) + fullPath []string +} + +// NewCollection creates a Collection for aUser whose fullPath is set to pathRepresentation +func NewCollection(aUser string, pathRepresentation []string) Collection { + collection := Collection{ + User: aUser, + Data: make(chan data.Stream, collectionChannelBufferSize), + fullPath: pathRepresentation, + } + return collection +} + +func (eoc *Collection) PopulateCollection(newData *Stream) { + eoc.Data <- newData +} + +// FinishPopulation is used to indicate data population of the collection is complete +// TODO: This should be an internal method once we move the message retrieval logic into `Collection` +func (eoc *Collection) FinishPopulation() { + if eoc.Data != nil { + close(eoc.Data) + } +} + +// Items returns the channel containing M365 Exchange objects +func (eoc *Collection) Items() <-chan data.Stream { + return eoc.Data +} + +func (edc *Collection) FullPath() []string { + return append([]string{}, edc.fullPath...) +} + +// Stream represents a single item retrieved from exchange +type Stream struct { + Id string + // TODO: We may need this to be a "oneOf" of `message`, `contact`, etc. + // going forward. 
Using []byte for now but I assume we'll have + // some structured type in here (serialization to []byte can be done in `Read`) + Message []byte + Inf *details.ExchangeInfo //temporary change to bring populate function into directory +} + +func (od *Stream) UUID() string { + return od.Id +} + +func (od *Stream) ToReader() io.ReadCloser { + return io.NopCloser(bytes.NewReader(od.Message)) +} + +func (od *Stream) Info() details.ItemInfo { + return details.ItemInfo{Exchange: od.Inf} +} diff --git a/src/internal/connector/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go similarity index 69% rename from src/internal/connector/exchange_data_collection_test.go rename to src/internal/connector/exchange/exchange_data_collection_test.go index 70587d93e..9053724b4 100644 --- a/src/internal/connector/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -1,4 +1,4 @@ -package connector +package exchange import ( "bytes" @@ -12,6 +12,15 @@ type ExchangeDataCollectionSuite struct { suite.Suite } +func contains(elems []string, value string) bool { + for _, s := range elems { + if value == s { + return true + } + } + return false +} + func TestExchangeDataCollectionSuite(t *testing.T) { suite.Run(t, new(ExchangeDataCollectionSuite)) } @@ -19,7 +28,7 @@ func TestExchangeDataCollectionSuite(t *testing.T) { func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() { m := []byte("test message") description := "aFile" - ed := &ExchangeData{id: description, message: m} + ed := &Stream{Id: description, Message: m} // Read the message using the `ExchangeData` reader and validate it matches what we set buf := &bytes.Buffer{} @@ -32,7 +41,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Valid() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() { var empty []byte expected := int64(0) - ed := &ExchangeData{message: empty} + ed := 
&Stream{Message: empty} buf := &bytes.Buffer{} received, err := buf.ReadFrom(ed.ToReader()) suite.Equal(expected, received) @@ -41,40 +50,40 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataReader_Empty() { func (suite *ExchangeDataCollectionSuite) TestExchangeData_FullPath() { user := "a-user" fullPath := []string{"a-tenant", user, "emails"} - edc := NewExchangeDataCollection(user, fullPath) + edc := NewCollection(user, fullPath) assert.Equal(suite.T(), edc.FullPath(), fullPath) } func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchangeDataCollection() { name := "User" - edc := NewExchangeDataCollection(name, []string{"Directory", "File", "task"}) - suite.Equal(name, edc.user) - suite.True(Contains(edc.FullPath(), "Directory")) - suite.True(Contains(edc.FullPath(), "File")) - suite.True(Contains(edc.FullPath(), "task")) + edc := NewCollection(name, []string{"Directory", "File", "task"}) + suite.Equal(name, edc.User) + suite.True(contains(edc.FullPath(), "Directory")) + suite.True(contains(edc.FullPath(), "File")) + suite.True(contains(edc.FullPath(), "task")) } func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() { inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", "fetch", "a", "pail", "of", "water"} expected := len(inputStrings) / 2 // We are using pairs - edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) + edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { - edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) + edc.PopulateCollection(&Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])}) } - suite.Equal(expected, len(edc.data)) + suite.Equal(expected, len(edc.Data)) } func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() { inputStrings := []string{"Jack", "and", "Jill", "went", 
"up", "the", "hill to", "fetch", "a", "pail", "of", "water"} expected := len(inputStrings) / 2 // We are using pairs - edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) + edc := NewCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { - edc.data <- &ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])} + edc.Data <- &Stream{Id: inputStrings[i*2], Message: []byte(inputStrings[i*2+1])} } - close(edc.data) - suite.Equal(expected, len(edc.data)) + close(edc.Data) + suite.Equal(expected, len(edc.Data)) streams := edc.Items() suite.Equal(expected, len(streams)) count := 0 diff --git a/src/internal/connector/exchange_data_collection.go b/src/internal/connector/exchange_data_collection.go deleted file mode 100644 index ebe3d42a7..000000000 --- a/src/internal/connector/exchange_data_collection.go +++ /dev/null @@ -1,88 +0,0 @@ -package connector - -import ( - "bytes" - "io" - - "github.com/alcionai/corso/internal/connector/support" - "github.com/alcionai/corso/internal/data" - "github.com/alcionai/corso/pkg/backup/details" -) - -var _ data.Collection = &ExchangeDataCollection{} -var _ data.Stream = &ExchangeData{} -var _ data.StreamInfo = &ExchangeData{} - -const ( - collectionChannelBufferSize = 120 -) - -// ExchangeDataCollection represents exchange mailbox -// data for a single user. -// -// It implements the DataCollection interface -type ExchangeDataCollection struct { - // M365 user - user string - data chan data.Stream - tasks []string - updateCh chan support.ConnectorOperationStatus - service graphService - populateFunc PopulateFunc - - // FullPath is the slice representation of the action context passed down through the hierarchy. - //The original request can be gleaned from the slice. (e.g. 
{<tenant ID>, <user ID>, "emails"}) - fullPath []string -} - -// NewExchangeDataCollection creates an ExchangeDataCollection with fullPath is annotated -func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection { - collection := ExchangeDataCollection{ - user: aUser, - data: make(chan data.Stream, collectionChannelBufferSize), - fullPath: pathRepresentation, - } - return collection -} - -func (edc *ExchangeDataCollection) PopulateCollection(newData *ExchangeData) { - edc.data <- newData -} - -// FinishPopulation is used to indicate data population of the collection is complete -// TODO: This should be an internal method once we move the message retrieval logic into `ExchangeDataCollection` -func (edc *ExchangeDataCollection) FinishPopulation() { - if edc != nil && edc.data != nil { - close(edc.data) - } -} - -func (edc *ExchangeDataCollection) Items() <-chan data.Stream { - return edc.data -} - -func (edc *ExchangeDataCollection) FullPath() []string { - return append([]string{}, edc.fullPath...) -} - -// ExchangeData represents a single item retrieved from exchange -type ExchangeData struct { - id string - // TODO: We may need this to be a "oneOf" of `message`, `contact`, etc. - // going forward. 
Using []byte for now but I assume we'll have - // some structured type in here (serialization to []byte can be done in `Read`) - message []byte - info *details.ExchangeInfo -} - -func (ed *ExchangeData) UUID() string { - return ed.id -} - -func (ed *ExchangeData) ToReader() io.ReadCloser { - return io.NopCloser(bytes.NewReader(ed.message)) -} - -func (ed *ExchangeData) Info() details.ItemInfo { - return details.ItemInfo{Exchange: ed.info} -} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 4b53954c0..a56936694 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -49,8 +49,6 @@ type graphService struct { failFast bool // if true service will exit sequence upon encountering an error } -type PopulateFunc func(context.Context, graphService, ExchangeDataCollection, chan *support.ConnectorOperationStatus) - func NewGraphConnector(acct account.Account) (*GraphConnector, error) { m365, err := acct.M365Config() if err != nil { @@ -311,7 +309,7 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collec // serializeMessages: Temp Function as place Holder until Collections have been added // to the GraphConnector struct. 
-func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*ExchangeDataCollection, error) { +func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (map[string]*exchange.Collection, error) { options := optionsForMessageSnapshot() response, err := gc.graphService.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil) if err != nil { @@ -340,11 +338,11 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m return nil, err // return error if snapshot is incomplete } // Create collection of ExchangeDataCollection and create data Holder - collections := make(map[string]*ExchangeDataCollection) + collections := make(map[string]*exchange.Collection) for aFolder := range tasklist { // prep the items for handoff to the backup consumer - edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder}) + edc := exchange.NewCollection(user, []string{gc.tenant, user, mailCategory, aFolder}) collections[aFolder] = &edc } @@ -373,7 +371,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) (m func (sc *graphService) populateFromTaskList( ctx context.Context, tasklist TaskList, - collections map[string]*ExchangeDataCollection, + collections map[string]*exchange.Collection, statusChannel chan<- *support.ConnectorOperationStatus, ) { var errs error @@ -392,16 +390,16 @@ func (sc *graphService) populateFromTaskList( } for _, task := range tasks { - response, err := sc.client.UsersById(edc.user).MessagesById(task).Get() + response, err := sc.client.UsersById(edc.User).MessagesById(task).Get() if err != nil { details := support.ConnectorStackErrorTrace(err) - errs = support.WrapAndAppend(edc.user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) + errs = support.WrapAndAppend(edc.User, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs) continue } - err = 
messageToDataCollection(&sc.client, ctx, objectWriter, edc.data, response, edc.user) + err = messageToDataCollection(&sc.client, ctx, objectWriter, edc.Data, response, edc.User) success++ if err != nil { - errs = support.WrapAndAppendf(edc.user, err, errs) + errs = support.WrapAndAppendf(edc.User, err, errs) success-- } if errs != nil && sc.failFast { @@ -466,7 +464,7 @@ func messageToDataCollection( return support.WrapAndAppend(*aMessage.GetId(), errors.Wrap(err, "serializing mail content"), nil) } if byteArray != nil { - dataChannel <- &ExchangeData{id: *aMessage.GetId(), message: byteArray, info: exchange.MessageInfo(aMessage)} + dataChannel <- &exchange.Stream{Id: *aMessage.GetId(), Message: byteArray, Inf: exchange.MessageInfo(aMessage)} } return nil } diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index f2c965767..f1eb2c580 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/internal/connector/exchange" "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/data" ctesting "github.com/alcionai/corso/internal/testing" @@ -85,8 +86,8 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages( if err != nil { suite.T().Skipf("Support file not accessible: %v\n", err) } - ds := ExchangeData{id: "test", message: bytes} - edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) + ds := exchange.Stream{Id: "test", Message: bytes} + edc := exchange.NewCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) edc.PopulateCollection(&ds) edc.FinishPopulation() err = suite.connector.RestoreMessages(context.Background(), []data.Collection{&edc}) @@ -174,7 +175,7 @@ func (suite 
*DisconnectedGraphConnectorSuite) TestBuild() { func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() { var dc data.Collection - concrete := NewExchangeDataCollection("Check", []string{"interface", "works"}) + concrete := exchange.NewCollection("Check", []string{"interface", "works"}) dc = &concrete assert.NotNil(suite.T(), dc) diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 1a91c5e53..aa01a98b1 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/internal/connector" + "github.com/alcionai/corso/internal/connector/exchange" "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/internal/kopia" @@ -46,7 +46,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { stats = restoreStats{ readErr: multierror.Append(nil, assert.AnError), writeErr: assert.AnError, - cs: []data.Collection{&connector.ExchangeDataCollection{}}, + cs: []data.Collection{&exchange.Collection{}}, gc: &support.ConnectorOperationStatus{ ObjectCount: 1, },