From 36794fd50f1d6c94223c93163cb7f156cb2f316a Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Tue, 24 Jan 2023 19:28:23 -0800 Subject: [PATCH] Fix regression in OneDrive reader loop --- src/internal/connector/onedrive/collection.go | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 4ea9ea9eb..b66bbe058 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -27,7 +27,7 @@ const ( collectionChannelBufferSize = 5 // TODO: Tune this later along with collectionChannelBufferSize - urlPrefetchChannelBufferSize = 5 + urlPrefetchChannelBufferSizeDefault = 5 // Max number of retries to get doc from M365 // Seems to timeout at times because of multiple requests @@ -60,6 +60,8 @@ type Collection struct { // should only be true if the old delta token expired doNotMergeItems bool + + urlPrefetchChannelBufferSize int } // itemReadFunc returns a reader for the specified item @@ -78,14 +80,15 @@ func NewCollection( ctrlOpts control.Options, ) *Collection { c := &Collection{ - folderPath: folderPath, - driveItems: map[string]models.DriveItemable{}, - driveID: driveID, - source: source, - service: service, - data: make(chan data.Stream, collectionChannelBufferSize), - statusUpdater: statusUpdater, - ctrl: ctrlOpts, + folderPath: folderPath, + driveItems: map[string]models.DriveItemable{}, + driveID: driveID, + source: source, + service: service, + data: make(chan data.Stream, collectionChannelBufferSize), + statusUpdater: statusUpdater, + ctrl: ctrlOpts, + urlPrefetchChannelBufferSize: urlPrefetchChannelBufferSizeDefault, } // Allows tests to set a mock populator @@ -189,7 +192,7 @@ func (oc *Collection) populateItems(ctx context.Context) { defer colCloser() defer close(folderProgress) - semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) + semaphoreCh := make(chan struct{}, oc.urlPrefetchChannelBufferSize) defer close(semaphoreCh) errUpdater := func(id string, err error) { @@ -208,9 +211,6 @@ func (oc *Collection) populateItems(ctx context.Context) { wg.Add(1) go func(item models.DriveItemable) { - defer wg.Done() - defer func() { <-semaphoreCh }() - // Read the item var ( itemInfo details.ItemInfo @@ -232,6 +232,8 @@ func (oc *Collection) populateItems(ctx context.Context) { } if err != nil { + defer wg.Done() + defer func() { <-semaphoreCh }() errUpdater(*item.GetId(), err) return } @@ -253,6 +255,9 @@ func (oc *Collection) populateItems(ctx context.Context) { } itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { + defer wg.Done() + defer func() { <-semaphoreCh }() + progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize) go closer() return progReader, nil