Onedrive item reader (#582)
## Description This contains the following changes: - Support functions to enumerate a users drives and the files within it. - `driveItemReader` method that that reads a drive item - Integration tests for the above ## Type of change Please check the type of change your PR introduces: - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 🐹 Trivial/Minor ## Issue(s) - #388 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
649255f112
commit
8fd867745f
@ -41,7 +41,11 @@ type Collection struct {
|
||||
}
|
||||
|
||||
// itemReadFunc returns a reader for the specified item
|
||||
type itemReaderFunc func(ctx context.Context, itemID string) (name string, itemData io.ReadCloser, err error)
|
||||
type itemReaderFunc func(
|
||||
ctx context.Context,
|
||||
service graph.Service,
|
||||
driveID, itemID string,
|
||||
) (name string, itemData io.ReadCloser, err error)
|
||||
|
||||
// NewCollection creates a Collection
|
||||
func NewCollection(folderPath, driveID string, service graph.Service,
|
||||
@ -56,18 +60,10 @@ func NewCollection(folderPath, driveID string, service graph.Service,
|
||||
statusCh: statusCh,
|
||||
}
|
||||
// Allows tests to set a mock populator
|
||||
c.itemReader = c.driveItemReader
|
||||
c.itemReader = driveItemReader
|
||||
return c
|
||||
}
|
||||
|
||||
// TODO: Implement drive item reader
|
||||
func (oc *Collection) driveItemReader(
|
||||
ctx context.Context,
|
||||
itemID string,
|
||||
) (name string, itemData io.ReadCloser, err error) {
|
||||
return "", nil, nil
|
||||
}
|
||||
|
||||
// Adds an itemID to the collection
|
||||
// This will make it eligible to be populated
|
||||
func (oc *Collection) Add(itemID string) {
|
||||
@ -110,7 +106,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
itemsRead := 0
|
||||
for _, itemID := range oc.driveItemIDs {
|
||||
// Read the item
|
||||
itemName, itemData, err := oc.itemReader(ctx, itemID)
|
||||
itemName, itemData, err := oc.itemReader(ctx, oc.service, oc.driveID, itemID)
|
||||
if err != nil {
|
||||
errs = support.WrapAndAppendf(itemID, err, errs)
|
||||
if oc.service.ErrPolicy() {
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector/graph"
|
||||
"github.com/alcionai/corso/internal/data"
|
||||
)
|
||||
|
||||
@ -52,7 +53,7 @@ func (suite *OnedriveCollectionSuite) TestOnedriveCollection() {
|
||||
// Set a item reader, add an item and validate we get the item back
|
||||
coll.Add(testItemID)
|
||||
|
||||
coll.itemReader = func(ctx context.Context, itemID string) (name string, data io.ReadCloser, err error) {
|
||||
coll.itemReader = func(context.Context, graph.Service, string, string) (string, io.ReadCloser, error) {
|
||||
return testItemName, io.NopCloser(bytes.NewReader(testItemData)), nil
|
||||
}
|
||||
|
||||
@ -86,7 +87,7 @@ func (suite *OnedriveCollectionSuite) TestOnedriveCollectionReadError() {
|
||||
|
||||
readError := errors.New("Test error")
|
||||
|
||||
coll.itemReader = func(ctx context.Context, itemID string) (name string, data io.ReadCloser, err error) {
|
||||
coll.itemReader = func(context.Context, graph.Service, string, string) (name string, data io.ReadCloser, err error) {
|
||||
return "", nil, readError
|
||||
}
|
||||
|
||||
|
||||
72
src/internal/connector/onedrive/drive.go
Normal file
72
src/internal/connector/onedrive/drive.go
Normal file
@ -0,0 +1,72 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/drives/item/root/delta"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector/graph"
|
||||
"github.com/alcionai/corso/internal/connector/support"
|
||||
"github.com/alcionai/corso/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
// nextLinkKey is used to find the next link in a paged
|
||||
// graph response
|
||||
nextLinkKey = "@odata.nextLink"
|
||||
)
|
||||
|
||||
// Enumerates the drives for the specified user
|
||||
func drives(ctx context.Context, service graph.Service, user string) ([]models.Driveable, error) {
|
||||
r, err := service.Client().UsersById(user).Drives().Get()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to retrieve user drives. user: %s, details: %s",
|
||||
user, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
logger.Ctx(ctx).Debugf("Found %d drives for user %s", len(r.GetValue()), user)
|
||||
|
||||
return r.GetValue(), nil
|
||||
}
|
||||
|
||||
// itemCollector functions collect the items found in a drive
|
||||
type itemCollector func(ctx context.Context, driveID string, items []models.DriveItemable) error
|
||||
|
||||
// collectItems will enumerate all items in the specified drive and hand them to the
|
||||
// provided `collector` method
|
||||
func collectItems(
|
||||
ctx context.Context,
|
||||
service graph.Service,
|
||||
driveID string,
|
||||
collector itemCollector,
|
||||
) error {
|
||||
// TODO: Specify a timestamp in the delta query
|
||||
// https://docs.microsoft.com/en-us/graph/api/driveitem-delta?
|
||||
// view=graph-rest-1.0&tabs=http#example-4-retrieving-delta-results-using-a-timestamp
|
||||
builder := service.Client().DrivesById(driveID).Root().Delta()
|
||||
for {
|
||||
r, err := builder.Get()
|
||||
if err != nil {
|
||||
return errors.Wrapf(
|
||||
err,
|
||||
"failed to query drive items. details: %s",
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
|
||||
err = collector(ctx, driveID, r.GetValue())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if there are more items
|
||||
if _, found := r.GetAdditionalData()[nextLinkKey]; !found {
|
||||
break
|
||||
}
|
||||
nextLink := r.GetAdditionalData()[nextLinkKey].(*string)
|
||||
logger.Ctx(ctx).Debugf("Found %s nextLink", *nextLink)
|
||||
builder = delta.NewDeltaRequestBuilder(*nextLink, service.Adapter())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
53
src/internal/connector/onedrive/item.go
Normal file
53
src/internal/connector/onedrive/item.go
Normal file
@ -0,0 +1,53 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector/graph"
|
||||
"github.com/alcionai/corso/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
// downloadUrlKey is used to find the download URL in a
|
||||
// DriveItem response
|
||||
downloadURLKey = "@microsoft.graph.downloadUrl"
|
||||
)
|
||||
|
||||
// itemReader will return a io.ReadCloser for the specified item
|
||||
// It crafts this by querying M365 for a download URL for the item
|
||||
// and using a http client to initialize a reader
|
||||
func driveItemReader(
|
||||
ctx context.Context,
|
||||
service graph.Service,
|
||||
driveID, itemID string,
|
||||
) (string, io.ReadCloser, error) {
|
||||
logger.Ctx(ctx).Debugf("Reading Item %s at %s", itemID, time.Now())
|
||||
|
||||
item, err := service.Client().DrivesById(driveID).ItemsById(itemID).Get()
|
||||
if err != nil {
|
||||
return "", nil, errors.Wrapf(err, "failed to get item %s", itemID)
|
||||
}
|
||||
|
||||
// Get the download URL - https://docs.microsoft.com/en-us/graph/api/driveitem-get-content
|
||||
// These URLs are pre-authenticated and can be used to download the data using the standard
|
||||
// http client
|
||||
if _, found := item.GetAdditionalData()[downloadURLKey]; !found {
|
||||
return "", nil, errors.Errorf("file does not have a download URL. ID: %s, %#v",
|
||||
itemID, item.GetAdditionalData())
|
||||
}
|
||||
downloadURL := item.GetAdditionalData()[downloadURLKey].(*string)
|
||||
|
||||
// TODO: We should use the `msgraphgocore` http client which has the right
|
||||
// middleware/options configured
|
||||
resp, err := http.Get(*downloadURL)
|
||||
if err != nil {
|
||||
return "", nil, errors.Wrapf(err, "failed to download file from %s", *downloadURL)
|
||||
}
|
||||
|
||||
return *item.GetName(), resp.Body, nil
|
||||
}
|
||||
109
src/internal/connector/onedrive/item_test.go
Normal file
109
src/internal/connector/onedrive/item_test.go
Normal file
@ -0,0 +1,109 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector/graph"
|
||||
"github.com/alcionai/corso/internal/tester"
|
||||
)
|
||||
|
||||
type ItemIntegrationSuite struct {
|
||||
suite.Suite
|
||||
client *msgraphsdk.GraphServiceClient
|
||||
adapter *msgraphsdk.GraphRequestAdapter
|
||||
}
|
||||
|
||||
func (suite *ItemIntegrationSuite) Client() *msgraphsdk.GraphServiceClient {
|
||||
return suite.client
|
||||
}
|
||||
|
||||
func (suite *ItemIntegrationSuite) Adapter() *msgraphsdk.GraphRequestAdapter {
|
||||
return suite.adapter
|
||||
}
|
||||
|
||||
func (suite *ItemIntegrationSuite) ErrPolicy() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func TestItemIntegrationSuite(t *testing.T) {
|
||||
if err := tester.RunOnAny(
|
||||
tester.CorsoCITests,
|
||||
tester.CorsoGraphConnectorTests,
|
||||
); err != nil {
|
||||
t.Skip(err)
|
||||
}
|
||||
suite.Run(t, new(ItemIntegrationSuite))
|
||||
}
|
||||
|
||||
func (suite *ItemIntegrationSuite) SetupSuite() {
|
||||
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
a := tester.NewM365Account(suite.T())
|
||||
|
||||
m365, err := a.M365Config()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
adapter, err := graph.CreateAdapter(m365.TenantID, m365.ClientID, m365.ClientSecret)
|
||||
require.NoError(suite.T(), err)
|
||||
suite.client = msgraphsdk.NewGraphServiceClient(adapter)
|
||||
suite.adapter = adapter
|
||||
}
|
||||
|
||||
// TestItemReader is an integration test that makes a few assumptions
|
||||
// about the test environment
|
||||
// 1) It assumes the test user has a drive
|
||||
// 2) It assumes the drive has a file it can use to test `driveItemReader`
|
||||
// The test checks these in below
|
||||
func (suite *ItemIntegrationSuite) TestItemReader() {
|
||||
ctx := context.TODO()
|
||||
user := tester.M365UserID(suite.T())
|
||||
|
||||
drives, err := drives(ctx, suite, user)
|
||||
require.NoError(suite.T(), err)
|
||||
// Test Requirement 1: Need a drive
|
||||
require.Greaterf(suite.T(), len(drives), 0, "user %s does not have a drive", user)
|
||||
|
||||
// Pick the first drive
|
||||
driveID := *drives[0].GetId()
|
||||
|
||||
var driveItemID string
|
||||
// This item collector tries to find "a" drive item that is a file to test the reader function
|
||||
itemCollector := func(ctx context.Context, driveID string, items []models.DriveItemable) error {
|
||||
for _, item := range items {
|
||||
if item.GetFile() != nil {
|
||||
driveItemID = *item.GetId()
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
err = collectItems(ctx, suite, driveID, itemCollector)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Test Requirement 2: Need a file
|
||||
require.NotEmpty(
|
||||
suite.T(),
|
||||
driveItemID,
|
||||
"no file item found for user %s drive %s",
|
||||
user,
|
||||
driveID,
|
||||
)
|
||||
|
||||
// Read data for the file
|
||||
|
||||
name, itemData, err := driveItemReader(ctx, suite, driveID, driveItemID)
|
||||
require.NoError(suite.T(), err)
|
||||
require.NotEmpty(suite.T(), name)
|
||||
size, err := io.Copy(io.Discard, itemData)
|
||||
require.NoError(suite.T(), err)
|
||||
require.NotZero(suite.T(), size)
|
||||
suite.T().Logf("Read %d bytes from file %s.", size, name)
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user