# Review notes (commentary only; this text precedes the first "diff --git" header and is not part of any hunk).
#
# Files touched by this patch:
#   src/Makefile
#       The build target drops the -ldflags version stamp and becomes plain `go build -o corso`.
#   src/internal/connector/onedrive/collection.go
#       populateItems now reads each drive item in its own goroutine. A buffered channel of
#       capacity urlPrefetchChannelBufferSize (25) is used as a semaphore to bound in-flight reads.
#       Each read is attempted up to maxRetries (4) times, sleeping 6s, 9s and 12s between attempts.
#       itemsRead and byteCount are updated through sync/atomic; errs is appended under the mutex m.
#       The function waits on the WaitGroup before calling reportAsCompleted.
#   src/internal/operations/backup.go
#       Run logs opStats.k.TotalDirectoryCount and opStats.k.TotalFileCount at debug level.
#
# NOTE(review): the loop guard `oc.service.ErrPolicy() && errs != nil` reads errs without holding m,
#   while worker goroutines write errs through errUpdater under m. Looks like a data race; confirm with -race.
# NOTE(review): the retry back-off uses time.Sleep and never checks ctx, so a cancelled context is
#   presumably not noticed until the sleep finishes. Confirm whether that is acceptable.
# NOTE(review): items are now sent on oc.data from concurrent goroutines, so delivery order presumably
#   no longer follows oc.driveItemIDs. Verify that no consumer depends on the order.
# NOTE(review): with the -ldflags removed, nothing in this patch sets cli/version.Version at build time.
#   Confirm the version is stamped elsewhere.
diff --git a/src/Makefile b/src/Makefile index 775024811..dc6a448f6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -9,8 +9,7 @@ BAD_LINT_MSG := "Missing golangci-lint version $(WANTED_LINT_VERSION). Visit $(I .PHONY: check-lint check-lint-version lint load-test build: - go build -o corso -ldflags \ - "-X 'github.com/alcionai/corso/src/cli/version.Version=$(shell git describe --exact-match --tags $(git rev-parse HEAD) 2>/dev/null || echo unreleased)-$(shell git rev-parse --short HEAD)'" + go build -o corso lint: check-lint-version golangci-lint run diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 750f840f7..4f65b0ac0 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -4,6 +4,9 @@ package onedrive import ( "context" "io" + "sync" + "sync/atomic" + "time" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" @@ -16,7 +19,15 @@ import ( const ( // TODO: This number needs to be tuned + // Consider max open file limit `ulimit -n`, usually 1024 when setting this value collectionChannelBufferSize = 50 + + // TODO: Tune this later along with collectionChannelBufferSize + urlPrefetchChannelBufferSize = 25 + + // Max number of retries to get doc from M365 + // Seems to timeout at times because of multiple requests + maxRetries = 4 // 1 + 3 retries ) var ( @@ -110,7 +121,9 @@ func (oc *Collection) populateItems(ctx context.Context) { var ( errs error byteCount int64 - itemsRead = 0 + itemsRead int64 + wg sync.WaitGroup + m sync.Mutex ) // Retrieve the OneDrive folder path to set later in @@ -129,37 +142,75 @@ func (oc *Collection) populateItems(ctx context.Context) { defer colCloser() defer close(folderProgress) - for _, itemID := range oc.driveItemIDs { - // Read the item - itemInfo, itemData, err := oc.itemReader(ctx, oc.service, oc.driveID, itemID) - if err != nil { - errs = 
support.WrapAndAppendf(itemID, err, errs) + semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) + defer close(semaphoreCh) - if oc.service.ErrPolicy() { - break - } - - continue - } - // Item read successfully, add to collection - itemsRead++ - // byteCount iteration - byteCount += itemInfo.Size - - itemInfo.ParentPath = parentPathString - progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size) - - go closer() - - oc.data <- &Item{ - id: itemInfo.ItemName, - data: progReader, - info: itemInfo, - } - folderProgress <- struct{}{} + errUpdater := func(id string, err error) { + m.Lock() + errs = support.WrapAndAppend(id, err, errs) + m.Unlock() } - oc.reportAsCompleted(ctx, itemsRead, byteCount, errs) + for _, itemID := range oc.driveItemIDs { + if oc.service.ErrPolicy() && errs != nil { + break + } + + semaphoreCh <- struct{}{} + + wg.Add(1) + + go func(itemID string) { + defer wg.Done() + defer func() { <-semaphoreCh }() + + // Read the item + var ( + itemInfo *details.OneDriveInfo + itemData io.ReadCloser + err error + ) + + // Retrying as we were hitting timeouts when we have multiple requests + // https://github.com/microsoftgraph/msgraph-sdk-go/issues/302 + for i := 1; i <= maxRetries; i++ { + itemInfo, itemData, err = oc.itemReader(ctx, oc.service, oc.driveID, itemID) + if err == nil { + break + } + // TODO: Tweak sleep times + if i < maxRetries { + time.Sleep(time.Duration(3*(i+1)) * time.Second) + } + } + + if err != nil { + errUpdater(itemID, err) + return + } + + // Item read successfully, add to collection + atomic.AddInt64(&itemsRead, 1) + // byteCount iteration + atomic.AddInt64(&byteCount, itemInfo.Size) + + itemInfo.ParentPath = parentPathString + progReader, closer := observe.ItemProgress(itemData, observe.ItemBackupMsg, itemInfo.ItemName, itemInfo.Size) + + go closer() + + oc.data <- &Item{ + id: itemInfo.ItemName, + data: progReader, + info: itemInfo, + } + folderProgress <- 
struct{}{} + }(itemID) + } + + wg.Wait() + + oc.reportAsCompleted(ctx, int(itemsRead), byteCount, errs) } func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead int, byteCount int64, errs error) { diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 2a5f67205..84c46610e 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -21,6 +21,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" ) @@ -172,6 +173,11 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { } backupCh <- struct{}{} + logger.Ctx(ctx).Debugf( + "Backed up %d directories and %d files", + opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount, + ) + opStats.started = true opStats.gc = gc.AwaitStatus()