From 34a7a1a80c5e633a343ab402cf3c1e6ec53c653e Mon Sep 17 00:00:00 2001 From: Danny Date: Wed, 27 Jul 2022 12:04:31 -0400 Subject: [PATCH] Data Collection --> Collection refactor (#415) DataCollection changed to Collection in the repository. All associated imports changed to reflect the change. --- .../connector/exchange_data_collection.go | 12 +++---- .../exchange_data_collection_test.go | 22 ++++++++++++- src/internal/connector/graph_connector.go | 8 ++--- .../connector/graph_connector_test.go | 4 +-- .../mockconnector/mock_data_collection.go | 10 +++--- src/internal/data/data_collection.go | 12 +++---- src/internal/data/data_collection_test.go | 4 +-- src/internal/kopia/data_collection.go | 10 +++--- src/internal/kopia/data_collection_test.go | 8 ++--- src/internal/kopia/wrapper.go | 32 +++++++++---------- src/internal/kopia/wrapper_test.go | 24 +++++++------- src/internal/operations/backup.go | 2 +- src/internal/operations/restore.go | 2 +- src/internal/operations/restore_test.go | 2 +- 14 files changed, 86 insertions(+), 66 deletions(-) diff --git a/src/internal/connector/exchange_data_collection.go b/src/internal/connector/exchange_data_collection.go index ad52d019f..ebe3d42a7 100644 --- a/src/internal/connector/exchange_data_collection.go +++ b/src/internal/connector/exchange_data_collection.go @@ -9,9 +9,9 @@ import ( "github.com/alcionai/corso/pkg/backup/details" ) -var _ data.DataCollection = &ExchangeDataCollection{} -var _ data.DataStream = &ExchangeData{} -var _ data.DataStreamInfo = &ExchangeData{} +var _ data.Collection = &ExchangeDataCollection{} +var _ data.Stream = &ExchangeData{} +var _ data.StreamInfo = &ExchangeData{} const ( collectionChannelBufferSize = 120 @@ -24,7 +24,7 @@ const ( type ExchangeDataCollection struct { // M365 user user string - data chan data.DataStream + data chan data.Stream tasks []string updateCh chan support.ConnectorOperationStatus service graphService @@ -39,7 +39,7 @@ type ExchangeDataCollection struct { func 
NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection { collection := ExchangeDataCollection{ user: aUser, - data: make(chan data.DataStream, collectionChannelBufferSize), + data: make(chan data.Stream, collectionChannelBufferSize), fullPath: pathRepresentation, } return collection @@ -57,7 +57,7 @@ func (edc *ExchangeDataCollection) FinishPopulation() { } } -func (edc *ExchangeDataCollection) Items() <-chan data.DataStream { +func (edc *ExchangeDataCollection) Items() <-chan data.Stream { return edc.data } diff --git a/src/internal/connector/exchange_data_collection_test.go b/src/internal/connector/exchange_data_collection_test.go index a634efbf9..70587d93e 100644 --- a/src/internal/connector/exchange_data_collection_test.go +++ b/src/internal/connector/exchange_data_collection_test.go @@ -56,7 +56,7 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NewExchange func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCollection() { inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", - "fetch", "a", "pale", "of", "water"} + "fetch", "a", "pail", "of", "water"} expected := len(inputStrings) / 2 // We are using pairs edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) for i := 0; i < expected; i++ { @@ -64,3 +64,23 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCol } suite.Equal(expected, len(edc.data)) } + +func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_Items() { + inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to", + "fetch", "a", "pail", "of", "water"} + expected := len(inputStrings) / 2 // We are using pairs + edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"}) + for i := 0; i < expected; i++ { + edc.data <- &ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])} + } + close(edc.data) 
+ suite.Equal(expected, len(edc.data)) + streams := edc.Items() + suite.Equal(expected, len(streams)) + count := 0 + for item := range streams { + assert.NotNil(suite.T(), item) + count++ + } + suite.Equal(count, expected) +} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 1cf8645da..4b53954c0 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -191,13 +191,13 @@ func buildFromMap(isKey bool, mapping map[string]string) []string { // use to read mailbox data out for the specified user // Assumption: User exists // Add iota to this call -> mail, contacts, calendar, etc. -func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector selectors.Selector) ([]data.DataCollection, error) { +func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector selectors.Selector) ([]data.Collection, error) { eb, err := selector.ToExchangeBackup() if err != nil { return nil, errors.Wrap(err, "collecting exchange data") } - collections := []data.DataCollection{} + collections := []data.Collection{} scopes := eb.Scopes() var errs error @@ -236,7 +236,7 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s // RestoreMessages: Utility function to connect to M365 backstore // and upload messages from DataCollection. 
// FullPath: tenantId, userId, , FolderId -func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.DataCollection) error { +func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collection) error { var ( pathCounter = map[string]bool{} attempts, successes int @@ -422,7 +422,7 @@ func messageToDataCollection( client *msgraphsdk.GraphServiceClient, ctx context.Context, objectWriter *kw.JsonSerializationWriter, - dataChannel chan<- data.DataStream, + dataChannel chan<- data.Stream, message models.Messageable, user string, ) error { diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index a3783c9e4..f2c965767 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -89,7 +89,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages( edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) edc.PopulateCollection(&ds) edc.FinishPopulation() - err = suite.connector.RestoreMessages(context.Background(), []data.DataCollection{&edc}) + err = suite.connector.RestoreMessages(context.Background(), []data.Collection{&edc}) assert.NoError(suite.T(), err) } @@ -173,7 +173,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestBuild() { } func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() { - var dc data.DataCollection + var dc data.Collection concrete := NewExchangeDataCollection("Check", []string{"interface", "works"}) dc = &concrete assert.NotNil(suite.T(), dc) diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index a7d1df15d..a2b3e9833 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -20,9 +20,9 @@ type MockExchangeDataCollection struct { } var ( - _ 
data.DataCollection = &MockExchangeDataCollection{} - _ data.DataStream = &MockExchangeData{} - _ data.DataStreamInfo = &MockExchangeData{} + _ data.Collection = &MockExchangeDataCollection{} + _ data.Stream = &MockExchangeData{} + _ data.StreamInfo = &MockExchangeData{} ) // NewMockExchangeDataCollection creates an data collection that will return the specified number of @@ -49,8 +49,8 @@ func (medc *MockExchangeDataCollection) FullPath() []string { // Items returns a channel that has the next items in the collection. The // channel is closed when there are no more items available. -func (medc *MockExchangeDataCollection) Items() <-chan data.DataStream { - res := make(chan data.DataStream) +func (medc *MockExchangeDataCollection) Items() <-chan data.Stream { + res := make(chan data.Stream) go func() { defer close(res) diff --git a/src/internal/data/data_collection.go b/src/internal/data/data_collection.go index 403b5a09a..7c143b17e 100644 --- a/src/internal/data/data_collection.go +++ b/src/internal/data/data_collection.go @@ -6,14 +6,14 @@ import ( "github.com/alcionai/corso/pkg/backup/details" ) -// A DataCollection represents a collection of data of the -// same type (e.g. mail) -type DataCollection interface { +// A Collection represents a compilation of data from the +// same type of application (e.g. mail) +type Collection interface { // Items returns a channel from which items in the collection can be read. // Each returned struct contains the next item in the collection // The channel is closed when there are no more items in the collection or if // an unrecoverable error caused an early termination in the sender. - Items() <-chan DataStream + Items() <-chan Stream // FullPath returns a slice of strings that act as metadata tags for this // DataCollection. Returned items should be ordered from most generic to least // generic.
For example, a DataCollection for emails from a specific user @@ -23,7 +23,7 @@ type DataCollection interface { // DataStream represents a single item within a DataCollection // that can be consumed as a stream (it embeds io.Reader) -type DataStream interface { +type Stream interface { // ToReader returns an io.Reader for the DataStream ToReader() io.ReadCloser // UUID provides a unique identifier for this data @@ -32,6 +32,6 @@ type DataStream interface { // DataStreamInfo is used to provide service specific // information about the DataStream -type DataStreamInfo interface { +type StreamInfo interface { Info() details.ItemInfo } diff --git a/src/internal/data/data_collection_test.go b/src/internal/data/data_collection_test.go index 064223f3d..d982a1658 100644 --- a/src/internal/data/data_collection_test.go +++ b/src/internal/data/data_collection_test.go @@ -6,10 +6,10 @@ import ( "github.com/stretchr/testify/suite" ) -type DataCollectionSuite struct { +type CollectionSuite struct { suite.Suite } func TestDataCollectionSuite(t *testing.T) { - suite.Run(t, new(DataCollectionSuite)) + suite.Run(t, new(CollectionSuite)) } diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index 2bf48974e..7448eb0f7 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -6,16 +6,16 @@ import ( "github.com/alcionai/corso/internal/data" ) -var _ data.DataCollection = &kopiaDataCollection{} -var _ data.DataStream = &kopiaDataStream{} +var _ data.Collection = &kopiaDataCollection{} +var _ data.Stream = &kopiaDataStream{} type kopiaDataCollection struct { path []string - streams []data.DataStream + streams []data.Stream } -func (kdc *kopiaDataCollection) Items() <-chan data.DataStream { - res := make(chan data.DataStream) +func (kdc *kopiaDataCollection) Items() <-chan data.Stream { + res := make(chan data.Stream) go func() { defer close(res) diff --git a/src/internal/kopia/data_collection_test.go 
b/src/internal/kopia/data_collection_test.go index e067ccb71..3633c41f8 100644 --- a/src/internal/kopia/data_collection_test.go +++ b/src/internal/kopia/data_collection_test.go @@ -30,7 +30,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsPath() { path := []string{"some", "path", "for", "data"} c := kopiaDataCollection{ - streams: []data.DataStream{}, + streams: []data.Stream{}, path: path, } @@ -50,11 +50,11 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { table := []struct { name string - streams []data.DataStream + streams []data.Stream }{ { name: "SingleStream", - streams: []data.DataStream{ + streams: []data.Stream{ &kopiaDataStream{ reader: io.NopCloser(bytes.NewReader(testData[0])), uuid: uuids[0], @@ -63,7 +63,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { }, { name: "MultipleStreams", - streams: []data.DataStream{ + streams: []data.Stream{ &kopiaDataStream{ reader: io.NopCloser(bytes.NewReader(testData[0])), uuid: uuids[0], diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index ae595ecbe..18c4f4447 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -80,7 +80,7 @@ func (w *Wrapper) Close(ctx context.Context) error { // kopia callbacks on directory entries. It binds the directory to the given // DataCollection. func getStreamItemFunc( - collection data.DataCollection, + collection data.Collection, details *details.Details, ) func(context.Context, func(context.Context, fs.Entry) error) error { return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { @@ -93,7 +93,7 @@ func getStreamItemFunc( if !ok { return nil } - ei, ok := e.(data.DataStreamInfo) + ei, ok := e.(data.StreamInfo) if !ok { return errors.New("item does not implement DataStreamInfo") } @@ -143,7 +143,7 @@ func buildKopiaDirs(dirName string, dir *treeMap, details *details.Details) (fs. 
type treeMap struct { childDirs map[string]*treeMap - collection data.DataCollection + collection data.Collection } func newTreeMap() *treeMap { @@ -156,7 +156,7 @@ func newTreeMap() *treeMap { // ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes // in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given // DataCollections. -func inflateDirTree(ctx context.Context, collections []data.DataCollection, details *details.Details) (fs.Directory, error) { +func inflateDirTree(ctx context.Context, collections []data.Collection, details *details.Details) (fs.Directory, error) { roots := make(map[string]*treeMap) for _, s := range collections { @@ -228,7 +228,7 @@ func inflateDirTree(ctx context.Context, collections []data.DataCollection, deta func (w Wrapper) BackupCollections( ctx context.Context, - collections []data.DataCollection, + collections []data.Collection, ) (*BackupStats, *details.Details, error) { if w.c == nil { return nil, nil, errNotConnected @@ -332,7 +332,7 @@ func (w Wrapper) collectItems( snapshotID string, itemPath []string, isDirectory bool, -) ([]data.DataCollection, error) { +) ([]data.Collection, error) { e, err := w.getEntry(ctx, snapshotID, itemPath) if err != nil { return nil, err @@ -362,7 +362,7 @@ func (w Wrapper) collectItems( return nil, err } - return []data.DataCollection{c}, nil + return []data.Collection{c}, nil } // RestoreSingleItem looks up the item at the given path in the snapshot with id @@ -376,7 +376,7 @@ func (w Wrapper) RestoreSingleItem( ctx context.Context, snapshotID string, itemPath []string, -) (data.DataCollection, error) { +) (data.Collection, error) { c, err := w.collectItems(ctx, snapshotID, itemPath, false) if err != nil { return nil, err @@ -396,14 +396,14 @@ func restoreSingleItem( ctx context.Context, f fs.File, itemPath []string, -) (data.DataCollection, error) { +) (data.Collection, error) { r, err := f.Open(ctx) if err != nil { return nil, errors.Wrap(err, 
"opening file") } return &kopiaDataCollection{ - streams: []data.DataStream{ + streams: []data.Stream{ &kopiaDataStream{ uuid: f.Name(), reader: r, @@ -457,8 +457,8 @@ func restoreSubtree( ctx context.Context, dir fs.Directory, relativePath []string, -) ([]data.DataCollection, *multierror.Error) { - collections := []data.DataCollection{} +) ([]data.Collection, *multierror.Error) { + collections := []data.Collection{} // Want a local copy of relativePath with our new element. fullPath := append(append([]string{}, relativePath...), dir.Name()) var errs *multierror.Error @@ -475,7 +475,7 @@ func restoreSubtree( return nil, errs } - streams := make([]data.DataStream, 0, len(files)) + streams := make([]data.Stream, 0, len(files)) for _, f := range files { r, err := f.Open(ctx) @@ -524,7 +524,7 @@ func (w Wrapper) RestoreDirectory( ctx context.Context, snapshotID string, basePath []string, -) ([]data.DataCollection, error) { +) ([]data.Collection, error) { return w.collectItems(ctx, snapshotID, basePath, true) } @@ -539,9 +539,9 @@ func (w Wrapper) RestoreMultipleItems( ctx context.Context, snapshotID string, paths [][]string, -) ([]data.DataCollection, error) { +) ([]data.Collection, error) { var ( - dcs = []data.DataCollection{} + dcs = []data.Collection{} errs *multierror.Error ) for _, path := range paths { diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index cb6eeff88..3571ef4e4 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -63,7 +63,7 @@ func entriesToNames(entries []fs.Entry) []string { func testForFiles( t *testing.T, expected map[string][]byte, - collections []data.DataCollection, + collections []data.Collection, ) { count := 0 for _, c := range collections { @@ -119,7 +119,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { details := &details.Details{} - collections := []data.DataCollection{ + collections := []data.Collection{ 
mockconnector.NewMockExchangeDataCollection( []string{tenant, user1, emails}, expectedFileCount[user1], @@ -181,7 +181,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { expectedFileCount := 42 details := &details.Details{} - collections := []data.DataCollection{ + collections := []data.Collection{ mockconnector.NewMockExchangeDataCollection( []string{emails}, expectedFileCount, @@ -203,7 +203,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { table := []struct { name string - layout []data.DataCollection + layout []data.Collection }{ { "MultipleRoots", @@ -214,7 +214,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { // - user2 // - emails // - 42 separate files - []data.DataCollection{ + []data.Collection{ mockconnector.NewMockExchangeDataCollection( []string{"user1", "emails"}, 5, @@ -227,7 +227,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { }, { "NoCollectionPath", - []data.DataCollection{ + []data.Collection{ mockconnector.NewMockExchangeDataCollection( nil, 5, @@ -242,7 +242,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { // - emails // - 5 separate files // - 42 separate files - []data.DataCollection{ + []data.Collection{ mockconnector.NewMockExchangeDataCollection( []string{"a-tenant", "user1", "emails"}, 5, @@ -401,7 +401,7 @@ func (suite *KopiaIntegrationSuite) TearDownTest() { func (suite *KopiaIntegrationSuite) TestBackupCollections() { t := suite.T() - collections := []data.DataCollection{ + collections := []data.Collection{ mockconnector.NewMockExchangeDataCollection( []string{"a-tenant", "user1", "emails"}, 5, @@ -456,10 +456,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { suite.w = &Wrapper{c} - collections := []data.DataCollection{ + collections := []data.Collection{ &kopiaDataCollection{ path: testPath, - streams: []data.DataStream{ + streams: []data.Stream{ 
&mockconnector.MockExchangeData{ ID: testFileName, Reader: io.NopCloser(bytes.NewReader(testFileData)), @@ -472,7 +472,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { }, &kopiaDataCollection{ path: testPath2, - streams: []data.DataStream{ + streams: []data.Stream{ &mockconnector.MockExchangeData{ ID: testFileName3, Reader: io.NopCloser(bytes.NewReader(testFileData3)), @@ -688,7 +688,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { fp1 := append(p1, dc1.Names[0]) fp2 := append(p2, dc2.Names[0]) - stats, _, err := w.BackupCollections(ctx, []data.DataCollection{dc1, dc2}) + stats, _, err := w.BackupCollections(ctx, []data.Collection{dc1, dc2}) require.NoError(t, err) expected := map[string][]byte{ diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 7646af269..371eec1dc 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -99,7 +99,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { return errors.Wrap(err, "connecting to graph api") } - var cs []data.DataCollection + var cs []data.Collection cs, err = gc.ExchangeDataCollection(ctx, op.Selectors) if err != nil { stats.readErr = err diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 89ca47468..cd32ecf54 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -69,7 +69,7 @@ func (op RestoreOperation) validate() error { // pointer wrapping the values, while those values // get populated asynchronously. 
type restoreStats struct { - cs []data.DataCollection + cs []data.Collection gc *support.ConnectorOperationStatus readErr, writeErr error } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index e2e77b82c..1a91c5e53 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -46,7 +46,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { stats = restoreStats{ readErr: multierror.Append(nil, assert.AnError), writeErr: assert.AnError, - cs: []data.DataCollection{&connector.ExchangeDataCollection{}}, + cs: []data.Collection{&connector.ExchangeDataCollection{}}, gc: &support.ConnectorOperationStatus{ ObjectCount: 1, },