Fix regression in OneDrive reader loop

This commit is contained in:
Vaibhav Kamra 2023-01-24 19:28:23 -08:00
parent 7872201f73
commit 36794fd50f

View File

@ -27,7 +27,7 @@ const (
collectionChannelBufferSize = 5 collectionChannelBufferSize = 5
// TODO: Tune this later along with collectionChannelBufferSize // TODO: Tune this later along with collectionChannelBufferSize
urlPrefetchChannelBufferSize = 5 urlPrefetchChannelBufferSizeDefault = 5
// Max number of retries to get doc from M365 // Max number of retries to get doc from M365
// Seems to timeout at times because of multiple requests // Seems to timeout at times because of multiple requests
@ -60,6 +60,8 @@ type Collection struct {
// should only be true if the old delta token expired // should only be true if the old delta token expired
doNotMergeItems bool doNotMergeItems bool
urlPrefetchChannelBufferSize int
} }
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
@ -78,14 +80,15 @@ func NewCollection(
ctrlOpts control.Options, ctrlOpts control.Options,
) *Collection { ) *Collection {
c := &Collection{ c := &Collection{
folderPath: folderPath, folderPath: folderPath,
driveItems: map[string]models.DriveItemable{}, driveItems: map[string]models.DriveItemable{},
driveID: driveID, driveID: driveID,
source: source, source: source,
service: service, service: service,
data: make(chan data.Stream, collectionChannelBufferSize), data: make(chan data.Stream, collectionChannelBufferSize),
statusUpdater: statusUpdater, statusUpdater: statusUpdater,
ctrl: ctrlOpts, ctrl: ctrlOpts,
urlPrefetchChannelBufferSize: urlPrefetchChannelBufferSizeDefault,
} }
// Allows tests to set a mock populator // Allows tests to set a mock populator
@ -189,7 +192,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
defer colCloser() defer colCloser()
defer close(folderProgress) defer close(folderProgress)
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) semaphoreCh := make(chan struct{}, oc.urlPrefetchChannelBufferSize)
defer close(semaphoreCh) defer close(semaphoreCh)
errUpdater := func(id string, err error) { errUpdater := func(id string, err error) {
@ -208,9 +211,6 @@ func (oc *Collection) populateItems(ctx context.Context) {
wg.Add(1) wg.Add(1)
go func(item models.DriveItemable) { go func(item models.DriveItemable) {
defer wg.Done()
defer func() { <-semaphoreCh }()
// Read the item // Read the item
var ( var (
itemInfo details.ItemInfo itemInfo details.ItemInfo
@ -232,6 +232,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
} }
if err != nil { if err != nil {
defer wg.Done()
defer func() { <-semaphoreCh }()
errUpdater(*item.GetId(), err) errUpdater(*item.GetId(), err)
return return
} }
@ -253,6 +255,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
} }
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
defer wg.Done()
defer func() { <-semaphoreCh }()
progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize) progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize)
go closer() go closer()
return progReader, nil return progReader, nil