Switch DataCollection to return a channel (#244)

* Change DataCollection to return channel directly

Precursor to restoring multiple items from kopia. Allows one to keep a
DataCollection open until all items are processed without blocking
consumers of the DataCollection (they can use a select-block if needed).

* Update tests for new DataCollection interface

* Handle context cancellation with DataCollection
This commit is contained in:
ashmrtn 2022-06-28 12:48:24 -07:00 committed by GitHub
parent 0707d00ab5
commit 1c6f0994e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 116 additions and 113 deletions

View File

@ -14,9 +14,11 @@ const (
// A DataCollection represents a collection of data of the // A DataCollection represents a collection of data of the
// same type (e.g. mail) // same type (e.g. mail)
type DataCollection interface { type DataCollection interface {
// Returns either the next item in the collection or an error if one occurred. // Items returns a channel from which items in the collection can be read.
// If not more items are available in the collection, returns (nil, nil). // Each returned struct contains the next item in the collection
NextItem() (DataStream, error) // The channel is closed when there are no more items in the collection or if
// an unrecoverable error caused an early termination in the sender.
Items() <-chan DataStream
// FullPath returns a slice of strings that act as metadata tags for this // FullPath returns a slice of strings that act as metadata tags for this
// DataCollection. Returned items should be ordered from most generic to least // DataCollection. Returned items should be ordered from most generic to least
// generic. For example, a DataCollection for emails from a specific user // generic. For example, a DataCollection for emails from a specific user
@ -27,9 +29,9 @@ type DataCollection interface {
// DataStream represents a single item within a DataCollection // DataStream represents a single item within a DataCollection
// that can be consumed as a stream (it embeds io.Reader) // that can be consumed as a stream (it embeds io.Reader)
type DataStream interface { type DataStream interface {
// Returns an io.Reader for the DataStream // ToReader returns an io.Reader for the DataStream
ToReader() io.ReadCloser ToReader() io.ReadCloser
// Provides a unique identifier for this data // UUID provides a unique identifier for this data
UUID() string UUID() string
} }
@ -40,12 +42,8 @@ type DataStream interface {
type ExchangeDataCollection struct { type ExchangeDataCollection struct {
// M365 user // M365 user
user string user string
// TODO: We would want to replace this with a channel so that we data chan DataStream
// don't need to wait for all data to be retrieved before reading it out
data chan ExchangeData
// FullPath is the slice representation of the action context passed down through the hierarchy. // FullPath is the slice representation of the action context passed down through the hierarchy.
//The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"}) //The original request can be gleaned from the slice. (e.g. {<tenant ID>, <user ID>, "emails"})
fullPath []string fullPath []string
} }
@ -54,13 +52,13 @@ type ExchangeDataCollection struct {
func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection { func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection {
collection := ExchangeDataCollection{ collection := ExchangeDataCollection{
user: aUser, user: aUser,
data: make(chan ExchangeData, collectionChannelBufferSize), data: make(chan DataStream, collectionChannelBufferSize),
fullPath: pathRepresentation, fullPath: pathRepresentation,
} }
return collection return collection
} }
func (edc *ExchangeDataCollection) PopulateCollection(newData ExchangeData) { func (edc *ExchangeDataCollection) PopulateCollection(newData *ExchangeData) {
edc.data <- newData edc.data <- newData
} }
@ -76,14 +74,8 @@ func (edc *ExchangeDataCollection) Length() int {
return len(edc.data) return len(edc.data)
} }
// NextItem returns either the next item in the collection or an error if one occurred. func (edc *ExchangeDataCollection) Items() <-chan DataStream {
// If not more items are available in the collection, returns (nil, nil). return edc.data
func (edc *ExchangeDataCollection) NextItem() (DataStream, error) {
item, ok := <-edc.data
if !ok {
return nil, io.EOF
}
return &item, nil
} }
func (edc *ExchangeDataCollection) FullPath() []string { func (edc *ExchangeDataCollection) FullPath() []string {

View File

@ -61,27 +61,26 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCol
expected := len(inputStrings) / 2 // We are using pairs expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ { for i := 0; i < expected; i++ {
edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
} }
suite.Equal(expected, edc.Length()) suite.Equal(expected, edc.Length())
} }
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NextItem() { func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NextItem() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pale", "of", "water"} "fetch", "a", "pale", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ { for i := 0; i < expected; i++ {
edc.PopulateCollection(ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])}) edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
} }
edc.FinishPopulation() // finished writing edc.FinishPopulation() // finished writing
for i := 0; i < 6; i++ {
data, err := edc.NextItem()
assert.Nil(suite.T(), err)
assert.NotNil(suite.T(), data)
}
// Need that EOF
data, err := edc.NextItem()
assert.Nil(suite.T(), data)
assert.NotNil(suite.T(), err)
count := 0
for data := range edc.Items() {
assert.NotNil(suite.T(), data)
count++
}
assert.Equal(suite.T(), expected, count)
} }

