Move DataCollection to data package (#414)

DataCollection moved to `/src/internal/data` directory and imports changed to recognize the move.
This commit is contained in:
Danny 2022-07-27 08:04:35 -04:00 committed by GitHub
parent 587c239dd1
commit 3e792e69eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 127 additions and 120 deletions

View File

@ -5,49 +5,18 @@ import (
"io"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
)
var _ data.DataCollection = &ExchangeDataCollection{}
var _ data.DataStream = &ExchangeData{}
var _ data.DataStreamInfo = &ExchangeData{}
const (
// TODO: Reduce this when https://github.com/alcionai/corso/issues/124 is closed
// and we make channel population async (decouple from collection initialization)
collectionChannelBufferSize = 1000
collectionChannelBufferSize = 120
)
// A DataCollection represents a collection of data of the
// same type (e.g. mail)
type DataCollection interface {
// Items returns a channel from which items in the collection can be read.
// Each returned struct contains the next item in the collection
// The channel is closed when there are no more items in the collection or if
// an unrecoverable error caused an early termination in the sender.
Items() <-chan DataStream
// FullPath returns a slice of strings that act as metadata tags for this
// DataCollection. Returned items should be ordered from most generic to least
// generic. For example, a DataCollection for emails from a specific user
// would be {"<tenant id>", "<user ID>", "emails"}.
FullPath() []string
}
// DataStream represents a single item within a DataCollection
// that can be consumed as a stream (it embeds io.Reader)
type DataStream interface {
// ToReader returns an io.Reader for the DataStream
ToReader() io.ReadCloser
// UUID provides a unique identifier for this data
UUID() string
}
// DataStreamInfo is used to provide service specific
// information about the DataStream
type DataStreamInfo interface {
Info() details.ItemInfo
}
var _ DataCollection = &ExchangeDataCollection{}
var _ DataStream = &ExchangeData{}
var _ DataStreamInfo = &ExchangeData{}
// ExchangeDataCollection represents exchange mailbox
// data for a single user.
//
@ -55,7 +24,7 @@ var _ DataStreamInfo = &ExchangeData{}
type ExchangeDataCollection struct {
// M365 user
user string
data chan DataStream
data chan data.DataStream
tasks []string
updateCh chan support.ConnectorOperationStatus
service graphService
@ -70,7 +39,7 @@ type ExchangeDataCollection struct {
func NewExchangeDataCollection(aUser string, pathRepresentation []string) ExchangeDataCollection {
collection := ExchangeDataCollection{
user: aUser,
data: make(chan DataStream, collectionChannelBufferSize),
data: make(chan data.DataStream, collectionChannelBufferSize),
fullPath: pathRepresentation,
}
return collection
@ -88,7 +57,7 @@ func (edc *ExchangeDataCollection) FinishPopulation() {
}
}
func (edc *ExchangeDataCollection) Items() <-chan DataStream {
func (edc *ExchangeDataCollection) Items() <-chan data.DataStream {
return edc.data
}

View File

@ -64,22 +64,3 @@ func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_PopulateCol
}
suite.Equal(expected, len(edc.data))
}
func (suite *ExchangeDataCollectionSuite) TestExchangeDataCollection_NextItem() {
inputStrings := []string{"Jack", "and", "Jill", "went", "up", "the", "hill to",
"fetch", "a", "pale", "of", "water"}
expected := len(inputStrings) / 2 // We are using pairs
edc := NewExchangeDataCollection("Fletcher", []string{"sugar", "horses", "painted red"})
for i := 0; i < expected; i++ {
edc.PopulateCollection(&ExchangeData{id: inputStrings[i*2], message: []byte(inputStrings[i*2+1])})
}
edc.FinishPopulation() // finished writing
count := 0
for data := range edc.Items() {
assert.NotNil(suite.T(), data)
count++
}
assert.Equal(suite.T(), expected, count)
}

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/internal/connector/exchange"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/logger"
"github.com/alcionai/corso/pkg/selectors"
@ -190,13 +191,13 @@ func buildFromMap(isKey bool, mapping map[string]string) []string {
// use to read mailbox data out for the specified user
// Assumption: User exists
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector selectors.Selector) ([]DataCollection, error) {
func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector selectors.Selector) ([]data.DataCollection, error) {
eb, err := selector.ToExchangeBackup()
if err != nil {
return nil, errors.Wrap(err, "collecting exchange data")
}
collections := []DataCollection{}
collections := []data.DataCollection{}
scopes := eb.Scopes()
var errs error
@ -235,7 +236,7 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s
// RestoreMessages: Utility function to connect to M365 backstore
// and upload messages from DataCollection.
// FullPath: tenantId, userId, <mailCategory>, FolderId
func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollection) error {
func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.DataCollection) error {
var (
pathCounter = map[string]bool{}
attempts, successes int
@ -421,7 +422,7 @@ func messageToDataCollection(
client *msgraphsdk.GraphServiceClient,
ctx context.Context,
objectWriter *kw.JsonSerializationWriter,
dataChannel chan<- DataStream,
dataChannel chan<- data.DataStream,
message models.Messageable,
user string,
) error {

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/credentials"
@ -88,7 +89,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages(
edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"})
edc.PopulateCollection(&ds)
edc.FinishPopulation()
err = suite.connector.RestoreMessages(context.Background(), []DataCollection{&edc})
err = suite.connector.RestoreMessages(context.Background(), []data.DataCollection{&edc})
assert.NoError(suite.T(), err)
}
@ -172,7 +173,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestBuild() {
}
func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() {
var dc DataCollection
var dc data.DataCollection
concrete := NewExchangeDataCollection("Check", []string{"interface", "works"})
dc = &concrete
assert.NotNil(suite.T(), dc)

View File

@ -7,7 +7,7 @@ import (
"github.com/google/uuid"
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
)
@ -20,9 +20,9 @@ type MockExchangeDataCollection struct {
}
var (
_ connector.DataCollection = &MockExchangeDataCollection{}
_ connector.DataStream = &MockExchangeData{}
_ connector.DataStreamInfo = &MockExchangeData{}
_ data.DataCollection = &MockExchangeDataCollection{}
_ data.DataStream = &MockExchangeData{}
_ data.DataStreamInfo = &MockExchangeData{}
)
// NewMockExchangeDataCollection creates an data collection that will return the specified number of
@ -49,8 +49,8 @@ func (medc *MockExchangeDataCollection) FullPath() []string {
// Items returns a channel that has the next items in the collection. The
// channel is closed when there are no more items available.
func (medc *MockExchangeDataCollection) Items() <-chan connector.DataStream {
res := make(chan connector.DataStream)
func (medc *MockExchangeDataCollection) Items() <-chan data.DataStream {
res := make(chan data.DataStream)
go func() {
defer close(res)

View File

@ -0,0 +1,37 @@
package data
import (
"io"
"github.com/alcionai/corso/pkg/backup/details"
)
// A DataCollection represents a collection of data of the
// same type (e.g. mail)
type DataCollection interface {
// Items returns a channel from which items in the collection can be read.
// Each returned struct contains the next item in the collection
// The channel is closed when there are no more items in the collection or if
// an unrecoverable error caused an early termination in the sender.
Items() <-chan DataStream
// FullPath returns a slice of strings that act as metadata tags for this
// DataCollection. Returned items should be ordered from most generic to least
// generic. For example, a DataCollection for emails from a specific user
// would be {"<tenant id>", "<user ID>", "emails"}.
FullPath() []string
}
// DataStream represents a single item within a DataCollection
// that can be consumed as a stream (it embeds io.Reader)
type DataStream interface {
// ToReader returns an io.Reader for the DataStream
ToReader() io.ReadCloser
// UUID provides a unique identifier for this data
UUID() string
}
// DataStreamInfo is used to provide service specific
// information about the DataStream
type DataStreamInfo interface {
Info() details.ItemInfo
}

View File

@ -0,0 +1,15 @@
package data
import (
"testing"
"github.com/stretchr/testify/suite"
)
type DataCollectionSuite struct {
suite.Suite
}
func TestDataCollectionSuite(t *testing.T) {
suite.Run(t, new(DataCollectionSuite))
}

View File

@ -3,19 +3,19 @@ package kopia
import (
"io"
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/data"
)
var _ connector.DataCollection = &kopiaDataCollection{}
var _ connector.DataStream = &kopiaDataStream{}
var _ data.DataCollection = &kopiaDataCollection{}
var _ data.DataStream = &kopiaDataStream{}
type kopiaDataCollection struct {
path []string
streams []connector.DataStream
streams []data.DataStream
}
func (kdc *kopiaDataCollection) Items() <-chan connector.DataStream {
res := make(chan connector.DataStream)
func (kdc *kopiaDataCollection) Items() <-chan data.DataStream {
res := make(chan data.DataStream)
go func() {
defer close(res)

View File

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/data"
)
// ---------------
@ -30,7 +30,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsPath() {
path := []string{"some", "path", "for", "data"}
c := kopiaDataCollection{
streams: []connector.DataStream{},
streams: []data.DataStream{},
path: path,
}
@ -38,7 +38,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsPath() {
}
func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
data := [][]byte{
testData := [][]byte{
[]byte("abcdefghijklmnopqrstuvwxyz"),
[]byte("zyxwvutsrqponmlkjihgfedcba"),
}
@ -50,26 +50,26 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
table := []struct {
name string
streams []connector.DataStream
streams []data.DataStream
}{
{
name: "SingleStream",
streams: []connector.DataStream{
streams: []data.DataStream{
&kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data[0])),
reader: io.NopCloser(bytes.NewReader(testData[0])),
uuid: uuids[0],
},
},
},
{
name: "MultipleStreams",
streams: []connector.DataStream{
streams: []data.DataStream{
&kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data[0])),
reader: io.NopCloser(bytes.NewReader(testData[0])),
uuid: uuids[0],
},
&kopiaDataStream{
reader: io.NopCloser(bytes.NewReader(data[1])),
reader: io.NopCloser(bytes.NewReader(testData[1])),
uuid: uuids[1],
},
},
@ -91,7 +91,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
buf, err := ioutil.ReadAll(returnedStream.ToReader())
require.NoError(t, err)
assert.Equal(t, buf, data[count])
assert.Equal(t, buf, testData[count])
count++
}

View File

@ -14,7 +14,7 @@ import (
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/backup/details"
"github.com/alcionai/corso/pkg/logger"
)
@ -80,7 +80,7 @@ func (w *Wrapper) Close(ctx context.Context) error {
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
collection connector.DataCollection,
collection data.DataCollection,
details *details.Details,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
@ -93,7 +93,7 @@ func getStreamItemFunc(
if !ok {
return nil
}
ei, ok := e.(connector.DataStreamInfo)
ei, ok := e.(data.DataStreamInfo)
if !ok {
return errors.New("item does not implement DataStreamInfo")
}
@ -143,7 +143,7 @@ func buildKopiaDirs(dirName string, dir *treeMap, details *details.Details) (fs.
type treeMap struct {
childDirs map[string]*treeMap
collection connector.DataCollection
collection data.DataCollection
}
func newTreeMap() *treeMap {
@ -156,7 +156,7 @@ func newTreeMap() *treeMap {
// ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes
// in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given
// DataCollections.
func inflateDirTree(ctx context.Context, collections []connector.DataCollection, details *details.Details) (fs.Directory, error) {
func inflateDirTree(ctx context.Context, collections []data.DataCollection, details *details.Details) (fs.Directory, error) {
roots := make(map[string]*treeMap)
for _, s := range collections {
@ -228,7 +228,7 @@ func inflateDirTree(ctx context.Context, collections []connector.DataCollection,
func (w Wrapper) BackupCollections(
ctx context.Context,
collections []connector.DataCollection,
collections []data.DataCollection,
) (*BackupStats, *details.Details, error) {
if w.c == nil {
return nil, nil, errNotConnected
@ -332,7 +332,7 @@ func (w Wrapper) collectItems(
snapshotID string,
itemPath []string,
isDirectory bool,
) ([]connector.DataCollection, error) {
) ([]data.DataCollection, error) {
e, err := w.getEntry(ctx, snapshotID, itemPath)
if err != nil {
return nil, err
@ -362,7 +362,7 @@ func (w Wrapper) collectItems(
return nil, err
}
return []connector.DataCollection{c}, nil
return []data.DataCollection{c}, nil
}
// RestoreSingleItem looks up the item at the given path in the snapshot with id
@ -376,7 +376,7 @@ func (w Wrapper) RestoreSingleItem(
ctx context.Context,
snapshotID string,
itemPath []string,
) (connector.DataCollection, error) {
) (data.DataCollection, error) {
c, err := w.collectItems(ctx, snapshotID, itemPath, false)
if err != nil {
return nil, err
@ -396,14 +396,14 @@ func restoreSingleItem(
ctx context.Context,
f fs.File,
itemPath []string,
) (connector.DataCollection, error) {
) (data.DataCollection, error) {
r, err := f.Open(ctx)
if err != nil {
return nil, errors.Wrap(err, "opening file")
}
return &kopiaDataCollection{
streams: []connector.DataStream{
streams: []data.DataStream{
&kopiaDataStream{
uuid: f.Name(),
reader: r,
@ -457,8 +457,8 @@ func restoreSubtree(
ctx context.Context,
dir fs.Directory,
relativePath []string,
) ([]connector.DataCollection, *multierror.Error) {
collections := []connector.DataCollection{}
) ([]data.DataCollection, *multierror.Error) {
collections := []data.DataCollection{}
// Want a local copy of relativePath with our new element.
fullPath := append(append([]string{}, relativePath...), dir.Name())
var errs *multierror.Error
@ -475,7 +475,7 @@ func restoreSubtree(
return nil, errs
}
streams := make([]connector.DataStream, 0, len(files))
streams := make([]data.DataStream, 0, len(files))
for _, f := range files {
r, err := f.Open(ctx)
@ -524,7 +524,7 @@ func (w Wrapper) RestoreDirectory(
ctx context.Context,
snapshotID string,
basePath []string,
) ([]connector.DataCollection, error) {
) ([]data.DataCollection, error) {
return w.collectItems(ctx, snapshotID, basePath, true)
}
@ -539,9 +539,9 @@ func (w Wrapper) RestoreMultipleItems(
ctx context.Context,
snapshotID string,
paths [][]string,
) ([]connector.DataCollection, error) {
) ([]data.DataCollection, error) {
var (
dcs = []connector.DataCollection{}
dcs = []data.DataCollection{}
errs *multierror.Error
)
for _, path := range paths {

View File

@ -16,8 +16,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/mockconnector"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/internal/kopia/mockkopia"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/backup/details"
@ -63,7 +63,7 @@ func entriesToNames(entries []fs.Entry) []string {
func testForFiles(
t *testing.T,
expected map[string][]byte,
collections []connector.DataCollection,
collections []data.DataCollection,
) {
count := 0
for _, c := range collections {
@ -119,7 +119,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
details := &details.Details{}
collections := []connector.DataCollection{
collections := []data.DataCollection{
mockconnector.NewMockExchangeDataCollection(
[]string{tenant, user1, emails},
expectedFileCount[user1],
@ -181,7 +181,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
expectedFileCount := 42
details := &details.Details{}
collections := []connector.DataCollection{
collections := []data.DataCollection{
mockconnector.NewMockExchangeDataCollection(
[]string{emails},
expectedFileCount,
@ -203,7 +203,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
table := []struct {
name string
layout []connector.DataCollection
layout []data.DataCollection
}{
{
"MultipleRoots",
@ -214,7 +214,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
// - user2
// - emails
// - 42 separate files
[]connector.DataCollection{
[]data.DataCollection{
mockconnector.NewMockExchangeDataCollection(
[]string{"user1", "emails"},
5,
@ -227,7 +227,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
},
{
"NoCollectionPath",
[]connector.DataCollection{
[]data.DataCollection{
mockconnector.NewMockExchangeDataCollection(
nil,
5,
@ -242,7 +242,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
// - emails
// - 5 separate files
// - 42 separate files
[]connector.DataCollection{
[]data.DataCollection{
mockconnector.NewMockExchangeDataCollection(
[]string{"a-tenant", "user1", "emails"},
5,
@ -401,7 +401,7 @@ func (suite *KopiaIntegrationSuite) TearDownTest() {
func (suite *KopiaIntegrationSuite) TestBackupCollections() {
t := suite.T()
collections := []connector.DataCollection{
collections := []data.DataCollection{
mockconnector.NewMockExchangeDataCollection(
[]string{"a-tenant", "user1", "emails"},
5,
@ -456,10 +456,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
suite.w = &Wrapper{c}
collections := []connector.DataCollection{
collections := []data.DataCollection{
&kopiaDataCollection{
path: testPath,
streams: []connector.DataStream{
streams: []data.DataStream{
&mockconnector.MockExchangeData{
ID: testFileName,
Reader: io.NopCloser(bytes.NewReader(testFileData)),
@ -472,7 +472,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
},
&kopiaDataCollection{
path: testPath2,
streams: []connector.DataStream{
streams: []data.DataStream{
&mockconnector.MockExchangeData{
ID: testFileName3,
Reader: io.NopCloser(bytes.NewReader(testFileData3)),
@ -688,7 +688,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() {
fp1 := append(p1, dc1.Names[0])
fp2 := append(p2, dc2.Names[0])
stats, _, err := w.BackupCollections(ctx, []connector.DataCollection{dc1, dc2})
stats, _, err := w.BackupCollections(ctx, []data.DataCollection{dc1, dc2})
require.NoError(t, err)
expected := map[string][]byte{

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/internal/model"
"github.com/alcionai/corso/internal/stats"
@ -98,7 +99,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
return errors.Wrap(err, "connecting to graph api")
}
var cs []connector.DataCollection
var cs []data.DataCollection
cs, err = gc.ExchangeDataCollection(ctx, op.Selectors)
if err != nil {
stats.readErr = err

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/internal/model"
"github.com/alcionai/corso/internal/stats"
@ -68,7 +69,7 @@ func (op RestoreOperation) validate() error {
// pointer wrapping the values, while those values
// get populated asynchronously.
type restoreStats struct {
cs []connector.DataCollection
cs []data.DataCollection
gc *support.ConnectorOperationStatus
readErr, writeErr error
}

View File

@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/internal/kopia"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/account"
@ -45,7 +46,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
stats = restoreStats{
readErr: multierror.Append(nil, assert.AnError),
writeErr: assert.AnError,
cs: []connector.DataCollection{&connector.ExchangeDataCollection{}},
cs: []data.DataCollection{&connector.ExchangeDataCollection{}},
gc: &support.ConnectorOperationStatus{
ObjectCount: 1,
},