OneDrive Collection (#580)
## Description Introduces a OneDrive data collection. Follow up PRs will implement the `collection.driveItemReader()` method that uses the Graph API ## Type of change Please check the type of change your PR introduces: - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 🐹 Trivial/Minor ## Issue(s) - #387 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
a1b9f876ae
commit
c4e5915983
137
src/internal/connector/onedrive/collection.go
Normal file
137
src/internal/connector/onedrive/collection.go
Normal file
@ -0,0 +1,137 @@
|
||||
// Package onedrive provides support for retrieving M365 OneDrive objects
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector/graph"
|
||||
"github.com/alcionai/corso/internal/connector/support"
|
||||
"github.com/alcionai/corso/internal/data"
|
||||
"github.com/alcionai/corso/pkg/backup/details"
|
||||
"github.com/alcionai/corso/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
// TODO: This number needs to be tuned
|
||||
collectionChannelBufferSize = 50
|
||||
)
|
||||
|
||||
var (
|
||||
_ data.Collection = &Collection{}
|
||||
_ data.Stream = &Item{}
|
||||
_ data.StreamInfo = &Item{}
|
||||
)
|
||||
|
||||
// Collection represents a set of OneDrive objects retreived from M365
|
||||
type Collection struct {
|
||||
// data is used to share data streams with the collection consumer
|
||||
data chan data.Stream
|
||||
// folderPath indicates what level in the hierarchy this collection
|
||||
// represents
|
||||
folderPath string
|
||||
// M365 IDs of file items within this collection
|
||||
driveItemIDs []string
|
||||
// M365 ID of the drive this collection was created from
|
||||
driveID string
|
||||
service graph.Service
|
||||
statusCh chan<- *support.ConnectorOperationStatus
|
||||
itemReader itemReaderFunc
|
||||
}
|
||||
|
||||
// itemReadFunc returns a reader for the specified item
|
||||
type itemReaderFunc func(ctx context.Context, itemID string) (name string, itemData io.ReadCloser, err error)
|
||||
|
||||
// NewCollection creates a Collection
|
||||
func NewCollection(folderPath, driveID string, service graph.Service,
|
||||
statusCh chan<- *support.ConnectorOperationStatus,
|
||||
) *Collection {
|
||||
c := &Collection{
|
||||
folderPath: folderPath,
|
||||
driveItemIDs: []string{},
|
||||
driveID: driveID,
|
||||
service: service,
|
||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
||||
statusCh: statusCh,
|
||||
}
|
||||
// Allows tests to set a mock populator
|
||||
c.itemReader = c.driveItemReader
|
||||
return c
|
||||
}
|
||||
|
||||
// TODO: Implement drive item reader
|
||||
func (oc *Collection) driveItemReader(
|
||||
ctx context.Context,
|
||||
itemID string,
|
||||
) (name string, itemData io.ReadCloser, err error) {
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
// Adds an itemID to the collection
|
||||
// This will make it eligible to be populated
|
||||
func (oc *Collection) Add(itemID string) {
|
||||
oc.driveItemIDs = append(oc.driveItemIDs, itemID)
|
||||
}
|
||||
|
||||
// Items() returns the channel containing M365 Exchange objects
|
||||
func (oc *Collection) Items() <-chan data.Stream {
|
||||
go oc.populateItems(context.Background())
|
||||
return oc.data
|
||||
}
|
||||
|
||||
func (oc *Collection) FullPath() []string {
|
||||
return filepath.SplitList(oc.folderPath)
|
||||
}
|
||||
|
||||
// Item represents a single item retrieved from OneDrive
|
||||
type Item struct {
|
||||
id string
|
||||
data io.ReadCloser
|
||||
info *details.OnedriveInfo
|
||||
}
|
||||
|
||||
func (od *Item) UUID() string {
|
||||
return od.id
|
||||
}
|
||||
|
||||
func (od *Item) ToReader() io.ReadCloser {
|
||||
return od.data
|
||||
}
|
||||
|
||||
func (od *Item) Info() details.ItemInfo {
|
||||
return details.ItemInfo{Onedrive: od.info}
|
||||
}
|
||||
|
||||
// populateItems iterates through items added to the collection
|
||||
// and uses the collection `itemReader` to read the item
|
||||
func (oc *Collection) populateItems(ctx context.Context) {
|
||||
var errs error
|
||||
itemsRead := 0
|
||||
for _, itemID := range oc.driveItemIDs {
|
||||
// Read the item
|
||||
itemName, itemData, err := oc.itemReader(ctx, itemID)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppendf(itemID, err, errs)
|
||||
if oc.service.ErrPolicy() {
|
||||
break
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Item read successfully, add to collection
|
||||
itemsRead++
|
||||
oc.data <- &Item{
|
||||
id: itemID,
|
||||
data: itemData,
|
||||
info: &details.OnedriveInfo{ItemName: itemName, ParentPath: oc.folderPath},
|
||||
}
|
||||
}
|
||||
close(oc.data)
|
||||
status := support.CreateStatus(ctx, support.Backup,
|
||||
len(oc.driveItemIDs), // items to read
|
||||
itemsRead, // items read successfully
|
||||
1, // num folders (always 1)
|
||||
errs)
|
||||
logger.Ctx(ctx).Debug(status.String())
|
||||
oc.statusCh <- status
|
||||
}
|
||||
95
src/internal/connector/onedrive/collection_test.go
Normal file
95
src/internal/connector/onedrive/collection_test.go
Normal file
@ -0,0 +1,95 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/internal/data"
|
||||
)
|
||||
|
||||
type OnedriveCollectionSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
// Allows `*OnedriveCollectionSuite` to be used as a graph.Service
|
||||
// TODO: Implement these methods
|
||||
|
||||
func (suite *OnedriveCollectionSuite) Client() *msgraphsdk.GraphServiceClient {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (suite *OnedriveCollectionSuite) Adapter() *msgraphsdk.GraphRequestAdapter {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (suite *OnedriveCollectionSuite) ErrPolicy() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func TestOnedriveCollectionSuite(t *testing.T) {
|
||||
suite.Run(t, new(OnedriveCollectionSuite))
|
||||
}
|
||||
|
||||
func (suite *OnedriveCollectionSuite) TestOnedriveCollection() {
|
||||
folderPath := "dir1/dir2/dir3"
|
||||
coll := NewCollection(folderPath, "fakeDriveID", suite, nil)
|
||||
require.NotNil(suite.T(), coll)
|
||||
assert.Equal(suite.T(), filepath.SplitList(folderPath), coll.FullPath())
|
||||
|
||||
testItemID := "fakeItemID"
|
||||
testItemName := "itemName"
|
||||
testItemData := []byte("testdata")
|
||||
|
||||
// Set a item reader, add an item and validate we get the item back
|
||||
coll.Add(testItemID)
|
||||
|
||||
coll.itemReader = func(ctx context.Context, itemID string) (name string, data io.ReadCloser, err error) {
|
||||
return testItemName, io.NopCloser(bytes.NewReader(testItemData)), nil
|
||||
}
|
||||
|
||||
// Read items from the collection
|
||||
readItems := []data.Stream{}
|
||||
for item := range coll.Items() {
|
||||
readItems = append(readItems, item)
|
||||
}
|
||||
|
||||
// Expect only 1 item
|
||||
require.Len(suite.T(), readItems, 1)
|
||||
|
||||
// Validate item info and data
|
||||
readItem := readItems[0]
|
||||
readItemInfo := readItem.(data.StreamInfo)
|
||||
|
||||
assert.Equal(suite.T(), testItemID, readItem.UUID())
|
||||
readData, err := io.ReadAll(readItem.ToReader())
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
assert.Equal(suite.T(), testItemData, readData)
|
||||
require.NotNil(suite.T(), readItemInfo.Info())
|
||||
require.NotNil(suite.T(), readItemInfo.Info().Onedrive)
|
||||
assert.Equal(suite.T(), testItemName, readItemInfo.Info().Onedrive.ItemName)
|
||||
assert.Equal(suite.T(), folderPath, readItemInfo.Info().Onedrive.ParentPath)
|
||||
}
|
||||
|
||||
func (suite *OnedriveCollectionSuite) TestOnedriveCollectionReadError() {
|
||||
coll := NewCollection("folderPath", "fakeDriveID", suite, nil)
|
||||
coll.Add("testItemID")
|
||||
|
||||
readError := errors.New("Test error")
|
||||
|
||||
coll.itemReader = func(ctx context.Context, itemID string) (name string, data io.ReadCloser, err error) {
|
||||
return "", nil, readError
|
||||
}
|
||||
|
||||
// Expect no items
|
||||
require.Len(suite.T(), coll.Items(), 0)
|
||||
}
|
||||
@ -23,6 +23,12 @@ type Details struct {
|
||||
mu sync.Mutex `json:"-"`
|
||||
}
|
||||
|
||||
func (d *Details) Add(repoRef string, info ItemInfo) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
d.Entries = append(d.Entries, DetailsEntry{RepoRef: repoRef, ItemInfo: info})
|
||||
}
|
||||
|
||||
// DetailsEntry describes a single item stored in a Backup
|
||||
type DetailsEntry struct {
|
||||
// TODO: `RepoRef` is currently the full path to the item in Kopia
|
||||
@ -57,6 +63,9 @@ func (de DetailsEntry) Headers() []string {
|
||||
if de.ItemInfo.Sharepoint != nil {
|
||||
hs = append(hs, de.ItemInfo.Sharepoint.Headers()...)
|
||||
}
|
||||
if de.ItemInfo.Onedrive != nil {
|
||||
hs = append(hs, de.ItemInfo.Onedrive.Headers()...)
|
||||
}
|
||||
return hs
|
||||
}
|
||||
|
||||
@ -69,6 +78,9 @@ func (de DetailsEntry) Values() []string {
|
||||
if de.ItemInfo.Sharepoint != nil {
|
||||
vs = append(vs, de.ItemInfo.Sharepoint.Values()...)
|
||||
}
|
||||
if de.ItemInfo.Onedrive != nil {
|
||||
vs = append(vs, de.ItemInfo.Onedrive.Values()...)
|
||||
}
|
||||
return vs
|
||||
}
|
||||
|
||||
@ -77,6 +89,7 @@ func (de DetailsEntry) Values() []string {
|
||||
type ItemInfo struct {
|
||||
Exchange *ExchangeInfo `json:"exchange,omitempty"`
|
||||
Sharepoint *SharepointInfo `json:"sharepoint,omitempty"`
|
||||
Onedrive *OnedriveInfo `json:"onedrive,omitempty"`
|
||||
}
|
||||
|
||||
// ExchangeInfo describes an exchange item
|
||||
@ -104,12 +117,6 @@ func (e ExchangeInfo) Values() []string {
|
||||
// just to illustrate usage
|
||||
type SharepointInfo struct{}
|
||||
|
||||
func (d *Details) Add(repoRef string, info ItemInfo) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
d.Entries = append(d.Entries, DetailsEntry{RepoRef: repoRef, ItemInfo: info})
|
||||
}
|
||||
|
||||
// Headers returns the human-readable names of properties in a SharepointInfo
|
||||
// for printing out to a terminal in a columnar display.
|
||||
func (s SharepointInfo) Headers() []string {
|
||||
@ -121,3 +128,21 @@ func (s SharepointInfo) Headers() []string {
|
||||
func (s SharepointInfo) Values() []string {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
// OnedriveInfo describes a onedrive item
|
||||
type OnedriveInfo struct {
|
||||
ParentPath string `json:"parentPath"`
|
||||
ItemName string `json:"itemName"`
|
||||
}
|
||||
|
||||
// Headers returns the human-readable names of properties in a OnedriveInfo
|
||||
// for printing out to a terminal in a columnar display.
|
||||
func (oi OnedriveInfo) Headers() []string {
|
||||
return []string{"ItemName", "ParentPath"}
|
||||
}
|
||||
|
||||
// Values returns the values matching the Headers list for printing
|
||||
// out to a terminal in a columnar display.
|
||||
func (oi OnedriveInfo) Values() []string {
|
||||
return []string{oi.ItemName, oi.ParentPath}
|
||||
}
|
||||
|
||||
@ -159,6 +159,20 @@ func (suite *DetailsUnitSuite) TestDetailsEntry_HeadersValues() {
|
||||
expectHs: []string{"Repo Ref"},
|
||||
expectVs: []string{"reporef"},
|
||||
},
|
||||
{
|
||||
name: "onedrive info",
|
||||
entry: details.DetailsEntry{
|
||||
RepoRef: "reporef",
|
||||
ItemInfo: details.ItemInfo{
|
||||
Onedrive: &details.OnedriveInfo{
|
||||
ItemName: "itemName",
|
||||
ParentPath: "parentPath",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectHs: []string{"Repo Ref", "ItemName", "ParentPath"},
|
||||
expectVs: []string{"reporef", "itemName", "parentPath"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range table {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user