Speedup OneDrive backup by pulling urls in delta API (#1842)

## Description

This brings in another huge improvement in the backup speed dropping from ~15m to ~1.5m for an account with ~5000 files. This one also drastically reduces the number of requests we have to make for the same account from 5505 to just ~55. This also means that we don't get throttled anymore and we can easily run multiple backup jobs in parallel before we hit the 1024 limit.

~The code works as of now, but I have to double check the numbers as well as fix an issue with us opening too many files and causing program to crash with 'too many open files' when we bump up the numbers. With the current numbers, it works, but I wanna double check and optimize them. Plus some code cleanup.~

## Does this PR need a docs update or release note?

- [x]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ]  No 

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [x] 🐹 Trivial/Minor

## Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Abin Simon 2022-12-20 15:04:28 +05:30 committed by GitHub
parent 2b0ccdc8de
commit a4147f5498
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 87 additions and 64 deletions

View File

@ -9,9 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- *Breaking Change*: - **Breaking Change**:
Changed how backup details are stored in the repository to Changed how backup details are stored in the repository to
improve memory usage (#1735) from [vkamra](https://github.com/vkamra) improve memory usage (#1735) from [vkamra](https://github.com/vkamra)
- Improve OneDrive backup speed (#1842) from [meain](https://github.com/meain)
## [v0.0.3] (alpha) - 2022-12-05 ## [v0.0.3] (alpha) - 2022-12-05

View File

@ -9,6 +9,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
@ -22,10 +24,10 @@ import (
const ( const (
// TODO: This number needs to be tuned // TODO: This number needs to be tuned
// Consider max open file limit `ulimit -n`, usually 1024 when setting this value // Consider max open file limit `ulimit -n`, usually 1024 when setting this value
collectionChannelBufferSize = 50 collectionChannelBufferSize = 5
// TODO: Tune this later along with collectionChannelBufferSize // TODO: Tune this later along with collectionChannelBufferSize
urlPrefetchChannelBufferSize = 25 urlPrefetchChannelBufferSize = 5
// Max number of retries to get doc from M365 // Max number of retries to get doc from M365
// Seems to timeout at times because of multiple requests // Seems to timeout at times because of multiple requests
@ -48,7 +50,7 @@ type Collection struct {
// represents // represents
folderPath path.Path folderPath path.Path
// M365 IDs of file items within this collection // M365 IDs of file items within this collection
driveItemIDs []string driveItems []models.DriveItemable
// M365 ID of the drive this collection was created from // M365 ID of the drive this collection was created from
driveID string driveID string
source driveSource source driveSource
@ -61,8 +63,7 @@ type Collection struct {
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
type itemReaderFunc func( type itemReaderFunc func(
ctx context.Context, ctx context.Context,
service graph.Servicer, item models.DriveItemable,
driveID, itemID string,
) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error) ) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error)
// NewCollection creates a Collection // NewCollection creates a Collection
@ -76,7 +77,6 @@ func NewCollection(
) *Collection { ) *Collection {
c := &Collection{ c := &Collection{
folderPath: folderPath, folderPath: folderPath,
driveItemIDs: []string{},
driveID: driveID, driveID: driveID,
source: source, source: source,
service: service, service: service,
@ -98,8 +98,8 @@ func NewCollection(
// Adds an itemID to the collection // Adds an itemID to the collection
// This will make it eligible to be populated // This will make it eligible to be populated
func (oc *Collection) Add(itemID string) { func (oc *Collection) Add(item models.DriveItemable) {
oc.driveItemIDs = append(oc.driveItemIDs, itemID) oc.driveItems = append(oc.driveItems, item)
} }
// Items() returns the channel containing M365 Exchange objects // Items() returns the channel containing M365 Exchange objects
@ -191,7 +191,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
folderProgress, colCloser := observe.ProgressWithCount( folderProgress, colCloser := observe.ProgressWithCount(
observe.ItemQueueMsg, observe.ItemQueueMsg,
"/"+parentPathString, "/"+parentPathString,
int64(len(oc.driveItemIDs)), int64(len(oc.driveItems)),
) )
defer colCloser() defer colCloser()
defer close(folderProgress) defer close(folderProgress)
@ -205,7 +205,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
m.Unlock() m.Unlock()
} }
for _, itemID := range oc.driveItemIDs { for _, item := range oc.driveItems {
if oc.ctrl.FailFast && errs != nil { if oc.ctrl.FailFast && errs != nil {
break break
} }
@ -214,7 +214,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
wg.Add(1) wg.Add(1)
go func(itemID string) { go func(item models.DriveItemable) {
defer wg.Done() defer wg.Done()
defer func() { <-semaphoreCh }() defer func() { <-semaphoreCh }()
@ -226,7 +226,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
) )
for i := 1; i <= maxRetries; i++ { for i := 1; i <= maxRetries; i++ {
itemInfo, itemData, err = oc.itemReader(ctx, oc.service, oc.driveID, itemID) itemInfo, itemData, err = oc.itemReader(ctx, item)
// We only retry if it is a timeout error. Other // We only retry if it is a timeout error. Other
// errors like throttling are already handled within // errors like throttling are already handled within
@ -242,7 +242,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
} }
if err != nil { if err != nil {
errUpdater(itemID, err) errUpdater(*item.GetId(), err)
return return
} }
@ -276,7 +276,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
info: itemInfo, info: itemInfo,
} }
folderProgress <- struct{}{} folderProgress <- struct{}{}
}(itemID) }(item)
} }
wg.Wait() wg.Wait()
@ -290,9 +290,9 @@ func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byte
status := support.CreateStatus(ctx, support.Backup, status := support.CreateStatus(ctx, support.Backup,
1, // num folders (always 1) 1, // num folders (always 1)
support.CollectionMetrics{ support.CollectionMetrics{
Objects: len(oc.driveItemIDs), // items to read, Objects: len(oc.driveItems), // items to read,
Successes: itemsRead, // items read successfully, Successes: itemsRead, // items read successfully,
TotalBytes: byteCount, // Number of bytes read in the operation, TotalBytes: byteCount, // Number of bytes read in the operation,
}, },
errs, errs,
oc.folderPath.Folder(), // Additional details oc.folderPath.Folder(), // Additional details

View File

@ -9,11 +9,11 @@ import (
"testing" "testing"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
@ -68,7 +68,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
{ {
name: "oneDrive", name: "oneDrive",
source: OneDriveSource, source: OneDriveSource,
itemReader: func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}}, return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}},
io.NopCloser(bytes.NewReader(testItemData)), io.NopCloser(bytes.NewReader(testItemData)),
nil nil
@ -81,7 +81,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
{ {
name: "sharePoint", name: "sharePoint",
source: SharePointSource, source: SharePointSource,
itemReader: func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}}, return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}},
io.NopCloser(bytes.NewReader(testItemData)), io.NopCloser(bytes.NewReader(testItemData)),
nil nil
@ -116,7 +116,9 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
assert.Equal(t, folderPath, coll.FullPath()) assert.Equal(t, folderPath, coll.FullPath())
// Set a item reader, add an item and validate we get the item back // Set a item reader, add an item and validate we get the item back
coll.Add(testItemID) mockItem := models.NewDriveItem()
mockItem.SetId(&testItemID)
coll.Add(mockItem)
coll.itemReader = test.itemReader coll.itemReader = test.itemReader
// Read items from the collection // Read items from the collection
@ -167,6 +169,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
for _, test := range table { for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
var ( var (
testItemID = "fakeItemID"
collStatus = support.ConnectorOperationStatus{} collStatus = support.ConnectorOperationStatus{}
wg = sync.WaitGroup{} wg = sync.WaitGroup{}
) )
@ -183,11 +187,14 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
suite.testStatusUpdater(&wg, &collStatus), suite.testStatusUpdater(&wg, &collStatus),
test.source, test.source,
control.Options{}) control.Options{})
coll.Add("testItemID")
mockItem := models.NewDriveItem()
mockItem.SetId(&testItemID)
coll.Add(mockItem)
readError := errors.New("Test error") readError := errors.New("Test error")
coll.itemReader = func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { coll.itemReader = func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
return details.ItemInfo{}, nil, readError return details.ItemInfo{}, nil, readError
} }

