From c58240ea3d49c9a0c20f557e1ffa771f18dcd02f Mon Sep 17 00:00:00 2001
From: Abin Simon
Date: Thu, 1 Dec 2022 11:41:11 +0530
Subject: [PATCH] Parallelize OneDrive queries to get URL (#1607)

## Description

This improves the initial backup speed for OneDrive.

OneDrive backups were slow mainly when there were a lot of tiny files. The case with mostly large files was already close to the best case: we were limited purely by how fast we could pull the files from MS and how fast kopia could process and upload them. For small files, however, we were slowed down by the serial loop that fetched the download URLs, which took quite a bit of time.

We have now parallelized the queries that fetch the download URLs. In the best case, this brought the backup down to under 20s from a ~4-5m starting point. That said, the MS Graph API still seems to throttle us, and when that happens the worst case goes back to around ~2m. I've added 3 retries, since some requests were failing when we fired off many requests back to back.

This should also take care of the URL-expiry issue mentioned in https://github.com/alcionai/corso/issues/581, since we now only prefetch a few URLs ahead of time.
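
To make the change easier to follow, here is a rough standalone sketch of the pattern `populateItems` now uses (illustrative only; `fetchItem`, `prefetchBufferSize`, and the simulated failure are stand-ins, not code from this patch): a buffered channel acts as a counting semaphore to cap in-flight download-URL requests, a `sync.WaitGroup` joins the workers, shared counters are updated atomically, and errors are collected under a mutex.

```go
// Simplified, self-contained sketch of the bounded-concurrency + retry pattern.
// fetchItem is a hypothetical stand-in for the per-item Graph call (oc.itemReader
// in the real code); the real change streams results into the collection channel.
package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

const (
	prefetchBufferSize = 25 // mirrors urlPrefetchChannelBufferSize in the diff below
	maxRetries         = 4  // 1 initial attempt + 3 retries
)

// fetchItem simulates a Graph request that occasionally fails (e.g. throttling).
func fetchItem(ctx context.Context, itemID string) (int64, error) {
	if rand.Intn(10) == 0 {
		return 0, errors.New("simulated throttling error")
	}
	return int64(len(itemID)), nil
}

func main() {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		errs      []error
		itemsRead int64
		byteCount int64
	)

	ctx := context.Background()
	itemIDs := []string{"item-1", "item-2", "item-3", "item-4", "item-5"}

	// Buffered channel used as a counting semaphore: at most prefetchBufferSize
	// fetches are in flight at any time.
	semaphore := make(chan struct{}, prefetchBufferSize)

	for _, itemID := range itemIDs {
		semaphore <- struct{}{} // acquire a slot
		wg.Add(1)

		go func(itemID string) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot

			var (
				size int64
				err  error
			)

			// Retry with a growing delay; the patch sleeps 3*(i+1) seconds,
			// i.e. 6s, 9s and 12s between the four attempts.
			for i := 1; i <= maxRetries; i++ {
				size, err = fetchItem(ctx, itemID)
				if err == nil {
					break
				}
				if i < maxRetries {
					time.Sleep(time.Duration(3*(i+1)) * time.Second)
				}
			}

			if err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("item %s: %w", itemID, err))
				mu.Unlock()
				return
			}

			// Shared counters are updated from many goroutines, so use atomics.
			atomic.AddInt64(&itemsRead, 1)
			atomic.AddInt64(&byteCount, size)
		}(itemID)
	}

	wg.Wait()
	fmt.Printf("read %d items (%d bytes), %d errors\n", itemsRead, byteCount, len(errs))
}
```

The buffered-channel semaphore keeps the change small compared to a full worker pool: each item is independent, so the only thing that needs bounding is the number of concurrent Graph calls.
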
## Type of change

- [ ] :sunflower: Feature
- [ ] :bug: Bugfix
- [ ] :world_map: Documentation
- [ ] :robot: Test
- [ ] :computer: CI/Deployment
- [ ] :hamster: Trivial/Minor

## Issue(s)

* https://github.com/alcionai/corso/issues/1595
* fixes https://github.com/alcionai/corso/issues/581

## Test Plan

- [ ] :muscle: Manual
- [x] :zap: Unit test
- [x] :green_heart: E2E
---
 src/Makefile                                  |   3 +-
 src/internal/connector/onedrive/collection.go | 109 +++++++++++++-----
 src/internal/operations/backup.go             |   6 +
 3 files changed, 87 insertions(+), 31 deletions(-)

diff --git a/src/Makefile b/src/Makefile
index 775024811..dc6a448f6 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -9,8 +9,7 @@ BAD_LINT_MSG := "Missing golangci-lint version $(WANTED_LINT_VERSION). Visit $(I
 .PHONY: check-lint check-lint-version lint load-test
 
 build:
-	go build -o corso -ldflags \
-		"-X 'github.com/alcionai/corso/src/cli/version.Version=$(shell git describe --exact-match --tags $(git rev-parse HEAD) 2>/dev/null || echo unreleased)-$(shell git rev-parse --short HEAD)'"
+	go build -o corso
 
 lint: check-lint-version
 	golangci-lint run
diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go
index 750f840f7..4f65b0ac0 100644
--- a/src/internal/connector/onedrive/collection.go
+++ b/src/internal/connector/onedrive/collection.go
@@ -4,6 +4,9 @@ package onedrive
 import (
 	"context"
 	"io"
+	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/support"
@@ -16,7 +19,15 @@ import (
 
 const (
 	// TODO: This number needs to be tuned
+	// Consider max open file limit `ulimit -n`, usually 1024 when setting this value
 	collectionChannelBufferSize = 50
+
+	// TODO: Tune this later along with collectionChannelBufferSize
+	urlPrefetchChannelBufferSize = 25
+
+	// Max number of retries to get doc from M365
+	// Seems to timeout at times because of multiple requests
+	maxRetries = 4 // 1 + 3 retries
 )
 
 var (
@@ -110,7 +121,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
 	var (
 		errs      error
 		byteCount int64
-		itemsRead = 0
+		itemsRead int64
+		wg        sync.WaitGroup
+		m         sync.Mutex
 	)
 
 	// Retrieve the OneDrive folder path to set later in
@@ -129,37 +142,75 @@ func (oc *Collection) populateItems(ctx context.Context) {
 	defer colCloser()
 	defer close(folderProgress)
 
-	for _, itemID := range oc.driveItemIDs {
-		// Read the item
-		itemInfo, itemData, err := oc.itemReader(ctx, oc.service, oc.driveID, itemID)
-		if err != nil {
-			errs = support.WrapAndAppendf(itemID, err, errs)
+	semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
+	defer close(semaphoreCh)
 
-			if oc.service.ErrPolicy() {
-				break
-			}
-
-			continue
-		}
-		// Item read successfully, add to collection
-		itemsRead++
-		// byteCount iteration
-		byteCount += itemInfo.Size
-
-		itemInfo.ParentPath = parentPathString
-		progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size)
-
-		go closer()
-
-		oc.data <- &Item{
-			id:   itemInfo.ItemName,
-			data: progReader,
-			info: itemInfo,
-		}
-		folderProgress <- struct{}{}
+	errUpdater := func(id string, err error) {
+		m.Lock()
+		errs = support.WrapAndAppend(id, err, errs)
+		m.Unlock()
 	}
 
-	oc.reportAsCompleted(ctx, itemsRead, byteCount, errs)
+	for _, itemID := range oc.driveItemIDs {
+		if oc.service.ErrPolicy() && errs != nil {
+			break
+		}
+
+		semaphoreCh <- struct{}{}
+
+		wg.Add(1)
+
+		go func(itemID string) {
+			defer wg.Done()
+			defer func() { <-semaphoreCh }()
+
+			// Read the item
+			var (
+				itemInfo *details.OneDriveInfo
+				itemData io.ReadCloser
+				err      error
+			)
+
+			// Retrying as we were hitting timeouts when we have multiple requests
+			// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
+			for i := 1; i <= maxRetries; i++ {
+				itemInfo, itemData, err = oc.itemReader(ctx, oc.service, oc.driveID, itemID)
+				if err == nil {
+					break
+				}
+				// TODO: Tweak sleep times
+				if i < maxRetries {
+					time.Sleep(time.Duration(3*(i+1)) * time.Second)
+				}
+			}
+
+			if err != nil {
+				errUpdater(itemID, err)
+				return
+			}
+
+			// Item read successfully, add to collection
+			atomic.AddInt64(&itemsRead, 1)
+			// byteCount iteration
+			atomic.AddInt64(&byteCount, itemInfo.Size)
+
+			itemInfo.ParentPath = parentPathString
+			progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size)
+
+			go closer()
+
+			oc.data <- &Item{
+				id:   itemInfo.ItemName,
+				data: progReader,
+				info: itemInfo,
+			}
+			folderProgress <- struct{}{}
+		}(itemID)
+	}
+
+	wg.Wait()
+
+	oc.reportAsCompleted(ctx, int(itemsRead), byteCount, errs)
 }
 
 func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byteCount int64, errs error) {
diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go
index 2a5f67205..84c46610e 100644
--- a/src/internal/operations/backup.go
+++ b/src/internal/operations/backup.go
@@ -21,6 +21,7 @@ import (
 	"github.com/alcionai/corso/src/pkg/backup"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/selectors"
 	"github.com/alcionai/corso/src/pkg/store"
 )
@@ -172,6 +173,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 	}
 	backupCh <- struct{}{}
 
+	logger.Ctx(ctx).Debugf(
+		"Backed up %d directories and %d files",
+		opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount,
+	)
+
 	opStats.started = true
 	opStats.gc = gc.AwaitStatus()
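
A quick sanity check on the retry constants (illustrative arithmetic, not part of the diff): with `maxRetries = 4` and a sleep of `3*(i+1)` seconds after every failed attempt except the last, an item that keeps failing waits 6s + 9s + 12s, roughly 27s, before its error is recorded.

```go
// Illustrative only: the backoff schedule implied by the retry loop in populateItems.
package main

import "fmt"

func main() {
	const maxRetries = 4 // 1 initial attempt + 3 retries, as in the patch

	total := 0
	for i := 1; i < maxRetries; i++ {
		wait := 3 * (i + 1) // seconds slept after attempt i fails
		total += wait
		fmt.Printf("after attempt %d: sleep %ds\n", i, wait)
	}
	fmt.Printf("worst-case extra wait per item: %ds\n", total) // 6 + 9 + 12 = 27s
}
```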