Implement OneDrive item upload (#843)
## Description This implements helper methods that support creating new items and uploading data from a `io.Reader` to OneDrive. It includes an integration test that uses mock data. ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * #668 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
1f4a490c33
commit
8587e7d1c2
@ -26,6 +26,7 @@ require (
|
||||
github.com/tomlazar/table v0.1.2
|
||||
go.uber.org/zap v1.21.0
|
||||
golang.org/x/tools v0.1.12
|
||||
gopkg.in/resty.v1 v1.12.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
||||
@ -464,6 +464,7 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
@ -753,6 +754,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
|
||||
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
||||
gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI=
|
||||
gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/resty.v1 v1.12.0 h1:CuXP0Pjfw9rOuY6EP+UvtNvt5DSqHpIxILZKT/quCZI=
|
||||
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
||||
@ -2,8 +2,10 @@ package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/drives/item/root/delta"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/me/drives/item/items"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@ -15,7 +17,8 @@ import (
|
||||
const (
|
||||
// nextLinkKey is used to find the next link in a paged
|
||||
// graph response
|
||||
nextLinkKey = "@odata.nextLink"
|
||||
nextLinkKey = "@odata.nextLink"
|
||||
itemChildrenRawURLFmt = "https://graph.microsoft.com/v1.0/drives/%s/items/%s/children"
|
||||
)
|
||||
|
||||
// Enumerates the drives for the specified user
|
||||
@ -31,7 +34,7 @@ func drives(ctx context.Context, service graph.Service, user string) ([]models.D
|
||||
}
|
||||
|
||||
// itemCollector functions collect the items found in a drive
|
||||
type itemCollector func(ctx context.Context, driveID string, items []models.DriveItemable) error
|
||||
type itemCollector func(ctx context.Context, driveID string, driveItems []models.DriveItemable) error
|
||||
|
||||
// collectItems will enumerate all items in the specified drive and hand them to the
|
||||
// provided `collector` method
|
||||
@ -70,3 +73,63 @@ func collectItems(
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getFolder will lookup the specified folder name under `parentFolderID`
|
||||
func getFolder(ctx context.Context, service graph.Service, driveID string, parentFolderID string,
|
||||
folderName string,
|
||||
) (models.DriveItemable, error) {
|
||||
children, err := service.Client().DrivesById(driveID).ItemsById(parentFolderID).Children().Get()
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(
|
||||
err,
|
||||
"failed to get children. details: %s",
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
|
||||
for _, item := range children.GetValue() {
|
||||
if item.GetFolder() == nil || item.GetName() == nil || *item.GetName() != folderName {
|
||||
continue
|
||||
}
|
||||
|
||||
return item, nil
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("folder %s not found in drive(%s) parentFolder(%s)", folderName, driveID, parentFolderID)
|
||||
}
|
||||
|
||||
// Create a new item in the specified folder
|
||||
func createItem(ctx context.Context, service graph.Service, driveID string, parentFolderID string,
|
||||
item models.DriveItemable,
|
||||
) (models.DriveItemable, error) {
|
||||
// Graph SDK doesn't yet provide a POST method for `/children` so we set the `rawUrl` ourselves as recommended
|
||||
// here: https://github.com/microsoftgraph/msgraph-sdk-go/issues/155#issuecomment-1136254310
|
||||
rawURL := fmt.Sprintf(itemChildrenRawURLFmt, driveID, parentFolderID)
|
||||
|
||||
builder := items.NewItemsRequestBuilder(rawURL, service.Adapter())
|
||||
|
||||
newItem, err := builder.Post(item)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(
|
||||
err,
|
||||
"failed to create folder. details: %s",
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
|
||||
return newItem, nil
|
||||
}
|
||||
|
||||
// newItem initializes a `models.DriveItemable` that can be used as input to `createItem`
|
||||
func newItem(name string, folder bool) models.DriveItemable {
|
||||
item := models.NewDriveItem()
|
||||
item.SetName(&name)
|
||||
|
||||
if folder {
|
||||
item.SetFolder(models.NewFolder())
|
||||
} else {
|
||||
item.SetFile(models.NewFile())
|
||||
}
|
||||
|
||||
return item
|
||||
}
|
||||
|
||||
@ -1,14 +1,18 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gopkg.in/resty.v1"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
@ -51,3 +55,80 @@ func driveItemReader(
|
||||
|
||||
return *item.GetName(), resp.Body, nil
|
||||
}
|
||||
|
||||
// driveItemWriter is used to initialize and return an io.Writer to upload data for the specified item
|
||||
// It does so by creating an upload session and using that URL to initialize an `itemWriter`
|
||||
func driveItemWriter(ctx context.Context, service graph.Service, driveID, itemID string, itemSize int64,
|
||||
) (io.Writer, error) {
|
||||
r, err := service.Client().DrivesById(driveID).ItemsById(itemID).CreateUploadSession().Post(nil)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(
|
||||
err,
|
||||
"failed to create upload session for item %s. details: %s",
|
||||
itemID,
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
|
||||
url := *r.GetUploadUrl()
|
||||
|
||||
logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", itemID, url)
|
||||
|
||||
return &itemWriter{id: itemID, contentLength: itemSize, url: url}, nil
|
||||
}
|
||||
|
||||
// itemWriter implements an io.Writer for the OneDrive URL
|
||||
// it is initialized with
|
||||
type itemWriter struct {
|
||||
// Item ID
|
||||
id string
|
||||
// Upload URL for this item
|
||||
url string
|
||||
// Tracks how much data will be written
|
||||
contentLength int64
|
||||
// Last item offset that was written to
|
||||
lastWrittenOffset int64
|
||||
}
|
||||
|
||||
const (
|
||||
contentRangeHeaderKey = "Content-Range"
|
||||
contentLengthHeaderKey = "Content-Length"
|
||||
// Format for Content-Length is "bytes <start>-<end>/<total>"
|
||||
contentLengthHeaderValueFmt = "bytes %d-%d/%d"
|
||||
)
|
||||
|
||||
// Write will upload the provided data to OneDrive. It sets the `Content-Length` and `Content-Range` headers based on
|
||||
// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
|
||||
func (iw *itemWriter) Write(p []byte) (n int, err error) {
|
||||
rangeLength := len(p)
|
||||
logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
|
||||
iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
|
||||
|
||||
endOffset := iw.lastWrittenOffset + int64(rangeLength)
|
||||
|
||||
client := resty.New()
|
||||
|
||||
// PUT the request - set headers `Content-Range`to describe total size and `Content-Length` to describe size of
|
||||
// data in the current request
|
||||
resp, err := client.R().
|
||||
SetHeaders(map[string]string{
|
||||
contentRangeHeaderKey: fmt.Sprintf(contentLengthHeaderValueFmt,
|
||||
iw.lastWrittenOffset,
|
||||
endOffset-1,
|
||||
iw.contentLength),
|
||||
contentLengthHeaderKey: fmt.Sprintf("%d", iw.contentLength),
|
||||
}).
|
||||
SetBody(bytes.NewReader(p)).Put(iw.url)
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err,
|
||||
"failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d ",
|
||||
iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
|
||||
}
|
||||
|
||||
// Update last offset
|
||||
iw.lastWrittenOffset = endOffset
|
||||
|
||||
logger.Ctx(context.Background()).Debugf("Response: %s", resp.String())
|
||||
|
||||
return rangeLength, nil
|
||||
}
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -107,3 +109,63 @@ func (suite *ItemIntegrationSuite) TestItemReader() {
|
||||
require.NotZero(suite.T(), size)
|
||||
suite.T().Logf("Read %d bytes from file %s.", size, name)
|
||||
}
|
||||
|
||||
// TestItemWriter is an integration test for uploading data to OneDrive
|
||||
// It creates a new `testfolder_<timestamp` folder with a new
|
||||
// testitem_<timestamp> item and writes data to it
|
||||
func (suite *ItemIntegrationSuite) TestItemWriter() {
|
||||
ctx := context.TODO()
|
||||
user := tester.M365UserID(suite.T())
|
||||
|
||||
drives, err := drives(ctx, suite, user)
|
||||
require.NoError(suite.T(), err)
|
||||
// Test Requirement 1: Need a drive
|
||||
require.Greaterf(suite.T(), len(drives), 0, "user %s does not have a drive", user)
|
||||
|
||||
// Pick the first drive
|
||||
driveID := *drives[0].GetId()
|
||||
|
||||
root, err := suite.Client().DrivesById(driveID).Root().Get()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Test Requirement 2: "Test Folder" should exist
|
||||
folder, err := getFolder(ctx, suite, driveID, *root.GetId(), "Test Folder")
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
newFolderName := "testfolder_" + time.Now().Format("2006-01-02T15-04-05")
|
||||
suite.T().Logf("Test will create folder %s", newFolderName)
|
||||
|
||||
newFolder, err := createItem(ctx, suite, driveID, *folder.GetId(), newItem(newFolderName, true))
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
require.NotNil(suite.T(), newFolder.GetId())
|
||||
|
||||
newItemName := "testItem_" + time.Now().Format("2006-01-02T15-04-05")
|
||||
suite.T().Logf("Test will create item %s", newItemName)
|
||||
|
||||
newItem, err := createItem(ctx, suite, driveID, *newFolder.GetId(), newItem(newItemName, false))
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
require.NotNil(suite.T(), newItem.GetId())
|
||||
|
||||
// Initialize a 100KB mockDataProvider
|
||||
td, writeSize := mockDataReader(int64(100 * 1024))
|
||||
|
||||
w, err := driveItemWriter(ctx, suite, driveID, *newItem.GetId(), writeSize)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
// Using a 32 KB buffer for the copy allows us to validate the
|
||||
// multi-part upload. `io.CopyBuffer` will only write 32 KB at
|
||||
// a time
|
||||
copyBuffer := make([]byte, 32*1024)
|
||||
|
||||
size, err := io.CopyBuffer(w, td, copyBuffer)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
require.Equal(suite.T(), writeSize, size)
|
||||
}
|
||||
|
||||
func mockDataReader(size int64) (io.Reader, int64) {
|
||||
data := bytes.Repeat([]byte("D"), int(size))
|
||||
return bytes.NewReader(data), size
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user