diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 020ed3895..f3a0c549d 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -41,7 +41,11 @@ type Collection struct { } // itemReadFunc returns a reader for the specified item -type itemReaderFunc func(ctx context.Context, itemID string) (name string, itemData io.ReadCloser, err error) +type itemReaderFunc func( + ctx context.Context, + service graph.Service, + driveID, itemID string, +) (name string, itemData io.ReadCloser, err error) // NewCollection creates a Collection func NewCollection(folderPath, driveID string, service graph.Service, @@ -56,18 +60,10 @@ func NewCollection(folderPath, driveID string, service graph.Service, statusCh: statusCh, } // Allows tests to set a mock populator - c.itemReader = c.driveItemReader + c.itemReader = driveItemReader return c } -// TODO: Implement drive item reader -func (oc *Collection) driveItemReader( - ctx context.Context, - itemID string, -) (name string, itemData io.ReadCloser, err error) { - return "", nil, nil -} - // Adds an itemID to the collection // This will make it eligible to be populated func (oc *Collection) Add(itemID string) { @@ -110,7 +106,7 @@ func (oc *Collection) populateItems(ctx context.Context) { itemsRead := 0 for _, itemID := range oc.driveItemIDs { // Read the item - itemName, itemData, err := oc.itemReader(ctx, itemID) + itemName, itemData, err := oc.itemReader(ctx, oc.service, oc.driveID, itemID) if err != nil { errs = support.WrapAndAppendf(itemID, err, errs) if oc.service.ErrPolicy() { diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index 8f08e074e..dff7d4090 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/internal/connector/graph" "github.com/alcionai/corso/internal/data" ) @@ -52,7 +53,7 @@ func (suite *OnedriveCollectionSuite) TestOnedriveCollection() { // Set a item reader, add an item and validate we get the item back coll.Add(testItemID) - coll.itemReader = func(ctx context.Context, itemID string) (name string, data io.ReadCloser, err error) { + coll.itemReader = func(context.Context, graph.Service, string, string) (string, io.ReadCloser, error) { return testItemName, io.NopCloser(bytes.NewReader(testItemData)), nil } @@ -86,7 +87,7 @@ func (suite *OnedriveCollectionSuite) TestOnedriveCollectionReadError() { readError := errors.New("Test error") - coll.itemReader = func(ctx context.Context, itemID string) (name string, data io.ReadCloser, err error) { + coll.itemReader = func(context.Context, graph.Service, string, string) (name string, data io.ReadCloser, err error) { return "", nil, readError } diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go new file mode 100644 index 000000000..81abb2da9 --- /dev/null +++ b/src/internal/connector/onedrive/drive.go @@ -0,0 +1,72 @@ +package onedrive + +import ( + "context" + + "github.com/microsoftgraph/msgraph-sdk-go/drives/item/root/delta" + "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/pkg/errors" + + "github.com/alcionai/corso/internal/connector/graph" + "github.com/alcionai/corso/internal/connector/support" + "github.com/alcionai/corso/pkg/logger" +) + +const ( + // nextLinkKey is used to find the next link in a paged + // graph response + nextLinkKey = "@odata.nextLink" +) + +// Enumerates the drives for the specified user +func drives(ctx context.Context, service graph.Service, user string) ([]models.Driveable, error) { + r, err := service.Client().UsersById(user).Drives().Get() + if err != nil { + return nil, errors.Wrapf(err, "failed to retrieve user drives. user: %s, details: %s", + user, support.ConnectorStackErrorTrace(err)) + } + logger.Ctx(ctx).Debugf("Found %d drives for user %s", len(r.GetValue()), user) + + return r.GetValue(), nil +} + +// itemCollector functions collect the items found in a drive +type itemCollector func(ctx context.Context, driveID string, items []models.DriveItemable) error + +// collectItems will enumerate all items in the specified drive and hand them to the +// provided `collector` method +func collectItems( + ctx context.Context, + service graph.Service, + driveID string, + collector itemCollector, +) error { + // TODO: Specify a timestamp in the delta query + // https://docs.microsoft.com/en-us/graph/api/driveitem-delta? + // view=graph-rest-1.0&tabs=http#example-4-retrieving-delta-results-using-a-timestamp + builder := service.Client().DrivesById(driveID).Root().Delta() + for { + r, err := builder.Get() + if err != nil { + return errors.Wrapf( + err, + "failed to query drive items. details: %s", + support.ConnectorStackErrorTrace(err), + ) + } + + err = collector(ctx, driveID, r.GetValue()) + if err != nil { + return err + } + + // Check if there are more items + if _, found := r.GetAdditionalData()[nextLinkKey]; !found { + break + } + nextLink := r.GetAdditionalData()[nextLinkKey].(*string) + logger.Ctx(ctx).Debugf("Found %s nextLink", *nextLink) + builder = delta.NewDeltaRequestBuilder(*nextLink, service.Adapter()) + } + return nil +} diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go new file mode 100644 index 000000000..4233faa66 --- /dev/null +++ b/src/internal/connector/onedrive/item.go @@ -0,0 +1,53 @@ +package onedrive + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/pkg/errors" + + "github.com/alcionai/corso/internal/connector/graph" + "github.com/alcionai/corso/pkg/logger" +) + +const ( + // downloadUrlKey is used to find the download URL in a + // DriveItem response + downloadURLKey = "@microsoft.graph.downloadUrl" +) + +// itemReader will return a io.ReadCloser for the specified item +// It crafts this by querying M365 for a download URL for the item +// and using a http client to initialize a reader +func driveItemReader( + ctx context.Context, + service graph.Service, + driveID, itemID string, +) (string, io.ReadCloser, error) { + logger.Ctx(ctx).Debugf("Reading Item %s at %s", itemID, time.Now()) + + item, err := service.Client().DrivesById(driveID).ItemsById(itemID).Get() + if err != nil { + return "", nil, errors.Wrapf(err, "failed to get item %s", itemID) + } + + // Get the download URL - https://docs.microsoft.com/en-us/graph/api/driveitem-get-content + // These URLs are pre-authenticated and can be used to download the data using the standard + // http client + if _, found := item.GetAdditionalData()[downloadURLKey]; !found { + return "", nil, errors.Errorf("file does not have a download URL. ID: %s, %#v", + itemID, item.GetAdditionalData()) + } + downloadURL := item.GetAdditionalData()[downloadURLKey].(*string) + + // TODO: We should use the `msgraphgocore` http client which has the right + // middleware/options configured + resp, err := http.Get(*downloadURL) + if err != nil { + return "", nil, errors.Wrapf(err, "failed to download file from %s", *downloadURL) + } + + return *item.GetName(), resp.Body, nil +} diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go new file mode 100644 index 000000000..746e1275e --- /dev/null +++ b/src/internal/connector/onedrive/item_test.go @@ -0,0 +1,109 @@ +package onedrive + +import ( + "context" + "io" + "testing" + + msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" + "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/internal/connector/graph" + "github.com/alcionai/corso/internal/tester" +) + +type ItemIntegrationSuite struct { + suite.Suite + client *msgraphsdk.GraphServiceClient + adapter *msgraphsdk.GraphRequestAdapter +} + +func (suite *ItemIntegrationSuite) Client() *msgraphsdk.GraphServiceClient { + return suite.client +} + +func (suite *ItemIntegrationSuite) Adapter() *msgraphsdk.GraphRequestAdapter { + return suite.adapter +} + +func (suite *ItemIntegrationSuite) ErrPolicy() bool { + return false +} + +func TestItemIntegrationSuite(t *testing.T) { + if err := tester.RunOnAny( + tester.CorsoCITests, + tester.CorsoGraphConnectorTests, + ); err != nil { + t.Skip(err) + } + suite.Run(t, new(ItemIntegrationSuite)) +} + +func (suite *ItemIntegrationSuite) SetupSuite() { + _, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...) + require.NoError(suite.T(), err) + + a := tester.NewM365Account(suite.T()) + + m365, err := a.M365Config() + require.NoError(suite.T(), err) + + adapter, err := graph.CreateAdapter(m365.TenantID, m365.ClientID, m365.ClientSecret) + require.NoError(suite.T(), err) + suite.client = msgraphsdk.NewGraphServiceClient(adapter) + suite.adapter = adapter +} + +// TestItemReader is an integration test that makes a few assumptions +// about the test environment +// 1) It assumes the test user has a drive +// 2) It assumes the drive has a file it can use to test `driveItemReader` +// The test checks these in below +func (suite *ItemIntegrationSuite) TestItemReader() { + ctx := context.TODO() + user := tester.M365UserID(suite.T()) + + drives, err := drives(ctx, suite, user) + require.NoError(suite.T(), err) + // Test Requirement 1: Need a drive + require.Greaterf(suite.T(), len(drives), 0, "user %s does not have a drive", user) + + // Pick the first drive + driveID := *drives[0].GetId() + + var driveItemID string + // This item collector tries to find "a" drive item that is a file to test the reader function + itemCollector := func(ctx context.Context, driveID string, items []models.DriveItemable) error { + for _, item := range items { + if item.GetFile() != nil { + driveItemID = *item.GetId() + break + } + } + return nil + } + err = collectItems(ctx, suite, driveID, itemCollector) + require.NoError(suite.T(), err) + + // Test Requirement 2: Need a file + require.NotEmpty( + suite.T(), + driveItemID, + "no file item found for user %s drive %s", + user, + driveID, + ) + + // Read data for the file + + name, itemData, err := driveItemReader(ctx, suite, driveID, driveItemID) + require.NoError(suite.T(), err) + require.NotEmpty(suite.T(), name) + size, err := io.Copy(io.Discard, itemData) + require.NoError(suite.T(), err) + require.NotZero(suite.T(), size) + suite.T().Logf("Read %d bytes from file %s.", size, name) +}