From a4147f5498ef6895159c04c82aa40529bde85cef Mon Sep 17 00:00:00 2001 From: Abin Simon Date: Tue, 20 Dec 2022 15:04:28 +0530 Subject: [PATCH] Speedup OneDrive backup by pulling urls in delta API (#1842) ## Description This brings in another huge improvement in the backup speed dropping from ~15m to ~1.5m for an account with ~5000 files. This one also drastically reduces the number of requests we have to make for the same account from 5505 to just ~55. This also means that we don't get throttled anymore and we can easily run multiple backup jobs in parallel before we hit the 1024 limit. ~The code works as of now, but I have to double check the numbers as well as fix an issue with us opening too many files and causing program to crash with 'too many open files' when we bump up the numbers. With the current numbers, it works, but I wanna double check and optimize them. Plus some code cleanup.~ ## Does this PR need a docs update or release note? - [x] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [ ] :no_entry: No ## Type of change - [ ] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [x] :hamster: Trivial/Minor ## Issue(s) * # ## Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [x] :green_heart: E2E --- CHANGELOG.md | 3 +- src/internal/connector/onedrive/collection.go | 34 +++++----- .../connector/onedrive/collection_test.go | 19 ++++-- .../connector/onedrive/collections.go | 2 +- src/internal/connector/onedrive/drive.go | 23 ++++++- src/internal/connector/onedrive/item.go | 62 +++++++++---------- src/internal/connector/onedrive/item_test.go | 8 +-- 7 files changed, 87 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63093b91c..422c92a33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- *Breaking Change*: +- **Breaking Change**: Changed how backup details are stored in the repository to improve memory usage (#1735) from [vkamra](https://github.com/vkamra) +- Improve OneDrive backup speed (#1842) from [meain](https://github.com/meain) ## [v0.0.3] (alpha) - 2022-12-05 diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 3f9f51fe5..930c2c528 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" @@ -22,10 +24,10 @@ import ( const ( // TODO: This number needs to be tuned // Consider max open file limit `ulimit -n`, usually 1024 when setting this value - collectionChannelBufferSize = 50 + collectionChannelBufferSize = 5 // TODO: Tune this later along with collectionChannelBufferSize - urlPrefetchChannelBufferSize = 25 + urlPrefetchChannelBufferSize = 5 // Max number of retries to get doc from M365 // Seems to timeout at times because of multiple requests @@ -48,7 +50,7 @@ type Collection struct { // represents folderPath path.Path // M365 IDs of file items within this collection - driveItemIDs []string + driveItems []models.DriveItemable // M365 ID of the drive this collection was created from driveID string source driveSource @@ -61,8 +63,7 @@ type Collection struct { // itemReadFunc returns a reader for the specified item type itemReaderFunc func( ctx context.Context, - service graph.Servicer, - driveID, itemID string, + item models.DriveItemable, ) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error) // NewCollection creates a Collection @@ -76,7 +77,6 @@ func NewCollection( ) *Collection { c := &Collection{ folderPath: folderPath, - driveItemIDs: []string{}, driveID: driveID, source: source, service: service, @@ -98,8 +98,8 @@ func NewCollection( // Adds an itemID to the collection // This will make it eligible to be populated -func (oc *Collection) Add(itemID string) { - oc.driveItemIDs = append(oc.driveItemIDs, itemID) +func (oc *Collection) Add(item models.DriveItemable) { + oc.driveItems = append(oc.driveItems, item) } // Items() returns the channel containing M365 Exchange objects @@ -191,7 +191,7 @@ func (oc *Collection) populateItems(ctx context.Context) { folderProgress, colCloser := observe.ProgressWithCount( observe.ItemQueueMsg, "/"+parentPathString, - int64(len(oc.driveItemIDs)), + int64(len(oc.driveItems)), ) defer colCloser() defer close(folderProgress) @@ -205,7 +205,7 @@ func (oc *Collection) populateItems(ctx context.Context) { m.Unlock() } - for _, itemID := range oc.driveItemIDs { + for _, item := range oc.driveItems { if oc.ctrl.FailFast && errs != nil { break } @@ -214,7 +214,7 @@ func (oc *Collection) populateItems(ctx context.Context) { wg.Add(1) - go func(itemID string) { + go func(item models.DriveItemable) { defer wg.Done() defer func() { <-semaphoreCh }() @@ -226,7 +226,7 @@ func (oc *Collection) populateItems(ctx context.Context) { ) for i := 1; i <= maxRetries; i++ { - itemInfo, itemData, err = oc.itemReader(ctx, oc.service, oc.driveID, itemID) + itemInfo, itemData, err = oc.itemReader(ctx, item) // We only retry if it is a timeout error. Other // errors like throttling are already handled within @@ -242,7 +242,7 @@ func (oc *Collection) populateItems(ctx context.Context) { } if err != nil { - errUpdater(itemID, err) + errUpdater(*item.GetId(), err) return } @@ -276,7 +276,7 @@ func (oc *Collection) populateItems(ctx context.Context) { info: itemInfo, } folderProgress <- struct{}{} - }(itemID) + }(item) } wg.Wait() @@ -290,9 +290,9 @@ func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byte status := support.CreateStatus(ctx, support.Backup, 1, // num folders (always 1) support.CollectionMetrics{ - Objects: len(oc.driveItemIDs), // items to read, - Successes: itemsRead, // items read successfully, - TotalBytes: byteCount, // Number of bytes read in the operation, + Objects: len(oc.driveItems), // items to read, + Successes: itemsRead, // items read successfully, + TotalBytes: byteCount, // Number of bytes read in the operation, }, errs, oc.folderPath.Folder(), // Additional details diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index 4de617c33..0abe52448 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -9,11 +9,11 @@ import ( "testing" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" + "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/backup/details" @@ -68,7 +68,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { { name: "oneDrive", source: OneDriveSource, - itemReader: func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -81,7 +81,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { { name: "sharePoint", source: SharePointSource, - itemReader: func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -116,7 +116,9 @@ func (suite *CollectionUnitTestSuite) TestCollection() { assert.Equal(t, folderPath, coll.FullPath()) // Set a item reader, add an item and validate we get the item back - coll.Add(testItemID) + mockItem := models.NewDriveItem() + mockItem.SetId(&testItemID) + coll.Add(mockItem) coll.itemReader = test.itemReader // Read items from the collection @@ -167,6 +169,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { var ( + testItemID = "fakeItemID" + collStatus = support.ConnectorOperationStatus{} wg = sync.WaitGroup{} ) @@ -183,11 +187,14 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { suite.testStatusUpdater(&wg, &collStatus), test.source, control.Options{}) - coll.Add("testItemID") + + mockItem := models.NewDriveItem() + mockItem.SetId(&testItemID) + coll.Add(mockItem) readError := errors.New("Test error") - coll.itemReader = func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { + coll.itemReader = func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{}, nil, readError } diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index ac7d1ec4c..7e4aa3a4e 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -153,7 +153,7 @@ func (c *Collections) UpdateCollections(ctx context.Context, driveID string, ite } collection := col.(*Collection) - collection.Add(*item.GetId()) + collection.Add(item) c.NumFiles++ c.NumItems++ diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go index 8934c729b..904e5dd41 100644 --- a/src/internal/connector/onedrive/drive.go +++ b/src/internal/connector/onedrive/drive.go @@ -143,9 +143,30 @@ func collectItems( // https://docs.microsoft.com/en-us/graph/api/driveitem-delta? // view=graph-rest-1.0&tabs=http#example-4-retrieving-delta-results-using-a-timestamp builder := service.Client().DrivesById(driveID).Root().Delta() + pageCount := int32(999) // max we can do is 999 + requestFields := []string{ + "content.downloadUrl", + "createdBy", + "createdDateTime", + "file", + "folder", + "id", + "lastModifiedDateTime", + "name", + "package", + "parentReference", + "root", + "size", + } + requestConfig := &msdrives.ItemRootDeltaRequestBuilderGetRequestConfiguration{ + QueryParameters: &msdrives.ItemRootDeltaRequestBuilderGetQueryParameters{ + Top: &pageCount, + Select: requestFields, + }, + } for { - r, err := builder.Get(ctx, nil) + r, err := builder.Get(ctx, requestConfig) if err != nil { return errors.Wrapf( err, diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index c011a43d2..186e7a6a3 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -2,8 +2,8 @@ package onedrive import ( "context" + "fmt" "io" - "time" msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -27,10 +27,14 @@ const ( // and using a http client to initialize a reader func sharePointItemReader( ctx context.Context, - service graph.Servicer, - driveID, itemID string, + item models.DriveItemable, ) (details.ItemInfo, io.ReadCloser, error) { - item, rc, err := driveItemReader(ctx, service, driveID, itemID) + url, ok := item.GetAdditionalData()[downloadURLKey].(*string) + if !ok { + return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) + } + + rc, err := driveItemReader(ctx, *url) if err != nil { return details.ItemInfo{}, nil, err } @@ -47,10 +51,14 @@ func sharePointItemReader( // and using a http client to initialize a reader func oneDriveItemReader( ctx context.Context, - service graph.Servicer, - driveID, itemID string, + item models.DriveItemable, ) (details.ItemInfo, io.ReadCloser, error) { - item, rc, err := driveItemReader(ctx, service, driveID, itemID) + url, ok := item.GetAdditionalData()[downloadURLKey].(*string) + if !ok { + return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) + } + + rc, err := driveItemReader(ctx, *url) if err != nil { return details.ItemInfo{}, nil, err } @@ -67,35 +75,17 @@ func oneDriveItemReader( // and using a http client to initialize a reader func driveItemReader( ctx context.Context, - service graph.Servicer, - driveID, itemID string, -) (models.DriveItemable, io.ReadCloser, error) { - logger.Ctx(ctx).Debugw("Reading Item", "id", itemID, "time", time.Now()) - - item, err := service.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to get item %s", itemID) - } - - // Get the download URL - https://docs.microsoft.com/en-us/graph/api/driveitem-get-content - // These URLs are pre-authenticated and can be used to download the data using the standard - // http client - if _, found := item.GetAdditionalData()[downloadURLKey]; !found { - return nil, nil, errors.Errorf("file does not have a download URL. ID: %s, %#v", - itemID, item.GetAdditionalData()) - } - - downloadURL := item.GetAdditionalData()[downloadURLKey].(*string) - + url string, +) (io.ReadCloser, error) { httpClient := graph.CreateHTTPClient() httpClient.Timeout = 0 // infinite timeout for pulling large files - resp, err := httpClient.Get(*downloadURL) + resp, err := httpClient.Get(url) if err != nil { - return nil, nil, errors.Wrapf(err, "failed to download file from %s", *downloadURL) + return nil, errors.Wrapf(err, "failed to download file from %s", url) } - return item, resp.Body, nil + return resp.Body, nil } // oneDriveItemInfo will populate a details.OneDriveInfo struct @@ -104,11 +94,15 @@ func driveItemReader( // doesn't have its size value updated as a side effect of creation, // and kiota drops any SetSize update. func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo { - ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"] - email := "" - if ok { - email = *ed.(*string) + + if di.GetCreatedBy().GetUser() != nil { + // User is sometimes not available when created via some + // external applications (like backup/restore solutions) + ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"] + if ok { + email = *ed.(*string) + } } return &details.OneDriveInfo{ diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index 44b2d8ce7..65425ac71 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -97,12 +97,12 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { ctx, flush := tester.NewContext() defer flush() - var driveItemID string + var driveItem models.DriveItemable // This item collector tries to find "a" drive item that is a file to test the reader function itemCollector := func(ctx context.Context, driveID string, items []models.DriveItemable) error { for _, item := range items { if item.GetFile() != nil { - driveItemID = *item.GetId() + driveItem = item break } } @@ -115,7 +115,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { // Test Requirement 2: Need a file require.NotEmpty( suite.T(), - driveItemID, + driveItem, "no file item found for user %s drive %s", suite.user, suite.userDriveID, @@ -123,7 +123,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { // Read data for the file - itemInfo, itemData, err := oneDriveItemReader(ctx, suite, suite.userDriveID, driveItemID) + itemInfo, itemData, err := oneDriveItemReader(ctx, driveItem) require.NoError(suite.T(), err) require.NotNil(suite.T(), itemInfo.OneDrive) require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName)