Parallelize OneDrive queries to get URL (#1607)

## Description

This improves the initial backup speed for OneDrive. OneDrive backup was mostly slow when we had a lot of tiny files. The case where we had mostly large files was pretty much the best-case scenario: we were throttled purely by how fast we could get the files from MS and how fast kopia could process and upload them. But for small files, we were slowed by the loop that was taking quite a bit of time to fetch the download URLs. We have now parallelized the query for getting the download URL. Under best-case scenarios, I was able to speed it up to under 20s from a ~4-5m starting point. That said, the MS Graph API still seems to throttle us, and when that happens we still go back to around ~2m in the worst-case scenario. I've added 3 retries, as some requests were failing when we were continuously making many requests.

This should also take care of the issue of the URL expiring mentioned in https://github.com/alcionai/corso/issues/581, as we are only prefetching a few URLs ahead of time.

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* https://github.com/alcionai/corso/issues/1595
* fixes https://github.com/alcionai/corso/issues/581

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Abin Simon 2022-12-01 11:41:11 +05:30 committed by GitHub
parent 784db9e12e
commit c58240ea3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 31 deletions

View File

@ -9,8 +9,7 @@ BAD_LINT_MSG := "Missing golangci-lint version $(WANTED_LINT_VERSION). Visit $(I
.PHONY: check-lint check-lint-version lint load-test .PHONY: check-lint check-lint-version lint load-test
build: build:
go build -o corso -ldflags \ go build -o corso
"-X 'github.com/alcionai/corso/src/cli/version.Version=$(shell git describe --exact-match --tags $(git rev-parse HEAD) 2>/dev/null || echo unreleased)-$(shell git rev-parse --short HEAD)'"
lint: check-lint-version lint: check-lint-version
golangci-lint run golangci-lint run

View File

@ -4,6 +4,9 @@ package onedrive
import ( import (
"context" "context"
"io" "io"
"sync"
"sync/atomic"
"time"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
@ -16,7 +19,15 @@ import (
const ( const (
// TODO: This number needs to be tuned // TODO: This number needs to be tuned
// Consider max open file limit `ulimit -n`, usually 1024 when setting this value
collectionChannelBufferSize = 50 collectionChannelBufferSize = 50
// TODO: Tune this later along with collectionChannelBufferSize
urlPrefetchChannelBufferSize = 25
// Max number of retries to get doc from M365
// Seems to timeout at times because of multiple requests
maxRetries = 4 // 1 + 3 retries
) )
var ( var (
@ -110,7 +121,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
var ( var (
errs error errs error
byteCount int64 byteCount int64
itemsRead = 0 itemsRead int64
wg sync.WaitGroup
m sync.Mutex
) )
// Retrieve the OneDrive folder path to set later in // Retrieve the OneDrive folder path to set later in
@ -129,37 +142,75 @@ func (oc *Collection) populateItems(ctx context.Context) {
defer colCloser() defer colCloser()
defer close(folderProgress) defer close(folderProgress)
for _, itemID := range oc.driveItemIDs { semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
// Read the item defer close(semaphoreCh)
itemInfo, itemData, err := oc.itemReader(ctx, oc.service, oc.driveID, itemID)
if err != nil {
errs = support.WrapAndAppendf(itemID, err, errs)
if oc.service.ErrPolicy() { errUpdater := func(id string, err error) {
break m.Lock()
} errs = support.WrapAndAppend(id, err, errs)
m.Unlock()
continue
}
// Item read successfully, add to collection
itemsRead++
// byteCount iteration
byteCount += itemInfo.Size
itemInfo.ParentPath = parentPathString
progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size)
go closer()
oc.data <- &Item{
id: itemInfo.ItemName,
data: progReader,
info: itemInfo,
}
folderProgress <- struct{}{}
} }
oc.reportAsCompleted(ctx, itemsRead, byteCount, errs) for _, itemID := range oc.driveItemIDs {
if oc.service.ErrPolicy() && errs != nil {
break
}
semaphoreCh <- struct{}{}
wg.Add(1)
go func(itemID string) {
defer wg.Done()
defer func() { <-semaphoreCh }()
// Read the item
var (
itemInfo *details.OneDriveInfo
itemData io.ReadCloser
err error
)
// Retrying as we were hitting timeouts when we have multiple requests
// https://github.com/microsoftgraph/msgraph-sdk-go/issues/302
for i := 1; i <= maxRetries; i++ {
itemInfo, itemData, err = oc.itemReader(ctx, oc.service, oc.driveID, itemID)
if err == nil {
break
}
// TODO: Tweak sleep times
if i < maxRetries {
time.Sleep(time.Duration(3*(i+1)) * time.Second)
}
}
if err != nil {
errUpdater(itemID, err)
return
}
// Item read successfully, add to collection
atomic.AddInt64(&itemsRead, 1)
// byteCount iteration
atomic.AddInt64(&byteCount, itemInfo.Size)
itemInfo.ParentPath = parentPathString
progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size)
go closer()
oc.data <- &Item{
id: itemInfo.ItemName,
data: progReader,
info: itemInfo,
}
folderProgress <- struct{}{}
}(itemID)
}
wg.Wait()
oc.reportAsCompleted(ctx, int(itemsRead), byteCount, errs)
} }
func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byteCount int64, errs error) { func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byteCount int64, errs error) {

View File

@ -21,6 +21,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store" "github.com/alcionai/corso/src/pkg/store"
) )
@ -172,6 +173,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
} }
backupCh <- struct{}{} backupCh <- struct{}{}
logger.Ctx(ctx).Debugf(
"Backed up %d directories and %d files",
opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount,
)
opStats.started = true opStats.started = true
opStats.gc = gc.AwaitStatus() opStats.gc = gc.AwaitStatus()