From 1143a33ce682b0c838968ae9490c30f552ad167f Mon Sep 17 00:00:00 2001 From: ashmrtn <3891298+ashmrtn@users.noreply.github.com> Date: Thu, 7 Jul 2022 14:54:46 -0700 Subject: [PATCH] Allow multiple items in DataCollection from kopia (#296) Use a slice to back the data instead of adding directly to the channel for two reasons (this may change in the future though): * kopia loads all data about a directory at the same time * consumers of the DataCollection may not pull items from the channel at a fast rate, which could block adding to the channel. This could lead to delays in discovering other directories to traverse in multi-threaded scenarios --- src/internal/kopia/data_collection.go | 32 ++++---- src/internal/kopia/data_collection_test.go | 93 +++++++++++++++------- src/internal/kopia/wrapper.go | 10 ++- src/internal/kopia/wrapper_test.go | 10 ++- 4 files changed, 92 insertions(+), 53 deletions(-) diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index fba819b3a..d130cabcb 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -6,31 +6,29 @@ import ( "github.com/alcionai/corso/internal/connector" ) -var _ connector.DataCollection = &singleItemCollection{} +var _ connector.DataCollection = &kopiaDataCollection{} var _ connector.DataStream = &kopiaDataStream{} -// singleItemCollection implements DataCollection but only returns a single -// DataStream. It is not safe for concurrent use. -type singleItemCollection struct { - path []string - stream connector.DataStream - used bool +type kopiaDataCollection struct { + path []string + streams []connector.DataStream } -func (sic *singleItemCollection) Items() <-chan connector.DataStream { - if sic.used { - return nil - } +func (kdc *kopiaDataCollection) Items() <-chan connector.DataStream { + res := make(chan connector.DataStream) + go func() { + defer close(res) + + for _, s := range kdc.streams { + res <- s + } + }() - sic.used = true - res := make(chan connector.DataStream, 1) - res <- sic.stream - close(res) return res } -func (sic singleItemCollection) FullPath() []string { - return append([]string{}, sic.path...) +func (kdc kopiaDataCollection) FullPath() []string { + return append([]string{}, kdc.path...) } type kopiaDataStream struct { diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go index 4376dd363..9701c9927 100644 --- a/src/internal/kopia/data_collection_test.go +++ b/src/internal/kopia/data_collection_test.go @@ -9,57 +9,94 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/internal/connector" ) // --------------- // unit tests // --------------- -type SingleItemCollectionUnitSuite struct { +type KopiaDataCollectionUnitSuite struct { suite.Suite } -func TestSingleItemCollectionUnitSuite(t *testing.T) { - suite.Run(t, new(SingleItemCollectionUnitSuite)) +func TestKopiaDataCollectionUnitSuite(t *testing.T) { + suite.Run(t, new(KopiaDataCollectionUnitSuite)) } -func (suite *SingleItemCollectionUnitSuite) TestReturnsPath() { +func (suite *KopiaDataCollectionUnitSuite) TestReturnsPath() { t := suite.T() path := []string{"some", "path", "for", "data"} - c := singleItemCollection{ - stream: kopiaDataStream{}, - path: path, + c := kopiaDataCollection{ + streams: []connector.DataStream{}, + path: path, } assert.Equal(t, c.FullPath(), path) } -func (suite *SingleItemCollectionUnitSuite) TestReturnsOnlyOneItem() { - t := suite.T() - - data := []byte("abcdefghijklmnopqrstuvwxyz") - uuid := "a-file" - stream := &kopiaDataStream{ - reader: io.NopCloser(bytes.NewReader(data)), - uuid: uuid, +func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { + data := [][]byte{ + []byte("abcdefghijklmnopqrstuvwxyz"), + []byte("zyxwvutsrqponmlkjihgfedcba"), } - c := singleItemCollection{ - stream: stream, - path: []string{}, + uuids := []string{ + "a-file", + "another-file", } - count := 0 - for returnedStream := range c.Items() { - assert.Equal(t, returnedStream.UUID(), uuid) - - buf, err := ioutil.ReadAll(returnedStream.ToReader()) - require.NoError(t, err) - assert.Equal(t, buf, data) - - count++ + table := []struct { + name string + streams []connector.DataStream + }{ + { + name: "SingleStream", + streams: []connector.DataStream{ + &kopiaDataStream{ + reader: io.NopCloser(bytes.NewReader(data[0])), + uuid: uuids[0], + }, + }, + }, + { + name: "MultipleStreams", + streams: []connector.DataStream{ + &kopiaDataStream{ + reader: io.NopCloser(bytes.NewReader(data[0])), + uuid: uuids[0], + }, + &kopiaDataStream{ + reader: io.NopCloser(bytes.NewReader(data[1])), + uuid: uuids[1], + }, + }, + }, } - assert.Equal(t, 1, count) + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + c := kopiaDataCollection{ + streams: test.streams, + path: []string{}, + } + + count := 0 + for returnedStream := range c.Items() { + require.Less(t, count, len(test.streams)) + + assert.Equal(t, returnedStream.UUID(), uuids[count]) + + buf, err := ioutil.ReadAll(returnedStream.ToReader()) + require.NoError(t, err) + assert.Equal(t, buf, data[count]) + + count++ + } + + assert.Equal(t, len(test.streams), count) + }) + } } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 6a8bc74a2..3fb28fc4f 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -329,10 +329,12 @@ func (w Wrapper) restoreSingleItem( pathWithRoot := []string{rootDir.Name()} pathWithRoot = append(pathWithRoot, itemPath[:len(itemPath)-1]...) - return &singleItemCollection{ - stream: kopiaDataStream{ - uuid: itemPath[len(itemPath)-1], - reader: r, + return &kopiaDataCollection{ + streams: []connector.DataStream{ + &kopiaDataStream{ + uuid: f.Name(), + reader: r, + }, }, path: pathWithRoot, }, nil diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 2ac7eaf65..8af9665e5 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -302,11 +302,13 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { suite.w = &Wrapper{c} collections := []connector.DataCollection{ - &singleItemCollection{ + &kopiaDataCollection{ path: testPath, - stream: &kopiaDataStream{ - uuid: testFileUUID, - reader: io.NopCloser(bytes.NewReader(testFileData)), + streams: []connector.DataStream{ + &kopiaDataStream{ + uuid: testFileUUID, + reader: io.NopCloser(bytes.NewReader(testFileData)), + }, }, }, }