diff --git a/CHANGELOG.md b/CHANGELOG.md index 63093b91c..422c92a33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- *Breaking Change*: +- **Breaking Change**: Changed how backup details are stored in the repository to improve memory usage (#1735) from [vkamra](https://github.com/vkamra) +- Improve OneDrive backup speed (#1842) from [meain](https://github.com/meain) ## [v0.0.3] (alpha) - 2022-12-05 diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 3f9f51fe5..930c2c528 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -9,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" @@ -22,10 +24,10 @@ import ( const ( // TODO: This number needs to be tuned // Consider max open file limit `ulimit -n`, usually 1024 when setting this value - collectionChannelBufferSize = 50 + collectionChannelBufferSize = 5 // TODO: Tune this later along with collectionChannelBufferSize - urlPrefetchChannelBufferSize = 25 + urlPrefetchChannelBufferSize = 5 // Max number of retries to get doc from M365 // Seems to timeout at times because of multiple requests @@ -48,7 +50,7 @@ type Collection struct { // represents folderPath path.Path // M365 IDs of file items within this collection - driveItemIDs []string + driveItems []models.DriveItemable // M365 ID of the drive this collection was created from driveID string source driveSource @@ -61,8 +63,7 @@ type Collection struct { // itemReadFunc returns a reader for the specified item type itemReaderFunc func( ctx context.Context, - service graph.Servicer, - driveID, itemID string, + item models.DriveItemable, ) (itemInfo details.ItemInfo, itemData io.ReadCloser, err error) // NewCollection creates a Collection @@ -76,7 +77,6 @@ func NewCollection( ) *Collection { c := &Collection{ folderPath: folderPath, - driveItemIDs: []string{}, driveID: driveID, source: source, service: service, @@ -98,8 +98,8 @@ func NewCollection( // Adds an itemID to the collection // This will make it eligible to be populated -func (oc *Collection) Add(itemID string) { - oc.driveItemIDs = append(oc.driveItemIDs, itemID) +func (oc *Collection) Add(item models.DriveItemable) { + oc.driveItems = append(oc.driveItems, item) } // Items() returns the channel containing M365 Exchange objects @@ -191,7 +191,7 @@ func (oc *Collection) populateItems(ctx context.Context) { folderProgress, colCloser := observe.ProgressWithCount( observe.ItemQueueMsg, "/"+parentPathString, - int64(len(oc.driveItemIDs)), + int64(len(oc.driveItems)), ) defer colCloser() defer close(folderProgress) @@ -205,7 +205,7 @@ func (oc *Collection) populateItems(ctx context.Context) { m.Unlock() } - for _, itemID := range oc.driveItemIDs { + for _, item := range oc.driveItems { if oc.ctrl.FailFast && errs != nil { break } @@ -214,7 +214,7 @@ func (oc *Collection) populateItems(ctx context.Context) { wg.Add(1) - go func(itemID string) { + go func(item models.DriveItemable) { defer wg.Done() defer func() { <-semaphoreCh }() @@ -226,7 +226,7 @@ func (oc *Collection) populateItems(ctx context.Context) { ) for i := 1; i <= maxRetries; i++ { - itemInfo, itemData, err = oc.itemReader(ctx, oc.service, oc.driveID, itemID) + itemInfo, itemData, err = oc.itemReader(ctx, item) // We only retry if it is a timeout error. Other // errors like throttling are already handled within @@ -242,7 +242,7 @@ func (oc *Collection) populateItems(ctx context.Context) { } if err != nil { - errUpdater(itemID, err) + errUpdater(*item.GetId(), err) return } @@ -276,7 +276,7 @@ func (oc *Collection) populateItems(ctx context.Context) { info: itemInfo, } folderProgress <- struct{}{} - }(itemID) + }(item) } wg.Wait() @@ -290,9 +290,9 @@ func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byte status := support.CreateStatus(ctx, support.Backup, 1, // num folders (always 1) support.CollectionMetrics{ - Objects: len(oc.driveItemIDs), // items to read, - Successes: itemsRead, // items read successfully, - TotalBytes: byteCount, // Number of bytes read in the operation, + Objects: len(oc.driveItems), // items to read, + Successes: itemsRead, // items read successfully, + TotalBytes: byteCount, // Number of bytes read in the operation, }, errs, oc.folderPath.Folder(), // Additional details diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index 4de617c33..0abe52448 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -9,11 +9,11 @@ import ( "testing" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" + "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/backup/details" @@ -68,7 +68,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { { name: "oneDrive", source: OneDriveSource, - itemReader: func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{OneDrive: &details.OneDriveInfo{ItemName: testItemName}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -81,7 +81,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { { name: "sharePoint", source: SharePointSource, - itemReader: func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { + itemReader: func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{SharePoint: &details.SharePointInfo{ItemName: testItemName}}, io.NopCloser(bytes.NewReader(testItemData)), nil @@ -116,7 +116,9 @@ func (suite *CollectionUnitTestSuite) TestCollection() { assert.Equal(t, folderPath, coll.FullPath()) // Set a item reader, add an item and validate we get the item back - coll.Add(testItemID) + mockItem := models.NewDriveItem() + mockItem.SetId(&testItemID) + coll.Add(mockItem) coll.itemReader = test.itemReader // Read items from the collection @@ -167,6 +169,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { var ( + testItemID = "fakeItemID" + collStatus = support.ConnectorOperationStatus{} wg = sync.WaitGroup{} ) @@ -183,11 +187,14 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { suite.testStatusUpdater(&wg, &collStatus), test.source, control.Options{}) - coll.Add("testItemID") + + mockItem := models.NewDriveItem() + mockItem.SetId(&testItemID) + coll.Add(mockItem) readError := errors.New("Test error") - coll.itemReader = func(context.Context, graph.Servicer, string, string) (details.ItemInfo, io.ReadCloser, error) { + coll.itemReader = func(context.Context, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) { return details.ItemInfo{}, nil, readError } diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index ac7d1ec4c..7e4aa3a4e 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -153,7 +153,7 @@ func (c *Collections) UpdateCollections(ctx context.Context, driveID string, ite } collection := col.(*Collection) - collection.Add(*item.GetId()) + collection.Add(item) c.NumFiles++ c.NumItems++ diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go index 8934c729b..904e5dd41 100644 --- a/src/internal/connector/onedrive/drive.go +++ b/src/internal/connector/onedrive/drive.go @@ -143,9 +143,30 @@ func collectItems( // https://docs.microsoft.com/en-us/graph/api/driveitem-delta? // view=graph-rest-1.0&tabs=http#example-4-retrieving-delta-results-using-a-timestamp builder := service.Client().DrivesById(driveID).Root().Delta() + pageCount := int32(999) // max we can do is 999 + requestFields := []string{ + "content.downloadUrl", + "createdBy", + "createdDateTime", + "file", + "folder", + "id", + "lastModifiedDateTime", + "name", + "package", + "parentReference", + "root", + "size", + } + requestConfig := &msdrives.ItemRootDeltaRequestBuilderGetRequestConfiguration{ + QueryParameters: &msdrives.ItemRootDeltaRequestBuilderGetQueryParameters{ + Top: &pageCount, + Select: requestFields, + }, + } for { - r, err := builder.Get(ctx, nil) + r, err := builder.Get(ctx, requestConfig) if err != nil { return errors.Wrapf( err, diff --git a/src/internal/connector/onedrive/item.go b/src/internal/connector/onedrive/item.go index c011a43d2..186e7a6a3 100644 --- a/src/internal/connector/onedrive/item.go +++ b/src/internal/connector/onedrive/item.go @@ -2,8 +2,8 @@ package onedrive import ( "context" + "fmt" "io" - "time" msdrives "github.com/microsoftgraph/msgraph-sdk-go/drives" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -27,10 +27,14 @@ const ( // and using a http client to initialize a reader func sharePointItemReader( ctx context.Context, - service graph.Servicer, - driveID, itemID string, + item models.DriveItemable, ) (details.ItemInfo, io.ReadCloser, error) { - item, rc, err := driveItemReader(ctx, service, driveID, itemID) + url, ok := item.GetAdditionalData()[downloadURLKey].(*string) + if !ok { + return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) + } + + rc, err := driveItemReader(ctx, *url) if err != nil { return details.ItemInfo{}, nil, err } @@ -47,10 +51,14 @@ func sharePointItemReader( // and using a http client to initialize a reader func oneDriveItemReader( ctx context.Context, - service graph.Servicer, - driveID, itemID string, + item models.DriveItemable, ) (details.ItemInfo, io.ReadCloser, error) { - item, rc, err := driveItemReader(ctx, service, driveID, itemID) + url, ok := item.GetAdditionalData()[downloadURLKey].(*string) + if !ok { + return details.ItemInfo{}, nil, fmt.Errorf("failed to get url for %s", *item.GetName()) + } + + rc, err := driveItemReader(ctx, *url) if err != nil { return details.ItemInfo{}, nil, err } @@ -67,35 +75,17 @@ func oneDriveItemReader( // and using a http client to initialize a reader func driveItemReader( ctx context.Context, - service graph.Servicer, - driveID, itemID string, -) (models.DriveItemable, io.ReadCloser, error) { - logger.Ctx(ctx).Debugw("Reading Item", "id", itemID, "time", time.Now()) - - item, err := service.Client().DrivesById(driveID).ItemsById(itemID).Get(ctx, nil) - if err != nil { - return nil, nil, errors.Wrapf(err, "failed to get item %s", itemID) - } - - // Get the download URL - https://docs.microsoft.com/en-us/graph/api/driveitem-get-content - // These URLs are pre-authenticated and can be used to download the data using the standard - // http client - if _, found := item.GetAdditionalData()[downloadURLKey]; !found { - return nil, nil, errors.Errorf("file does not have a download URL. ID: %s, %#v", - itemID, item.GetAdditionalData()) - } - - downloadURL := item.GetAdditionalData()[downloadURLKey].(*string) - + url string, +) (io.ReadCloser, error) { httpClient := graph.CreateHTTPClient() httpClient.Timeout = 0 // infinite timeout for pulling large files - resp, err := httpClient.Get(*downloadURL) + resp, err := httpClient.Get(url) if err != nil { - return nil, nil, errors.Wrapf(err, "failed to download file from %s", *downloadURL) + return nil, errors.Wrapf(err, "failed to download file from %s", url) } - return item, resp.Body, nil + return resp.Body, nil } // oneDriveItemInfo will populate a details.OneDriveInfo struct @@ -104,11 +94,15 @@ func driveItemReader( // doesn't have its size value updated as a side effect of creation, // and kiota drops any SetSize update. func oneDriveItemInfo(di models.DriveItemable, itemSize int64) *details.OneDriveInfo { - ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"] - email := "" - if ok { - email = *ed.(*string) + + if di.GetCreatedBy().GetUser() != nil { + // User is sometimes not available when created via some + // external applications (like backup/restore solutions) + ed, ok := di.GetCreatedBy().GetUser().GetAdditionalData()["email"] + if ok { + email = *ed.(*string) + } } return &details.OneDriveInfo{ diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index 44b2d8ce7..65425ac71 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -97,12 +97,12 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { ctx, flush := tester.NewContext() defer flush() - var driveItemID string + var driveItem models.DriveItemable // This item collector tries to find "a" drive item that is a file to test the reader function itemCollector := func(ctx context.Context, driveID string, items []models.DriveItemable) error { for _, item := range items { if item.GetFile() != nil { - driveItemID = *item.GetId() + driveItem = item break } } @@ -115,7 +115,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { // Test Requirement 2: Need a file require.NotEmpty( suite.T(), - driveItemID, + driveItem, "no file item found for user %s drive %s", suite.user, suite.userDriveID, @@ -123,7 +123,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { // Read data for the file - itemInfo, itemData, err := oneDriveItemReader(ctx, suite, suite.userDriveID, driveItemID) + itemInfo, itemData, err := oneDriveItemReader(ctx, driveItem) require.NoError(suite.T(), err) require.NotNil(suite.T(), itemInfo.OneDrive) require.NotEmpty(suite.T(), itemInfo.OneDrive.ItemName)