use graph client for large data set (#3355)
<!-- PR description--> - Use graph client instead of Resty client. This brings uniformity and will help in utilising other features like Middlewares from graph client wrapper - Moved the upload session to graph package to use Graph http-client wrapper #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🧹 Tech Debt/Cleanup #### Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * https://github.com/alcionai/corso/issues/2300 #### Test Plan <!-- How will this be tested prior to merging.--> - [x] 💪 Manual
This commit is contained in:
parent
f8252b4cd3
commit
2ad32660e8
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/connector/uploadsession"
|
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -106,7 +105,7 @@ func uploadLargeAttachment(
|
|||||||
}
|
}
|
||||||
|
|
||||||
url := ptr.Val(session.GetUploadUrl())
|
url := ptr.Val(session.GetUploadUrl())
|
||||||
aw := uploadsession.NewWriter(uploader.getItemID(), url, size)
|
aw := graph.NewLargeItemWriter(uploader.getItemID(), url, size)
|
||||||
logger.Ctx(ctx).Debugw("uploading large attachment", "attachment_url", graph.LoggableURL(url))
|
logger.Ctx(ctx).Debugw("uploading large attachment", "attachment_url", graph.LoggableURL(url))
|
||||||
|
|
||||||
// Upload the stream data
|
// Upload the stream data
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
package uploadsession
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
"gopkg.in/resty.v1"
|
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
)
|
)
|
||||||
@ -20,7 +20,7 @@ const (
|
|||||||
|
|
||||||
// Writer implements an io.Writer for a M365
|
// Writer implements an io.Writer for a M365
|
||||||
// UploadSession URL
|
// UploadSession URL
|
||||||
type writer struct {
|
type largeItemWriter struct {
|
||||||
// Identifier
|
// Identifier
|
||||||
id string
|
id string
|
||||||
// Upload URL for this item
|
// Upload URL for this item
|
||||||
@ -29,18 +29,20 @@ type writer struct {
|
|||||||
contentLength int64
|
contentLength int64
|
||||||
// Last item offset that was written to
|
// Last item offset that was written to
|
||||||
lastWrittenOffset int64
|
lastWrittenOffset int64
|
||||||
client *resty.Client
|
client httpWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWriter(id, url string, size int64) *writer {
|
func NewLargeItemWriter(id, url string, size int64) *largeItemWriter {
|
||||||
return &writer{id: id, url: url, contentLength: size, client: resty.New()}
|
return &largeItemWriter{id: id, url: url, contentLength: size, client: *NewNoTimeoutHTTPWrapper()}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write will upload the provided data to M365. It sets the `Content-Length` and `Content-Range` headers based on
|
// Write will upload the provided data to M365. It sets the `Content-Length` and `Content-Range` headers based on
|
||||||
// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
|
// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession
|
||||||
func (iw *writer) Write(p []byte) (int, error) {
|
func (iw *largeItemWriter) Write(p []byte) (int, error) {
|
||||||
rangeLength := len(p)
|
rangeLength := len(p)
|
||||||
logger.Ctx(context.Background()).
|
ctx := context.Background()
|
||||||
|
|
||||||
|
logger.Ctx(ctx).
|
||||||
Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
|
Debugf("WRITE for %s. Size:%d, Offset: %d, TotalSize: %d",
|
||||||
iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
|
iw.id, rangeLength, iw.lastWrittenOffset, iw.contentLength)
|
||||||
|
|
||||||
@ -48,17 +50,20 @@ func (iw *writer) Write(p []byte) (int, error) {
|
|||||||
|
|
||||||
// PUT the request - set headers `Content-Range`to describe total size and `Content-Length` to describe size of
|
// PUT the request - set headers `Content-Range`to describe total size and `Content-Length` to describe size of
|
||||||
// data in the current request
|
// data in the current request
|
||||||
_, err := iw.client.R().
|
headers := make(map[string]string)
|
||||||
SetHeaders(map[string]string{
|
headers[contentRangeHeaderKey] = fmt.Sprintf(
|
||||||
contentRangeHeaderKey: fmt.Sprintf(
|
contentRangeHeaderValueFmt,
|
||||||
contentRangeHeaderValueFmt,
|
iw.lastWrittenOffset,
|
||||||
iw.lastWrittenOffset,
|
endOffset-1,
|
||||||
endOffset-1,
|
iw.contentLength)
|
||||||
iw.contentLength),
|
headers[contentLengthHeaderKey] = fmt.Sprintf("%d", rangeLength)
|
||||||
contentLengthHeaderKey: fmt.Sprintf("%d", rangeLength),
|
|
||||||
}).
|
_, err := iw.client.Request(
|
||||||
SetBody(bytes.NewReader(p)).
|
ctx,
|
||||||
Put(iw.url)
|
http.MethodPut,
|
||||||
|
iw.url,
|
||||||
|
bytes.NewReader(p),
|
||||||
|
headers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, clues.Wrap(err, "uploading item").With(
|
return 0, clues.Wrap(err, "uploading item").With(
|
||||||
"upload_id", iw.id,
|
"upload_id", iw.id,
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package uploadsession
|
package graph
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -69,7 +69,7 @@ func (suite *UploadSessionSuite) TestWriter() {
|
|||||||
|
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
writer := NewWriter("item", ts.URL, writeSize)
|
writer := NewLargeItemWriter("item", ts.URL, writeSize)
|
||||||
|
|
||||||
// Using a 32 KB buffer for the copy allows us to validate the
|
// Using a 32 KB buffer for the copy allows us to validate the
|
||||||
// multi-part upload. `io.CopyBuffer` will only write 32 KB at
|
// multi-part upload. `io.CopyBuffer` will only write 32 KB at
|
||||||
@ -16,7 +16,6 @@ import (
|
|||||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||||
"github.com/alcionai/corso/src/internal/connector/onedrive/api"
|
"github.com/alcionai/corso/src/internal/connector/onedrive/api"
|
||||||
"github.com/alcionai/corso/src/internal/connector/onedrive/metadata"
|
"github.com/alcionai/corso/src/internal/connector/onedrive/metadata"
|
||||||
"github.com/alcionai/corso/src/internal/connector/uploadsession"
|
|
||||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
)
|
)
|
||||||
@ -360,7 +359,7 @@ func driveItemWriter(
|
|||||||
|
|
||||||
url := ptr.Val(r.GetUploadUrl())
|
url := ptr.Val(r.GetUploadUrl())
|
||||||
|
|
||||||
return uploadsession.NewWriter(itemID, url, itemSize), nil
|
return graph.NewLargeItemWriter(itemID, url, itemSize), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// constructWebURL helper function for recreating the webURL
|
// constructWebURL helper function for recreating the webURL
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user