diff --git a/src/go.mod b/src/go.mod index 0452932e8..3f2b52109 100644 --- a/src/go.mod +++ b/src/go.mod @@ -26,6 +26,7 @@ require ( github.com/tomlazar/table v0.1.2 go.uber.org/zap v1.21.0 golang.org/x/tools v0.1.12 + gopkg.in/resty.v1 v1.12.0 ) require ( diff --git a/src/go.sum b/src/go.sum index bd113ee1f..c873349c4 100644 --- a/src/go.sum +++ b/src/go.sum @@ -464,6 +464,7 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -753,6 +754,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/resty.v1 v1.12.0 h1:CuXP0Pjfw9rOuY6EP+UvtNvt5DSqHpIxILZKT/quCZI= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go index a299ea48a..340872af3 100644 --- a/src/internal/connector/onedrive/drive.go +++ b/src/internal/connector/onedrive/drive.go @@ -2,8 +2,10 @@ package onedrive import ( "context" + "fmt" "github.com/microsoftgraph/msgraph-sdk-go/drives/item/root/delta" + "github.com/microsoftgraph/msgraph-sdk-go/me/drives/item/items" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/pkg/errors" @@ -15,7 +17,8 @@ import ( const ( // nextLinkKey is used to find the next link in a paged // graph response - nextLinkKey = "@odata.nextLink" + nextLinkKey = "@odata.nextLink" + itemChildrenRawURLFmt = "https://graph.microsoft.com/v1.0/drives/%s/items/%s/children" ) // Enumerates the drives for the specified user @@ -31,7 +34,7 @@ func drives(ctx context.Context, service graph.Service, user string) ([]models.D } // itemCollector functions collect the items found in a drive -type itemCollector func(ctx context.Context, driveID string, items []models.DriveItemable) error +type itemCollector func(ctx context.Context, driveID string, driveItems []models.DriveItemable) error // collectItems will enumerate all items in the specified drive and hand them to the // provided `collector` method @@ -70,3 +73,63 @@ func collectItems( } return nil } + +// getFolder will lookup the specified folder name under `parentFolderID` +func getFolder(ctx context.Context, service graph.Service, driveID string, parentFolderID string, + folderName string, +) (models.DriveItemable, error) { + children, err := service.Client().DrivesById(driveID).ItemsById(parentFolderID).Children().Get() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get children. details: %s", + support.ConnectorStackErrorTrace(err), + ) + } + + for _, item := range children.GetValue() { + if item.GetFolder() == nil || item.GetName() == nil || *item.GetName() != folderName { + continue + } + + return item, nil + } + + return nil, errors.Errorf("folder %s not found in drive(%s) parentFolder(%s)", folderName, driveID, parentFolderID) +} + +// Create a new item in the specified folder +func createItem(ctx context.Context, service graph.Service, driveID string, parentFolderID string, + item models.DriveItemable, +) (models.DriveItemable, error) { + // Graph SDK doesn't yet provide a POST method for `/children` so we set the `rawUrl` ourselves as recommended + // here: https://github.com/microsoftgraph/msgraph-sdk-go/issues/155#issuecomment-1136254310 + rawURL := fmt.Sprintf(itemChildrenRawURLFmt, driveID, parentFolderID) + + builder := items.NewItemsRequestBuilder(rawURL, service.Adapter()) + + newItem, err := builder.Post(item) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to create folder. details: %s", + support.ConnectorStackErrorTrace(err), + ) + } + + return newItem, nil +} + +// newItem initializes a `models.DriveItemable` that can be used as input to `createItem` +func newItem(name string, folder bool) models.DriveItemable { + item := models.NewDriveItem() + item.SetName(&name) + + if folder { + item.SetFolder(models.NewFolder()) + } else { + item.SetFile(models.NewFile()) + } + + return item +} diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index 8124f0e3a..0a8859026 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -1,14 +1,18 @@ package onedrive import ( + "bytes" "context" + "fmt" "io" "net/http" "time" "github.com/pkg/errors" + "gopkg.in/resty.v1" "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/pkg/logger" ) @@ -51,3 +55,80 @@ func driveItemReader( return *item.GetName(), resp.Body, nil } + +// driveItemWriter is used to initialize and return an io.Writer to upload data for the specified item +// It does so by creating an upload session and using that URL to initialize an `itemWriter` +func driveItemWriter(ctx context.Context, service graph.Service, driveID, itemID string, itemSize int64, +) (io.Writer, error) { + r, err := service.Client().DrivesById(driveID).ItemsById(itemID).CreateUploadSession().Post(nil) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to create upload session for item %s. details: %s", + itemID, + support.ConnectorStackErrorTrace(err), + ) + } + + url := *r.GetUploadUrl() + + logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", itemID, url) + + return &itemWriter{id: itemID, contentLength: itemSize, url: url}, nil +} + +// itemWriter implements an io.Writer for the OneDrive URL +// it is initialized with +type itemWriter struct { + // Item ID + id string + // Upload URL for this item + url string + // Tracks how much data will be written + contentLength int64 + // Last item offset that was written to + lastWrittenOffset int64 +} + +const ( + contentRangeHeaderKey = "Content-Range" + contentLengthHeaderKey = "Content-Length" + // Format for Content-Length is "bytes -/" + contentLengthHeaderValueFmt = "bytes %d-%d/%d" +) + +// Write will upload the provided data to OneDrive. It sets the `Content-Length` and `Content-Range` headers based on +// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession +func (iw *itemWriter) Write(p []byte) (n int, err error) { + rangeLength := len(p) + logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d", + iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength) + + endOffset := iw.lastWrittenOffset + int64(rangeLength) + + client := resty.New() + + // PUT the request - set headers `Content-Range`to describe total size and `Content-Length` to describe size of + // data in the current request + resp, err := client.R(). + SetHeaders(map[string]string{ + contentRangeHeaderKey: fmt.Sprintf(contentLengthHeaderValueFmt, + iw.lastWrittenOffset, + endOffset-1, + iw.contentLength), + contentLengthHeaderKey: fmt.Sprintf("%d", iw.contentLength), + }). + SetBody(bytes.NewReader(p)).Put(iw.url) + if err != nil { + return 0, errors.Wrapf(err, + "failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d ", + iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength) + } + + // Update last offset + iw.lastWrittenOffset = endOffset + + logger.Ctx(context.Background()).Debugf("Response: %s", resp.String()) + + return rangeLength, nil +} diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index 4da15bb66..2a08460af 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -1,9 +1,11 @@ package onedrive import ( + "bytes" "context" "io" "testing" + "time" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -107,3 +109,63 @@ func (suite *ItemIntegrationSuite) TestItemReader() { require.NotZero(suite.T(), size) suite.T().Logf("Read %d bytes from file %s.", size, name) } + +// TestItemWriter is an integration test for uploading data to OneDrive +// It creates a new `testfolder_ item and writes data to it +func (suite *ItemIntegrationSuite) TestItemWriter() { + ctx := context.TODO() + user := tester.M365UserID(suite.T()) + + drives, err := drives(ctx, suite, user) + require.NoError(suite.T(), err) + // Test Requirement 1: Need a drive + require.Greaterf(suite.T(), len(drives), 0, "user %s does not have a drive", user) + + // Pick the first drive + driveID := *drives[0].GetId() + + root, err := suite.Client().DrivesById(driveID).Root().Get() + require.NoError(suite.T(), err) + + // Test Requirement 2: "Test Folder" should exist + folder, err := getFolder(ctx, suite, driveID, *root.GetId(), "Test Folder") + require.NoError(suite.T(), err) + + newFolderName := "testfolder_" + time.Now().Format("2006-01-02T15-04-05") + suite.T().Logf("Test will create folder %s", newFolderName) + + newFolder, err := createItem(ctx, suite, driveID, *folder.GetId(), newItem(newFolderName, true)) + require.NoError(suite.T(), err) + + require.NotNil(suite.T(), newFolder.GetId()) + + newItemName := "testItem_" + time.Now().Format("2006-01-02T15-04-05") + suite.T().Logf("Test will create item %s", newItemName) + + newItem, err := createItem(ctx, suite, driveID, *newFolder.GetId(), newItem(newItemName, false)) + require.NoError(suite.T(), err) + + require.NotNil(suite.T(), newItem.GetId()) + + // Initialize a 100KB mockDataProvider + td, writeSize := mockDataReader(int64(100 * 1024)) + + w, err := driveItemWriter(ctx, suite, driveID, *newItem.GetId(), writeSize) + require.NoError(suite.T(), err) + + // Using a 32 KB buffer for the copy allows us to validate the + // multi-part upload. `io.CopyBuffer` will only write 32 KB at + // a time + copyBuffer := make([]byte, 32*1024) + + size, err := io.CopyBuffer(w, td, copyBuffer) + require.NoError(suite.T(), err) + + require.Equal(suite.T(), writeSize, size) +} + +func mockDataReader(size int64) (io.Reader, int64) { + data := bytes.Repeat([]byte("D"), int(size)) + return bytes.NewReader(data), size +}