View File

@ -6,7 +6,6 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
az "github.com/Azure/azure-sdk-for-go/sdk/azidentity" az "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
@ -168,14 +167,19 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection
var errs error var errs error
// must be user.GetId(), PrimaryName no longer works 6-15-2022 // must be user.GetId(), PrimaryName no longer works 6-15-2022
user := dc.FullPath()[1] user := dc.FullPath()[1]
items := dc.Items()
for { for {
data, err := dc.NextItem() select {
if err == io.EOF { case <-ctx.Done():
break return support.WrapAndAppend("context cancelled", ctx.Err(), errs)
case data, ok := <-items:
if !ok {
return errs
} }
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
_, err = buf.ReadFrom(data.ToReader()) _, err := buf.ReadFrom(data.ToReader())
if err != nil { if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs) errs = support.WrapAndAppend(data.UUID(), err, errs)
continue continue
@ -210,7 +214,7 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection
} }
// This completes the restore loop for a message.. // This completes the restore loop for a message..
} }
return errs }
} }
// serializeMessages: Temp Function as place Holder until Collections have been added // serializeMessages: Temp Function as place Holder until Collections have been added
@ -286,7 +290,7 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
return true return true
} }
if byteArray != nil { if byteArray != nil {
edc.PopulateCollection(ExchangeData{id: *message.GetId(), message: byteArray}) edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray})
} }
return true return true
} }

View File

@ -86,7 +86,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages(
} }
ds := ExchangeData{id: "test", message: bytes} ds := ExchangeData{id: "test", message: bytes}
edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"})
edc.PopulateCollection(ds) edc.PopulateCollection(&ds)
edc.FinishPopulation() edc.FinishPopulation()
err = suite.connector.RestoreMessages(context.Background(), &edc) err = suite.connector.RestoreMessages(context.Background(), &edc)
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)

View File

@ -13,7 +13,6 @@ import (
type MockExchangeDataCollection struct { type MockExchangeDataCollection struct {
fullPath []string fullPath []string
messageCount int messageCount int
messagesRead int
} }
var ( var (
@ -27,7 +26,6 @@ func NewMockExchangeDataCollection(pathRepresentation []string, numMessagesToRet
collection := &MockExchangeDataCollection{ collection := &MockExchangeDataCollection{
fullPath: pathRepresentation, fullPath: pathRepresentation,
messageCount: numMessagesToReturn, messageCount: numMessagesToReturn,
messagesRead: 0,
} }
return collection return collection
} }
@ -36,16 +34,22 @@ func (medc *MockExchangeDataCollection) FullPath() []string {
return append([]string{}, medc.fullPath...) return append([]string{}, medc.fullPath...)
} }
// NextItem returns either the next item in the collection or an error if one occurred. // Items returns a channel that has the next items in the collection. The
// If not more items are available in the collection, returns (nil, nil). // channel is closed when there are no more items available.
func (medc *MockExchangeDataCollection) NextItem() (connector.DataStream, error) { func (medc *MockExchangeDataCollection) Items() <-chan connector.DataStream {
if medc.messagesRead < medc.messageCount { res := make(chan connector.DataStream)
medc.messagesRead++
go func() {
defer close(res)
for i := 0; i < medc.messageCount; i++ {
// We can plug in whatever data we want here (can be an io.Reader to a test data file if needed) // We can plug in whatever data we want here (can be an io.Reader to a test data file if needed)
m := []byte("test message") m := []byte("test message")
return &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))}, nil res <- &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))}
} }
return nil, io.EOF }()
return res
} }
// ExchangeData represents a single item retrieved from exchange // ExchangeData represents a single item retrieved from exchange

