Use an upload session for large attachments (#1148)

## Description

Use an upload session and chunked writes for large attachments.

This commit moves the chunked-upload logic previously used for OneDrive file uploads into the common `uploadsession` package and leverages it to upload large Exchange attachments.
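
At a high level, the shared writer turns each `Write` into one ranged `PUT` against the upload session URL, advertising its position with a `Content-Range: bytes <start>-<end>/<total>` header (end offset inclusive). A standalone sketch of the header math, using illustrative sizes rather than the production constants:

```go
package main

import "fmt"

// Illustrative only: prints the Content-Range headers a chunked upload
// would send for a 10 MB item written in 4 MB chunks. The header format
// ("bytes <start>-<end>/<total>", end inclusive) matches the one used by
// the uploadsession writer in this PR.
func main() {
	const (
		total = int64(10 * 1024 * 1024)
		chunk = int64(4 * 1024 * 1024)
	)

	for offset := int64(0); offset < total; offset += chunk {
		end := offset + chunk
		if end > total {
			end = total
		}

		// end-1 because the Content-Range end offset is inclusive.
		fmt.Printf("Content-Range: bytes %d-%d/%d (Content-Length: %d)\n",
			offset, end-1, total, end-offset)
	}
}
```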

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* #1115 

## Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
Vaibhav Kamra, 2022-10-12 20:35:40 -07:00, committed by GitHub
parent 0c921e8b40
commit f1f6c06ba0
7 changed files with 343 additions and 72 deletions

View File

@@ -0,0 +1,117 @@
package exchange

import (
	"bytes"
	"context"
	"io"

	"github.com/microsoftgraph/msgraph-sdk-go/models"
	"github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item/attachments/createuploadsession"
	"github.com/pkg/errors"

	"github.com/alcionai/corso/src/internal/connector/graph"
	"github.com/alcionai/corso/src/internal/connector/support"
	"github.com/alcionai/corso/src/internal/connector/uploadsession"
	"github.com/alcionai/corso/src/pkg/logger"
)

const (
	// Use the large-attachment logic for attachments > 3MB
	// https://learn.microsoft.com/en-us/graph/outlook-large-attachments
	largeAttachmentSize = int32(3 * 1024 * 1024)
	attachmentChunkSize = 4 * 1024 * 1024

	fileAttachmentOdataValue      = "#microsoft.graph.fileAttachment"
	itemAttachmentOdataValue      = "#microsoft.graph.itemAttachment"
	referenceAttachmentOdataValue = "#microsoft.graph.referenceAttachment"
)

func attachmentType(attachment models.Attachmentable) models.AttachmentType {
	switch *attachment.GetOdataType() {
	case fileAttachmentOdataValue:
		return models.FILE_ATTACHMENTTYPE
	case itemAttachmentOdataValue:
		return models.ITEM_ATTACHMENTTYPE
	case referenceAttachmentOdataValue:
		return models.REFERENCE_ATTACHMENTTYPE
	default:
		// We should not hit this, but default to ITEM_ATTACHMENTTYPE,
		// which picks the default attachment upload mechanism
		return models.ITEM_ATTACHMENTTYPE
	}
}

// uploadAttachment will upload the specified message attachment to M365
func uploadAttachment(ctx context.Context, service graph.Service, userID, folderID, messageID string,
	attachment models.Attachmentable,
) error {
	logger.Ctx(ctx).Debugf("uploading attachment with size %d", *attachment.GetSize())

	// For item/reference attachments *or* file attachments < 3MB, use the attachments endpoint
	if attachmentType(attachment) != models.FILE_ATTACHMENTTYPE || *attachment.GetSize() < largeAttachmentSize {
		_, err := service.Client().
			UsersById(userID).
			MailFoldersById(folderID).
			MessagesById(messageID).
			Attachments().
			Post(ctx, attachment, nil)

		return err
	}

	return uploadLargeAttachment(ctx, service, userID, folderID, messageID, attachment)
}

// uploadLargeAttachment will upload the specified attachment by creating an upload session
// and doing a chunked upload
func uploadLargeAttachment(ctx context.Context, service graph.Service, userID, folderID, messageID string,
	attachment models.Attachmentable,
) error {
	ab := attachmentBytes(attachment)

	aw, err := attachmentWriter(ctx, service, userID, folderID, messageID, attachment, int64(len(ab)))
	if err != nil {
		return err
	}

	// Upload the stream data
	copyBuffer := make([]byte, attachmentChunkSize)

	_, err = io.CopyBuffer(aw, bytes.NewReader(ab), copyBuffer)
	if err != nil {
		return errors.Wrapf(err, "failed to upload attachment: item %s", messageID)
	}

	return nil
}

// attachmentWriter initializes and returns an io.Writer used to upload data for the specified
// attachment. It does so by creating an upload session and using the session URL to initialize
// an uploadsession writer
func attachmentWriter(ctx context.Context, service graph.Service, userID, folderID, messageID string,
	attachment models.Attachmentable, size int64,
) (io.Writer, error) {
	session := createuploadsession.NewCreateUploadSessionPostRequestBody()

	attItem := models.NewAttachmentItem()
	attType := models.FILE_ATTACHMENTTYPE
	attItem.SetAttachmentType(&attType)
	attItem.SetName(attachment.GetName())
	attItem.SetSize(&size)
	session.SetAttachmentItem(attItem)

	r, err := service.Client().UsersById(userID).MailFoldersById(folderID).
		MessagesById(messageID).Attachments().CreateUploadSession().Post(ctx, session, nil)
	if err != nil {
		return nil, errors.Wrapf(
			err,
			"failed to create attachment upload session for item %s. details: %s",
			messageID,
			support.ConnectorStackErrorTrace(err),
		)
	}

	url := *r.GetUploadUrl()
	logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", messageID, url)

	return uploadsession.NewWriter(messageID, url, size), nil
}
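
One subtlety worth flagging: the writer never splits data itself. `io.CopyBuffer` hands it at most one buffer's worth per `Write`, which is what caps each `PUT` at `attachmentChunkSize`. That cap only holds when the source reader does not implement `io.WriterTo` (a `bytes.Reader` does, and `io.CopyBuffer` prefers that fast path, skipping the buffer entirely; the new unit test's `mockReader` wraps the reader for exactly this reason). A self-contained illustration of the buffered behavior, using a toy writer rather than the production code:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// chunkCounter is a toy io.Writer that records the size of each Write call.
type chunkCounter struct{ sizes []int }

func (c *chunkCounter) Write(p []byte) (int, error) {
	c.sizes = append(c.sizes, len(p))
	return len(p), nil
}

func main() {
	data := bytes.Repeat([]byte("x"), 10*1024*1024) // 10 MB payload

	// Wrap the reader so io.CopyBuffer cannot use the io.WriterTo fast
	// path and must copy through the 4 MB buffer.
	src := io.LimitReader(bytes.NewReader(data), int64(len(data)))

	w := &chunkCounter{}
	buf := make([]byte, 4*1024*1024)

	if _, err := io.CopyBuffer(w, src, buf); err != nil {
		panic(err)
	}

	fmt.Println(w.sizes) // [4194304 4194304 2097152]
}
```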

View File

@@ -554,6 +554,19 @@ func (suite *ExchangeServiceSuite) TestRestoreExchangeObject() {
 				return *folder.GetId()
 			},
 		},
+		{
+			name:        "Test Mail: One Large Attachment",
+			bytes:       mockconnector.GetMockMessageWithLargeAttachment("Restore Large Attachment"),
+			category:    path.EmailCategory,
+			cleanupFunc: DeleteMailFolder,
+			destination: func() string {
+				folderName := "TestRestoreMailwithLargeAttachment: " + common.FormatSimpleDateTime(now)
+				folder, err := CreateMailFolder(ctx, suite.es, userID, folderName)
+				require.NoError(t, err)
+
+				return *folder.GetId()
+			},
+		},
 		{
 			name:        "Test Mail: Two Attachments",
 			bytes:       mockconnector.GetMockMessageWithTwoAttachments("Restore 2 Attachments"),

View File

@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"reflect"
 	"runtime/trace"

 	"github.com/microsoftgraph/msgraph-sdk-go/models"
@ -20,7 +21,8 @@ import (
) )
// GetRestoreContainer utility function to create // GetRestoreContainer utility function to create
// an unique folder for the restore process // an unique folder for the restore process
//
// @param category: input from fullPath()[2] // @param category: input from fullPath()[2]
// that defines the application the folder is created in. // that defines the application the folder is created in.
func GetRestoreContainer( func GetRestoreContainer(
@@ -223,8 +225,19 @@ func RestoreMailMessage(
 			"policy", cp)
 		fallthrough
 	case control.Copy:
-		return MessageInfo(clone), SendMailToBackStore(ctx, service, user, destination, clone)
+		err := SendMailToBackStore(ctx, service, user, destination, clone)
+		if err != nil {
+			return nil, err
+		}
 	}
+
+	return MessageInfo(clone), nil
 }
+
+// attachmentBytes is a helper to retrieve the attachment content from a models.Attachmentable
+// TODO: Revisit how we retrieve/persist attachment content during backup so this is not needed
+func attachmentBytes(attachment models.Attachmentable) []byte {
+	return reflect.Indirect(reflect.ValueOf(attachment)).FieldByName("contentBytes").Bytes()
+}

 // SendMailToBackStore function for transporting in-memory messageable item to M365 backstore
@@ -261,24 +274,19 @@ func SendMailToBackStore(
 	if len(attached) > 0 {
 		id := *sentMessage.GetId()
 		for _, attachment := range attached {
-			_, err = service.Client().
-				UsersById(user).
-				MailFoldersById(destination).
-				MessagesById(id).
-				Attachments().
-				Post(ctx, attachment, nil)
+			err := uploadAttachment(ctx, service, user, destination, id, attachment)
 			if err != nil {
-				errs = support.WrapAndAppend(id,
+				errs = support.WrapAndAppend(fmt.Sprintf("uploading attachment for message %s", id),
 					err,
 					errs,
 				)
-				break
 			}
 		}
-		return errs
 	}

-	return nil
+	return errs
 }

 // RestoreExchangeDataCollections restores M365 objects in data.Collection to MSFT

View File

@@ -1,6 +1,7 @@
 package mockconnector

 import (
+	"encoding/base64"
 	"fmt"
 	"math/rand"
 	"strconv"
@@ -166,6 +167,34 @@ func GetMockMessageWithDirectAttachment(subject string) []byte {
 	return []byte(message)
 }

+// GetMockMessageWithLargeAttachment returns a message with a large attachment. This is derived from the message
+// used in GetMockMessageWithDirectAttachment
+// Serialized with: kiota-serialization-json-go v0.7.1
+func GetMockMessageWithLargeAttachment(subject string) []byte {
+	//nolint:lll
+	messageFmt := "{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA=\"," +
+		"\"@odata.type\":\"#microsoft.graph.message\",\"@odata.etag\":\"W/\\\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\\\"\",\"@odata.context\":\"https://graph.microsoft.com/v1.0/$metadata#users('a4a472f8-ccb0-43ec-bf52-3697a91b926c')/messages/$entity\",\"categories\":[]," +
+		"\"changeKey\":\"CQAAABYAAADSEBNbUIB9RL6ePDeF3FIYAAB3maFQ\",\"createdDateTime\":\"2022-09-29T17:39:06Z\",\"lastModifiedDateTime\":\"2022-09-29T17:39:08Z\"," +
+		"\"attachments\":[{\"id\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAABEgAQANMmZLFhjWJJj4X9mj8piqg=\",\"@odata.type\":\"#microsoft.graph.fileAttachment\",\"@odata.mediaContentType\":\"application/octet-stream\"," +
+		"\"contentType\":\"application/octet-stream\",\"isInline\":false,\"lastModifiedDateTime\":\"2022-09-29T17:39:06Z\",\"name\":\"database.db\",\"size\":%d," +
+		"\"contentBytes\":\"%s\"}]," +
+		"\"bccRecipients\":[],\"body\":{\"content\":\"<html><head>\\r\\n<meta http-equiv=\\\"Content-Type\\\" content=\\\"text/html; charset=utf-8\\\"><style type=\\\"text/css\\\" style=\\\"display:none\\\">\\r\\n<!--\\r\\np\\r\\n\\t{margin-top:0;\\r\\n\\tmargin-bottom:0}\\r\\n-->\\r\\n</style></head><body dir=\\\"ltr\\\"><div class=\\\"elementToProof\\\" style=\\\"font-family:Calibri,Arial,Helvetica,sans-serif; font-size:12pt; color:rgb(0,0,0)\\\"><span class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Lidia,</span> <div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><div class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">I hope this message finds you well. I am researching a database construct for next quarter's review. SkyNet will<span class=\\\"ContentPasted0\\\">&nbsp;</span><span data-ogsb=\\\"rgb(255, 255, 0)\\\" class=\\\"ContentPasted0\\\" style=\\\"margin:0px; background-color:rgb(255,255,0)!important\\\">not</span><span class=\\\"ContentPasted0\\\">&nbsp;</span>be able to match our database process speeds if we utilize the formulae that are included.&nbsp;</div><div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><div class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Please give me your thoughts on the implementation.</div><div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><div class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Best,</div><div class=\\\"x_elementToProof\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\"><br class=\\\"ContentPasted0\\\"></div><span class=\\\"x_elementToProof ContentPasted0\\\" data-ogsc=\\\"rgb(0, 0, 0)\\\" data-ogsb=\\\"rgb(255, 255, 255)\\\" style=\\\"font-size:12pt; margin:0px; color:rgb(0,0,0)!important; background-color:rgb(255,255,255)!important\\\">Dustin</span><br></div></body></html>\",\"contentType\":\"html\",\"@odata.type\":\"#microsoft.graph.itemBody\"}," +
+		"\"bodyPreview\":\"Lidia,\\r\\n\\r\\nI hope this message finds you well. I am researching a database construct for next quarter's review. SkyNet will not be able to match our database process speeds if we utilize the formulae that are included.\\r\\n\\r\\nPlease give me your thoughts on th\",\"ccRecipients\":[]," +
+		"\"conversationId\":\"AAQkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwAQANPFOcy_BapBghezTzIIldI=\",\"conversationIndex\":\"AQHY1Cpb08U5zL4FqkGCF7NPMgiV0g==\",\"flag\":{\"flagStatus\":\"notFlagged\",\"@odata.type\":\"#microsoft.graph.followupFlag\"}," +
+		"\"from\":{\"emailAddress\":{\"address\":\"dustina@8qzvrj.onmicrosoft.com\",\"name\":\"Dustin Abbot\",\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"},\"hasAttachments\":true,\"importance\":\"normal\",\"inferenceClassification\":\"focused\"," +
+		"\"internetMessageId\":\"<SJ0PR17MB56220C509D0006B8CC8FD952C3579@SJ0PR17MB5622.namprd17.prod.outlook.com>\",\"isDeliveryReceiptRequested\":false,\"isDraft\":false,\"isRead\":false,\"isReadReceiptRequested\":false," +
		"\"parentFolderId\":\"AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwAuAAAAAADCNgjhM9QmQYWNcI7hCpPrAQDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAAA=\",\"receivedDateTime\":\"2022-09-29T17:39:07Z\",\"replyTo\":[],\"sender\":{\"emailAddress\":{\"address\":\"dustina@8qzvrj.onmicrosoft.com\",\"name\":\"Dustin Abbot\"," +
+		"\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"},\"sentDateTime\":\"2022-09-29T17:39:02Z\"," +
+		"\"subject\":\"" + subject + "\",\"toRecipients\":[{\"emailAddress\":{\"address\":\"LidiaH@8qzvrj.onmicrosoft.com\",\"name\":\"Lidia Holloway\",\"@odata.type\":\"#microsoft.graph.emailAddress\"},\"@odata.type\":\"#microsoft.graph.recipient\"}]," +
+		"\"webLink\":\"https://outlook.office365.com/owa/?ItemID=AAMkAGZmNjNlYjI3LWJlZWYtNGI4Mi04YjMyLTIxYThkNGQ4NmY1MwBGAAAAAADCNgjhM9QmQYWNcI7hCpPrBwDSEBNbUIB9RL6ePDeF3FIYAAAAAAEMAADSEBNbUIB9RL6ePDeF3FIYAAB4moqeAAA%3D&exvsurl=1&viewmodel=ReadMessageItem\"}"
+
+	attachmentSize := 3 * 1024 * 1024 // 3 MB
+	attachmentBytes := make([]byte, attachmentSize)
+
+	// Attachment content bytes are base64 encoded
+	return []byte(fmt.Sprintf(messageFmt, attachmentSize, base64.StdEncoding.EncodeToString(attachmentBytes)))
+}

 // GetMessageWithOneDriveAttachment returns a message with a OneDrive attachment represented in bytes
 // Serialized with: kiota-serialization-json-go v0.7.1
 func GetMessageWithOneDriveAttachment(subject string) []byte {
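
A small aside on the mock's sizing: the attachment content is embedded base64-encoded in the serialized message, so the 3MB payload inflates by a factor of 4/3 on the wire. A quick check:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// The mock's 3 MB attachment is embedded base64-encoded, which inflates
// it by 4/3: the serialized test message carries ~4 MB of content bytes.
func main() {
	const attachmentSize = 3 * 1024 * 1024

	encoded := base64.StdEncoding.EncodedLen(attachmentSize)
	fmt.Printf("%d raw bytes -> %d base64 bytes\n", attachmentSize, encoded)
	// Output: 3145728 raw bytes -> 4194304 base64 bytes
}
```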

View File

@@ -1,9 +1,7 @@
 package onedrive

 import (
-	"bytes"
 	"context"
-	"fmt"
 	"io"
 	"net/http"
 	"time"
@@ -11,11 +9,11 @@ import (
 	msup "github.com/microsoftgraph/msgraph-sdk-go/drives/item/items/item/createuploadsession"
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 	"github.com/pkg/errors"
-	"gopkg.in/resty.v1"

 	"github.com/alcionai/corso/src/internal/common"
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/support"
+	"github.com/alcionai/corso/src/internal/connector/uploadsession"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/logger"
 )
@@ -103,61 +101,5 @@ func driveItemWriter(
 	logger.Ctx(ctx).Debugf("Created an upload session for item %s. URL: %s", itemID, url)

-	return &itemWriter{id: itemID, contentLength: itemSize, url: url}, nil
+	return uploadsession.NewWriter(itemID, url, itemSize), nil
 }
-
-// itemWriter implements an io.Writer for the OneDrive URL
-// it is initialized with
-type itemWriter struct {
-	// Item ID
-	id string
-	// Upload URL for this item
-	url string
-	// Tracks how much data will be written
-	contentLength int64
-	// Last item offset that was written to
-	lastWrittenOffset int64
-}
-
-const (
-	contentRangeHeaderKey = "Content-Range"
-	// Format for Content-Range is "bytes <start>-<end>/<total>"
-	contentRangeHeaderValueFmt = "bytes %d-%d/%d"
-	contentLengthHeaderKey     = "Content-Length"
-)
-
-// Write will upload the provided data to OneDrive. It sets the `Content-Length` and `Content-Range` headers based on
-// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
-func (iw *itemWriter) Write(p []byte) (n int, err error) {
-	rangeLength := len(p)
-	logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
-		iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
-
-	endOffset := iw.lastWrittenOffset + int64(rangeLength)
-
-	client := resty.New()
-
-	// PUT the request - set the `Content-Range` header to describe the total size and the
-	// `Content-Length` header to describe the size of the data in the current request
-	resp, err := client.R().
-		SetHeaders(map[string]string{
-			contentRangeHeaderKey: fmt.Sprintf(contentRangeHeaderValueFmt,
-				iw.lastWrittenOffset,
-				endOffset-1,
-				iw.contentLength),
-			contentLengthHeaderKey: fmt.Sprintf("%d", iw.contentLength),
-		}).
-		SetBody(bytes.NewReader(p)).Put(iw.url)
-	if err != nil {
-		return 0, errors.Wrapf(err,
-			"failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d",
-			iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
-	}
-
-	// Update last offset
-	iw.lastWrittenOffset = endOffset
-
-	logger.Ctx(context.Background()).Debugf("Response: %s", resp.String())
-
-	return rangeLength, nil
-}

View File

@@ -0,0 +1,71 @@
package uploadsession

import (
	"bytes"
	"context"
	"fmt"

	"github.com/pkg/errors"
	"gopkg.in/resty.v1"

	"github.com/alcionai/corso/src/pkg/logger"
)

const (
	contentRangeHeaderKey = "Content-Range"
	// Format for Content-Range is "bytes <start>-<end>/<total>"
	contentRangeHeaderValueFmt = "bytes %d-%d/%d"
	contentLengthHeaderKey     = "Content-Length"
)

// writer implements an io.Writer for an M365 upload session URL
type writer struct {
	// Identifier
	id string
	// Upload URL for this item
	url string
	// Tracks how much data will be written
	contentLength int64
	// Last item offset that was written to
	lastWrittenOffset int64
	client            *resty.Client
}

func NewWriter(id, url string, size int64) *writer {
	return &writer{id: id, url: url, contentLength: size, client: resty.New()}
}

// Write will upload the provided data to M365. It sets the `Content-Length` and `Content-Range` headers based on
// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
func (iw *writer) Write(p []byte) (n int, err error) {
	rangeLength := len(p)
	logger.Ctx(context.Background()).Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
		iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)

	endOffset := iw.lastWrittenOffset + int64(rangeLength)

	// PUT the request - set the `Content-Range` header to describe the total size and the
	// `Content-Length` header to describe the size of the data in the current request
	resp, err := iw.client.R().
		SetHeaders(map[string]string{
			contentRangeHeaderKey: fmt.Sprintf(contentRangeHeaderValueFmt,
				iw.lastWrittenOffset,
				endOffset-1,
				iw.contentLength),
			contentLengthHeaderKey: fmt.Sprintf("%d", rangeLength),
		}).
		SetBody(bytes.NewReader(p)).Put(iw.url)
	if err != nil {
		return 0, errors.Wrapf(err,
			"failed to upload item %s. Upload failed at Size:%d, Offset: %d, TotalSize: %d",
			iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
	}

	// Update the last written offset
	iw.lastWrittenOffset = endOffset

	logger.Ctx(context.Background()).Debugf("Response: %s", resp.String())

	return rangeLength, nil
}
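
For reference, a hedged usage sketch of the new writer (`uploadUrl` and `content` are placeholders; in this PR the URL comes from a prior `CreateUploadSession().Post(...)` call). Note that the `Content-Length` sent here is the current chunk size (`rangeLength`); the removed OneDrive `itemWriter` sent the total `contentLength` instead, which the move to this shared writer corrects.

```go
package example

import (
	"bytes"
	"io"

	"github.com/alcionai/corso/src/internal/connector/uploadsession"
)

// Sketch only: uploadUrl is assumed to come from a prior upload session
// request; content holds the full item payload.
func uploadViaSession(uploadUrl string, content []byte) error {
	w := uploadsession.NewWriter("item-id", uploadUrl, int64(len(content)))

	// Wrap the bytes.Reader: it implements io.WriterTo, and io.CopyBuffer
	// would otherwise bypass the buffer and push everything in one Write.
	src := io.LimitReader(bytes.NewReader(content), int64(len(content)))

	// One ranged PUT per 4MB chunk
	buf := make([]byte, 4*1024*1024)

	_, err := io.CopyBuffer(w, src, buf)

	return err
}
```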

View File

@@ -0,0 +1,91 @@
package uploadsession

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"regexp"
	"strconv"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
)

type UploadSessionSuite struct {
	suite.Suite
}

func TestUploadSessionSuite(t *testing.T) {
	suite.Run(t, new(UploadSessionSuite))
}

func (suite *UploadSessionSuite) TestWriter() {
	t := suite.T()

	// Initialize a 100KB mock data reader
	td, writeSize := mockDataReader(int64(100 * 1024))

	// Expected Content-Range value format
	contentRangeRegex := regexp.MustCompile(`^bytes (?P<rangestart>\d+)-(?P<rangeend>\d+)/(?P<length>\d+)$`)

	nextOffset := -1

	// Initialize a test http server that validates expected headers
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		assert.Equal(t, r.Method, http.MethodPut)

		// Validate the "Content-Range" header
		assert.True(t, contentRangeRegex.MatchString(r.Header[contentRangeHeaderKey][0]),
			"%s does not match expected value", r.Header[contentRangeHeaderKey][0])

		// Extract the Content-Range components
		matches := contentRangeRegex.FindStringSubmatch(r.Header[contentRangeHeaderKey][0])
		rangeStart, err := strconv.Atoi(matches[contentRangeRegex.SubexpIndex("rangestart")])
		assert.NoError(t, err)
		rangeEnd, err := strconv.Atoi(matches[contentRangeRegex.SubexpIndex("rangeend")])
		assert.NoError(t, err)
		length, err := strconv.Atoi(matches[contentRangeRegex.SubexpIndex("length")])
		assert.NoError(t, err)

		// Validate the total size and the range start/end
		assert.Equal(t, int(writeSize), length)
		assert.Equal(t, nextOffset+1, rangeStart)
		assert.Greater(t, rangeEnd, nextOffset)

		// Validate the "Content-Length" header
		assert.Equal(t, fmt.Sprintf("%d", (rangeEnd+1)-rangeStart), r.Header[contentLengthHeaderKey][0])

		nextOffset = rangeEnd
	}))
	defer ts.Close()

	writer := NewWriter("item", ts.URL, writeSize)

	// Using a 32KB buffer for the copy allows us to validate the
	// multi-part upload: `io.CopyBuffer` will only write 32KB at
	// a time
	copyBuffer := make([]byte, 32*1024)

	size, err := io.CopyBuffer(writer, td, copyBuffer)
	require.NoError(t, err)
	require.Equal(t, writeSize, size)
}

func mockDataReader(size int64) (io.Reader, int64) {
	data := bytes.Repeat([]byte("D"), int(size))
	return &mockReader{r: bytes.NewReader(data)}, size
}

// mockReader wraps a `bytes.Reader` while hiding its io.WriterTo
// implementation. This forces io.CopyBuffer to do a buffered copy
// (useful to validate that chunked writes are working)
type mockReader struct {
	r io.Reader
}

func (mr *mockReader) Read(b []byte) (n int, err error) {
	return mr.r.Read(b)
}
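
The interface-stripping trick behind `mockReader` is worth calling out: wrapping a reader in a type that declares only `Read` hides the wrapped `bytes.Reader`'s `io.WriterTo` from `io.CopyBuffer`'s type assertion, forcing the buffered path. A tiny standalone demonstration:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// plainReader declares only Read, so type assertions for io.WriterTo fail
// even though the wrapped bytes.Reader implements it.
type plainReader struct{ r io.Reader }

func (p *plainReader) Read(b []byte) (int, error) { return p.r.Read(b) }

func main() {
	var direct io.Reader = bytes.NewReader([]byte("data"))
	var wrapped io.Reader = &plainReader{r: direct}

	_, directHasFastPath := direct.(io.WriterTo)
	_, wrappedHasFastPath := wrapped.(io.WriterTo)

	fmt.Println(directHasFastPath, wrappedHasFastPath) // true false
}
```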