Allow multiple items in DataCollection from kopia (#296)

Use a slice to back the data instead of adding directly to the channel
for two reasons (this may change in the future though):
  * kopia loads all data about a directory at the same time
  * consumers of the DataCollection may not pull items from the channel
    at a fast rate, which could block adding to the channel. This could
    lead to delays in discovering other directories to traverse in
    multi-threaded scenarios
This commit is contained in:
ashmrtn 2022-07-07 14:54:46 -07:00 committed by GitHub
parent 10f112452a
commit 1143a33ce6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 53 deletions

View File

@ -6,31 +6,29 @@ import (
"github.com/alcionai/corso/internal/connector"
)
var _ connector.DataCollection = &singleItemCollection{}
var _ connector.DataCollection = &kopiaDataCollection{}
var _ connector.DataStream = &kopiaDataStream{}
// singleItemCollection implements DataCollection but only returns a single
// DataStream. It is not safe for concurrent use.
type singleItemCollection struct {
path []string
stream connector.DataStream
used bool
type kopiaDataCollection struct {
path []string
streams []connector.DataStream
}
func (sic *singleItemCollection) Items() <-chan connector.DataStream {
if sic.used {
return nil
}
func (kdc *kopiaDataCollection) Items() <-chan connector.DataStream {
res := make(chan connector.DataStream)
go func() {
defer close(res)
for _, s := range kdc.streams {
res <- s
}
}()
sic.used = true
res := make(chan connector.DataStream, 1)
res <- sic.stream
close(res)
return res
}
func (sic singleItemCollection) FullPath() []string {
return append([]string{}, sic.path...)
func (kdc kopiaDataCollection) FullPath() []string {
return append([]string{}, kdc.path...)
}
type kopiaDataStream struct {

View File

@ -9,57 +9,94 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector"
)
// ---------------
// unit tests
// ---------------
type SingleItemCollectionUnitSuite struct {
type KopiaDataCollectionUnitSuite struct {
suite.Suite
}
func TestSingleItemCollectionUnitSuite(t *testing.T) {
suite.Run(t, new(SingleItemCollectionUnitSuite))
func TestKopiaDataCollectionUnitSuite(t *testing.T) {
suite.Run(t, new(KopiaDataCollectionUnitSuite))
}
func (suite *SingleItemCollectionUnitSuite) TestReturnsPath() {
func (suite *KopiaDataCollectionUnitSuite) TestReturnsPath() {
t := suite.T()
path := []string{"some", "path", "for", "data"}
c := singleItemCollection{
stream: kopiaDataStream{},
path: path,
c := kopiaDataCollection{
streams: []connector.DataStream{},
path: path,
}
assert.Equal(t, c.FullPath(), path)
}
func (suite *SingleItemCollectionUnitSuite) TestReturnsOnlyOneItem() {
t := suite.T()
data := []byte("abcdefghijklmnopqrstuvwxyz")
uuid := "a-file"
stream := &kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data)),
uuid: uuid,
func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
data := [][]byte{
[]byte("abcdefghijklmnopqrstuvwxyz"),
[]byte("zyxwvutsrqponmlkjihgfedcba"),
}
c := singleItemCollection{
stream: stream,
path: []string{},
uuids := []string{
"a-file",
"another-file",
}
count := 0
for returnedStream := range c.Items() {
assert.Equal(t, returnedStream.UUID(), uuid)
buf, err := ioutil.ReadAll(returnedStream.ToReader())
require.NoError(t, err)
assert.Equal(t, buf, data)
count++
table := []struct {
name string
streams []connector.DataStream
}{
{
name: "SingleStream",
streams: []connector.DataStream{
&kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data[0])),
uuid: uuids[0],
},
},
},
{
name: "MultipleStreams",
streams: []connector.DataStream{
&kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data[0])),
uuid: uuids[0],
},
&kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data[1])),
uuid: uuids[1],
},
},
},
}
assert.Equal(t, 1, count)
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
c := kopiaDataCollection{
streams: test.streams,
path: []string{},
}
count := 0
for returnedStream := range c.Items() {
require.Less(t, count, len(test.streams))
assert.Equal(t, returnedStream.UUID(), uuids[count])
buf, err := ioutil.ReadAll(returnedStream.ToReader())
require.NoError(t, err)
assert.Equal(t, buf, data[count])
count++
}
assert.Equal(t, len(test.streams), count)
})
}
}

View File

@ -329,10 +329,12 @@ func (w Wrapper) restoreSingleItem(
pathWithRoot := []string{rootDir.Name()}
pathWithRoot = append(pathWithRoot, itemPath[:len(itemPath)-1]...)
return &singleItemCollection{
stream: kopiaDataStream{
uuid: itemPath[len(itemPath)-1],
reader: r,
return &kopiaDataCollection{
streams: []connector.DataStream{
&kopiaDataStream{
uuid: f.Name(),
reader: r,
},
},
path: pathWithRoot,
}, nil

View File

@ -302,11 +302,13 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
suite.w = &Wrapper{c}
collections := []connector.DataCollection{
&singleItemCollection{
&kopiaDataCollection{
path: testPath,
stream: &kopiaDataStream{
uuid: testFileUUID,
reader: io.NopCloser(bytes.NewReader(testFileData)),
streams: []connector.DataStream{
&kopiaDataStream{
uuid: testFileUUID,
reader: io.NopCloser(bytes.NewReader(testFileData)),
},
},
},
}