View File

@ -153,7 +153,7 @@ func (c *Collections) UpdateCollections(ctx context.Context, driveID string, ite
} }
collection := col.(*Collection) collection := col.(*Collection)
collection.Add(*item.GetId()) collection.Add(item)
c.NumFiles++ c.NumFiles++
c.NumItems++ c.NumItems++

View File

@ -143,9 +143,30 @@ func collectItems(
// https://docs.microsoft.com/en-us/graph/api/driveitem-delta? // https://docs.microsoft.com/en-us/graph/api/driveitem-delta?
// view=graph-rest-1.0&tabs=http#example-4-retrieving-delta-results-using-a-timestamp // view=graph-rest-1.0&tabs=http#example-4-retrieving-delta-results-using-a-timestamp
builder := service.Client().DrivesById(driveID).Root().Delta() builder := service.Client().DrivesById(driveID).Root().Delta()
pageCount := int32(999) // max we can do is 999
requestFields := []string{
"content.downloadUrl",
"createdBy",
"createdDateTime",
"file",
"folder",
"id",
"lastModifiedDateTime",
"name",
"package",
"parentReference",
"root",
"size",
}
requestConfig := &msdrives.ItemRootDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: &msdrives.ItemRootDeltaRequestBuilderGetQueryParameters{
Top: &pageCount,
Select: requestFields,
},
}
for { for {
r, err := builder.Get(ctx, nil) r, err := builder.Get(ctx, requestConfig)
if err != nil { if err != nil {
return errors.Wrapf( return errors.Wrapf(
err, err,

View File

@ -2,8 +2,8 @@ package onedrive
import ( import (
"context" "context"
"fmt"
"io" "io"
"time"
msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives" msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
@ -27,10 +27,14 @@ const (
// and using a http client to initialize a reader // and using a http client to initialize a reader
func sharePointItemReader( func sharePointItemReader(
ctx context.Context, ctx context.Context,
service graph.Servicer, item models.DriveItemable,
driveID, itemID string,
) (details.ItemInfo, io.ReadCloser, error) { ) (details.ItemInfo, io.ReadCloser, error) {
item, rc, err := driveItemReader(ctx, service, driveID, itemID) url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
if !ok {
return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName())
}
rc, err := driveItemReader(ctx, *url)
if err != nil { if err != nil {
return details.ItemInfo{}, nil, err return details.ItemInfo{}, nil, err
} }
@ -47,10 +51,14 @@ func sharePointItemReader(
// and using a http client to initialize a reader // and using a http client to initialize a reader
func oneDriveItemReader( func oneDriveItemReader(
ctx context.Context, ctx context.Context,
service graph.Servicer, item models.DriveItemable,
driveID, itemID string,
) (details.ItemInfo, io.ReadCloser, error) { ) (details.ItemInfo, io.ReadCloser, error) {
item, rc, err := driveItemReader(ctx, service, driveID, itemID) url, ok := item.GetAdditionalData()[downloadURLKey].(*string)
if !ok {
return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName())
}
rc, err := driveItemReader(ctx, *url)
if err != nil { if err != nil {
return details.ItemInfo{}, nil, err return details.ItemInfo{}, nil, err
} }
@ -67,35 +75,17 @@ func oneDriveItemReader(
// and using a http client to initialize a reader // and using a http client to initialize a reader
func driveItemReader( func driveItemReader(
ctx context.Context, ctx context.Context,
service graph.Servicer, url string,
driveID, itemID string, ) (io.ReadCloser, error) {
) (models.DriveItemable, io.ReadCloser, error) {
logger.Ctx(ctx).Debugw("Reading Item", "id", itemID, "time", time.Now())
item, err := service.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to get item %s", itemID)
}
// Get the download URL - https://docs.microsoft.com/en-us/graph/api/driveitem-get-content
// These URLs are pre-authenticated and can be used to download the data using the standard
// http client
if _, found := item.GetAdditionalData()[downloadURLKey]; !found {
return nil, nil, errors.Errorf("file does not have a download URL. ID: %s, %#v",
itemID, item.GetAdditionalData())
}
downloadURL := item.GetAdditionalData()[downloadURLKey].(*string)
httpClient := graph.CreateHTTPClient() httpClient := graph.CreateHTTPClient()
httpClient.Timeout = 0 // infinite timeout for pulling large files httpClient.Timeout = 0 // infinite timeout for pulling large files
resp, err := httpClient.Get(*downloadURL) resp, err := httpClient.Get(url)
if err != nil { if err != nil {
return nil, nil, errors.Wrapf(err, "failed to download file from %s", *downloadURL) return nil, errors.Wrapf(err, "failed to download file from %s", url)
} }
return item, resp.Body, nil return resp.Body, nil
} }
// oneDriveItemInfo will populate a details.OneDriveInfo struct // oneDriveItemInfo will populate a details.OneDriveInfo struct
@ -104,11 +94,15 @@ func driveItemReader(
// doesn't have its size value updated as a side effect of creation, // doesn't have its size value updated as a side effect of creation,
// and kiota drops any SetSize update. // and kiota drops any SetSize update.
func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo { func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo {
ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"]
email := "" email := ""
if ok {
email = *ed.(*string) if di.GetCreatedBy().GetUser() != nil {
// User is sometimes not available when created via some
// external applications (like backup/restore solutions)
ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"]
if ok {
email = *ed.(*string)
}
} }
return &details.OneDriveInfo{ return &details.OneDriveInfo{

View File

@ -97,12 +97,12 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
ctx, flush := tester.NewContext() ctx, flush := tester.NewContext()
defer flush() defer flush()
var driveItemID string var driveItem models.DriveItemable
// This item collector tries to find "a" drive item that is a file to test the reader function // This item collector tries to find "a" drive item that is a file to test the reader function
itemCollector := func(ctx context.Context, driveID string, items []models.DriveItemable) error { itemCollector := func(ctx context.Context, driveID string, items []models.DriveItemable) error {
for _, item := range items { for _, item := range items {
if item.GetFile() != nil { if item.GetFile() != nil {
driveItemID = *item.GetId() driveItem = item
break break
} }
} }
@ -115,7 +115,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
// Test Requirement 2: Need a file // Test Requirement 2: Need a file
require.NotEmpty( require.NotEmpty(
suite.T(), suite.T(),
driveItemID, driveItem,
"no file item found for user %s drive %s", "no file item found for user %s drive %s",
suite.user, suite.user,
suite.userDriveID, suite.userDriveID,
@ -123,7 +123,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
// Read data for the file // Read data for the file
itemInfo, itemData, err := oneDriveItemReader(ctx, suite, suite.userDriveID, driveItemID) itemInfo, itemData, err := oneDriveItemReader(ctx, driveItem)
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
require.NotNil(suite.T(), itemInfo.OneDrive) require.NotNil(suite.T(), itemInfo.OneDrive)
require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName) require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName)