diff --git a/src/internal/connector/exchange_data_collection.go b/src/internal/connector/exchange_data_collection.go index df5ee573a..49138f077 100644 --- a/src/internal/connector/exchange_data_collection.go +++ b/src/internal/connector/exchange_data_collection.go @@ -14,9 +14,11 @@ const ( // A DataCollection represents a collection of data of the // same type (e.g. mail) type DataCollection interface { - // Returns either the next item in the collection or an error if one occurred. - // If not more items are available in the collection, returns (nil, nil). - NextItem() (DataStream, error) + // Items returns a channel from which items in the collection can be read. + // Each returned struct contains the next item in the collection + // The channel is closed when there are no more items in the collection or if + // an unrecoverable error caused an early termination in the sender. + Items() <-chan DataStream // FullPath returns a slice of strings that act as metadata tags for this // DataCollection. Returned items should be ordered from most generic to least // generic. For example, a DataCollection for emails from a specific user @@ -27,9 +29,9 @@ type DataCollection interface { // DataStream represents a single item within a DataCollection // that can be consumed as a stream (it embeds io.Reader) type DataStream interface { - // Returns an io.Reader for the DataStream + // ToReader returns an io.Reader for the DataStream ToReader() io.ReadCloser - // Provides a unique identifier for this data + // UUID provides a unique identifier for this data UUID() string } @@ -40,12 +42,8 @@ type DataStream interface { type ExchangeDataCollection struct { // M365 user user string - // TODO: We would want to replace this with a channel so that we - // don't need to wait for all data to be retrieved before reading it out - - data chan ExchangeData + data chan DataStream // FullPath is the slice representation of the action context passed down through the hierarchy. - //The original request can be gleaned from the slice. (e.g. {, , "emails"}) fullPath []string } @@ -54,13 +52,13 @@ type ExchangeDataCollection struct { func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection { collection := ExchangeDataCollection{ user: aUser, - data: make(chan ExchangeData, collectionChannelBufferSize), + data: make(chan DataStream, collectionChannelBufferSize), fullPath: pathRepresentation, } return collection } -func (edc *ExchangeDataCollection) PopulateCollection(newData ExchangeData) { +func (edc *ExchangeDataCollection) PopulateCollection(newData *ExchangeData) { edc.data <- newData } @@ -76,14 +74,8 @@ func (edc *ExchangeDataCollection) Length() int { return len(edc.data) } -// NextItem returns either the next item in the collection or an error if one occurred. -// If not more items are available in the collection, returns (nil, nil). -func (edc *ExchangeDataCollection) NextItem() (DataStream, error) { - item, ok := <-edc.data - if !ok { - return nil, io.EOF - } - return &item, nil +func (edc *ExchangeDataCollection) Items() <-chan DataStream { + return edc.data } func (edc *ExchangeDataCollection) FullPath() []string { diff --git a/src/internal/connector/exchange_data_collection_test.go b/src/internal/connector/exchange_data_collection_test.go index b0730f273..c8b1cb7cf 100644 --- a/src/internal/connector/exchange_data_collection_test.go +++ b/src/internal/connector/exchange_data_collection_test.go @@ -61,27 +61,26 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCol expected := len(inputStrings) / 2 // We are using pairs edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { - edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) + edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) } suite.Equal(expected, edc.Length()) } + func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NextItem() { inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", "fetch", "a", "pale", "of", "water"} expected := len(inputStrings) / 2 // We are using pairs edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { - edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) + edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) } edc.FinishPopulation() // finished writing - for i := 0; i < 6; i++ { - data, err := edc.NextItem() - assert.Nil(suite.T(), err) - assert.NotNil(suite.T(), data) - } - // Need that EOF - data, err := edc.NextItem() - assert.Nil(suite.T(), data) - assert.NotNil(suite.T(), err) + count := 0 + for data := range edc.Items() { + assert.NotNil(suite.T(), data) + count++ + } + + assert.Equal(suite.T(), expected, count) } diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 7f77d977e..54866f25d 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "fmt" - "io" az "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/alcionai/corso/internal/connector/support" @@ -168,49 +167,54 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection var errs error // must be user.GetId(), PrimaryName no longer works 6-15-2022 user := dc.FullPath()[1] + items := dc.Items() + for { - data, err := dc.NextItem() - if err == io.EOF { - break - } + select { + case <-ctx.Done(): + return support.WrapAndAppend("context cancelled", ctx.Err(), errs) + case data, ok := <-items: + if !ok { + return errs + } - buf := &bytes.Buffer{} - _, err = buf.ReadFrom(data.ToReader()) - if err != nil { - errs = support.WrapAndAppend(data.UUID(), err, errs) - continue - } - message, err := support.CreateMessageFromBytes(buf.Bytes()) - if err != nil { - errs = support.WrapAndAppend(data.UUID(), err, errs) - continue - } - clone := support.ToMessage(message) - address := dc.FullPath()[3] - valueId := "Integer 0x0E07" - enableValue := "4" - sv := models.NewSingleValueLegacyExtendedProperty() - sv.SetId(&valueId) - sv.SetValue(&enableValue) - svlep := []models.SingleValueLegacyExtendedPropertyable{sv} - clone.SetSingleValueExtendedProperties(svlep) - draft := false - clone.SetIsDraft(&draft) - sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone) - if err != nil { - errs = support.WrapAndAppend(data.UUID()+": "+ - support.ConnectorStackErrorTrace(err), err, errs) - continue - // TODO: Add to retry Handler for the for failure - } + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(data.ToReader()) + if err != nil { + errs = support.WrapAndAppend(data.UUID(), err, errs) + continue + } + message, err := support.CreateMessageFromBytes(buf.Bytes()) + if err != nil { + errs = support.WrapAndAppend(data.UUID(), err, errs) + continue + } + clone := support.ToMessage(message) + address := dc.FullPath()[3] + valueId := "Integer 0x0E07" + enableValue := "4" + sv := models.NewSingleValueLegacyExtendedProperty() + sv.SetId(&valueId) + sv.SetValue(&enableValue) + svlep := []models.SingleValueLegacyExtendedPropertyable{sv} + clone.SetSingleValueExtendedProperties(svlep) + draft := false + clone.SetIsDraft(&draft) + sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone) + if err != nil { + errs = support.WrapAndAppend(data.UUID()+": "+ + support.ConnectorStackErrorTrace(err), err, errs) + continue + // TODO: Add to retry Handler for the for failure + } - if sentMessage == nil && err == nil { - errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs) + if sentMessage == nil && err == nil { + errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs) + } + // This completes the restore loop for a message.. } - // This completes the restore loop for a message.. } - return errs } // serializeMessages: Temp Function as place Holder until Collections have been added @@ -286,7 +290,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ return true } if byteArray != nil { - edc.PopulateCollection(ExchangeData{id: *message.GetId(), message: byteArray}) + edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray}) } return true } diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 06a0f6549..780f7b04a 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -86,7 +86,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages( } ds := ExchangeData{id: "test", message: bytes} edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) - edc.PopulateCollection(ds) + edc.PopulateCollection(&ds) edc.FinishPopulation() err = suite.connector.RestoreMessages(context.Background(), &edc) assert.NoError(suite.T(), err) diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 780389627..01724606a 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -13,7 +13,6 @@ import ( type MockExchangeDataCollection struct { fullPath []string messageCount int - messagesRead int } var ( @@ -27,7 +26,6 @@ func NewMockExchangeDataCollection(pathRepresentation []string, numMessagesToRet collection := &MockExchangeDataCollection{ fullPath: pathRepresentation, messageCount: numMessagesToReturn, - messagesRead: 0, } return collection } @@ -36,16 +34,22 @@ func (medc *MockExchangeDataCollection) FullPath() []string { return append([]string{}, medc.fullPath...) } -// NextItem returns either the next item in the collection or an error if one occurred. -// If not more items are available in the collection, returns (nil, nil). -func (medc *MockExchangeDataCollection) NextItem() (connector.DataStream, error) { - if medc.messagesRead < medc.messageCount { - medc.messagesRead++ - // We can plug in whatever data we want here (can be an io.Reader to a test data file if needed) - m := []byte("test message") - return &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))}, nil - } - return nil, io.EOF +// Items returns a channel that has the next items in the collection. The +// channel is closed when there are no more items available. +func (medc *MockExchangeDataCollection) Items() <-chan connector.DataStream { + res := make(chan connector.DataStream) + + go func() { + defer close(res) + + for i := 0; i < medc.messageCount; i++ { + // We can plug in whatever data we want here (can be an io.Reader to a test data file if needed) + m := []byte("test message") + res <- &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))} + } + }() + + return res } // ExchangeData represents a single item retrieved from exchange diff --git a/src/internal/connector/mockconnector/mock_data_collection_test.go b/src/internal/connector/mockconnector/mock_data_collection_test.go index f62db00c4..3700dda0a 100644 --- a/src/internal/connector/mockconnector/mock_data_collection_test.go +++ b/src/internal/connector/mockconnector/mock_data_collection_test.go @@ -1,7 +1,6 @@ package mockconnector_test import ( - "io" "io/ioutil" "testing" @@ -24,7 +23,7 @@ func (suite *MockExchangeDataCollectionSuite) TestMockExchangeDataCollection() { messagesRead := 0 - for item, err := mdc.NextItem(); err != io.EOF; item, err = mdc.NextItem() { + for item := range mdc.Items() { _, err := ioutil.ReadAll(item.ToReader()) assert.NoError(suite.T(), err) messagesRead++ diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index a682a93af..fba819b3a 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -17,13 +17,16 @@ type singleItemCollection struct { used bool } -func (sic *singleItemCollection) NextItem() (connector.DataStream, error) { +func (sic *singleItemCollection) Items() <-chan connector.DataStream { if sic.used { - return nil, io.EOF + return nil } sic.used = true - return sic.stream, nil + res := make(chan connector.DataStream, 1) + res <- sic.stream + close(res) + return res } func (sic singleItemCollection) FullPath() []string { diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go index ce112df74..4376dd363 100644 --- a/src/internal/kopia/data_collection_test.go +++ b/src/internal/kopia/data_collection_test.go @@ -50,15 +50,16 @@ func (suite *SingleItemCollectionUnitSuite) TestReturnsOnlyOneItem() { path: []string{}, } - returnedStream, err := c.NextItem() - require.NoError(t, err) + count := 0 + for returnedStream := range c.Items() { + assert.Equal(t, returnedStream.UUID(), uuid) - assert.Equal(t, returnedStream.UUID(), uuid) + buf, err := ioutil.ReadAll(returnedStream.ToReader()) + require.NoError(t, err) + assert.Equal(t, buf, data) - _, err = c.NextItem() - assert.ErrorIs(t, err, io.EOF) + count++ + } - buf, err := ioutil.ReadAll(returnedStream.ToReader()) - require.NoError(t, err) - assert.Equal(t, buf, data) + assert.Equal(t, 1, count) } diff --git a/src/internal/kopia/kopia.go b/src/internal/kopia/kopia.go index 58cf438ff..82648c69c 100644 --- a/src/internal/kopia/kopia.go +++ b/src/internal/kopia/kopia.go @@ -2,7 +2,6 @@ package kopia import ( "context" - "io" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" @@ -173,19 +172,20 @@ func getStreamItemFunc( collection connector.DataCollection, ) func(context.Context, func(context.Context, fs.Entry) error) error { return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { + items := collection.Items() for { - e, err := collection.NextItem() - if err != nil { - if err == io.EOF { + select { + case <-ctx.Done(): + return ctx.Err() + case e, ok := <-items: + if !ok { return nil } - return errors.Wrap(err, "materializing directory entry") - } - - entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader()) - if err = cb(ctx, entry); err != nil { - return errors.Wrap(err, "executing callback") + entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader()) + if err := cb(ctx, entry); err != nil { + return errors.Wrap(err, "executing callback") + } } } } diff --git a/src/internal/kopia/kopia_test.go b/src/internal/kopia/kopia_test.go index b887eb46f..634c85e1f 100644 --- a/src/internal/kopia/kopia_test.go +++ b/src/internal/kopia/kopia_test.go @@ -335,15 +335,16 @@ func (suite *KopiaIntegrationSuite) TestBackupAndRestoreSingleItem() { assert.Equal(t, c.FullPath(), testPath) - resultStream, err := c.NextItem() - require.NoError(t, err) + count := 0 + for resultStream := range c.Items() { + buf, err := ioutil.ReadAll(resultStream.ToReader()) + require.NoError(t, err) + assert.Equal(t, buf, testFileData) - _, err = c.NextItem() - assert.ErrorIs(t, err, io.EOF) + count++ + } - buf, err := ioutil.ReadAll(resultStream.ToReader()) - require.NoError(t, err) - assert.Equal(t, buf, testFileData) + assert.Equal(t, 1, count) } // TestBackupAndRestoreSingleItem_Errors exercises the public RestoreSingleItem