View File

@ -1,7 +1,6 @@
package mockconnector_test package mockconnector_test
import ( import (
"io"
"io/ioutil" "io/ioutil"
"testing" "testing"
@ -24,7 +23,7 @@ func (suite *MockExchangeDataCollectionSuite) TestMockExchangeDataCollection() {
messagesRead := 0 messagesRead := 0
for item, err := mdc.NextItem(); err != io.EOF; item, err = mdc.NextItem() { for item := range mdc.Items() {
_, err := ioutil.ReadAll(item.ToReader()) _, err := ioutil.ReadAll(item.ToReader())
assert.NoError(suite.T(), err) assert.NoError(suite.T(), err)
messagesRead++ messagesRead++

View File

@ -17,13 +17,16 @@ type singleItemCollection struct {
used bool used bool
} }
func (sic *singleItemCollection) NextItem() (connector.DataStream, error) { func (sic *singleItemCollection) Items() <-chan connector.DataStream {
if sic.used { if sic.used {
return nil, io.EOF return nil
} }
sic.used = true sic.used = true
return sic.stream, nil res := make(chan connector.DataStream, 1)
res <- sic.stream
close(res)
return res
} }
func (sic singleItemCollection) FullPath() []string { func (sic singleItemCollection) FullPath() []string {

View File

@ -50,15 +50,16 @@ func (suite *SingleItemCollectionUnitSuite) TestReturnsOnlyOneItem() {
path: []string{}, path: []string{},
} }
returnedStream, err := c.NextItem() count := 0
require.NoError(t, err) for returnedStream := range c.Items() {
assert.Equal(t, returnedStream.UUID(), uuid) assert.Equal(t, returnedStream.UUID(), uuid)
_, err = c.NextItem()
assert.ErrorIs(t, err, io.EOF)
buf, err := ioutil.ReadAll(returnedStream.ToReader()) buf, err := ioutil.ReadAll(returnedStream.ToReader())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, buf, data) assert.Equal(t, buf, data)
count++
}
assert.Equal(t, 1, count)
} }

View File

@ -2,7 +2,6 @@ package kopia
import ( import (
"context" "context"
"io"
"github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/fs/virtualfs"
@ -173,23 +172,24 @@ func getStreamItemFunc(
collection connector.DataCollection, collection connector.DataCollection,
) func(context.Context, func(context.Context, fs.Entry) error) error { ) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
items := collection.Items()
for { for {
e, err := collection.NextItem() select {
if err != nil { case <-ctx.Done():
if err == io.EOF { return ctx.Err()
case e, ok := <-items:
if !ok {
return nil return nil
} }
return errors.Wrap(err, "materializing directory entry")
}
entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader()) entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader())
if err = cb(ctx, entry); err != nil { if err := cb(ctx, entry); err != nil {
return errors.Wrap(err, "executing callback") return errors.Wrap(err, "executing callback")
} }
} }
} }
} }
}
// buildKopiaDirs recursively builds a directory hierarchy from the roots up. // buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are either virtualfs.StreamingDirectory or // Returned directories are either virtualfs.StreamingDirectory or

View File

@ -335,15 +335,16 @@ func (suite *KopiaIntegrationSuite) TestBackupAndRestoreSingleItem() {
assert.Equal(t, c.FullPath(), testPath) assert.Equal(t, c.FullPath(), testPath)
resultStream, err := c.NextItem() count := 0
require.NoError(t, err) for resultStream := range c.Items() {
_, err = c.NextItem()
assert.ErrorIs(t, err, io.EOF)
buf, err := ioutil.ReadAll(resultStream.ToReader()) buf, err := ioutil.ReadAll(resultStream.ToReader())
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, buf, testFileData) assert.Equal(t, buf, testFileData)
count++
}
assert.Equal(t, 1, count)
} }
// TestBackupAndRestoreSingleItem_Errors exercises the public RestoreSingleItem // TestBackupAndRestoreSingleItem_Errors exercises the public RestoreSingleItem