Compare commits

...

2 Commits

Author SHA1 Message Date
ryanfkeepers
5e043fbf23 fix linting, aggregate defers 2023-01-25 09:34:56 -07:00
Vaibhav Kamra
36794fd50f Fix regression in OneDrive reader loop 2023-01-24 19:28:23 -08:00

View File

@ -27,7 +27,7 @@ const (
collectionChannelBufferSize = 5 collectionChannelBufferSize = 5
// TODO: Tune this later along with collectionChannelBufferSize // TODO: Tune this later along with collectionChannelBufferSize
urlPrefetchChannelBufferSize = 5 urlPrefetchChannelBufferSizeDefault = 5
// Max number of retries to get doc from M365 // Max number of retries to get doc from M365
// Seems to timeout at times because of multiple requests // Seems to timeout at times because of multiple requests
@ -60,6 +60,8 @@ type Collection struct {
// should only be true if the old delta token expired // should only be true if the old delta token expired
doNotMergeItems bool doNotMergeItems bool
urlPrefetchChannelBufferSize int
} }
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
@ -78,14 +80,15 @@ func NewCollection(
ctrlOpts control.Options, ctrlOpts control.Options,
) *Collection { ) *Collection {
c := &Collection{ c := &Collection{
folderPath: folderPath, folderPath: folderPath,
driveItems: map[string]models.DriveItemable{}, driveItems: map[string]models.DriveItemable{},
driveID: driveID, driveID: driveID,
source: source, source: source,
service: service, service: service,
data: make(chan data.Stream, collectionChannelBufferSize), data: make(chan data.Stream, collectionChannelBufferSize),
statusUpdater: statusUpdater, statusUpdater: statusUpdater,
ctrl: ctrlOpts, ctrl: ctrlOpts,
urlPrefetchChannelBufferSize: urlPrefetchChannelBufferSizeDefault,
} }
// Allows tests to set a mock populator // Allows tests to set a mock populator
@ -169,8 +172,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
errs error errs error
byteCount int64 byteCount int64
itemsRead int64 itemsRead int64
wg sync.WaitGroup
m sync.Mutex m sync.Mutex
wg sync.WaitGroup
) )
// Retrieve the OneDrive folder path to set later in // Retrieve the OneDrive folder path to set later in
@ -189,7 +192,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
defer colCloser() defer colCloser()
defer close(folderProgress) defer close(folderProgress)
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) semaphoreCh := make(chan struct{}, oc.urlPrefetchChannelBufferSize)
defer close(semaphoreCh) defer close(semaphoreCh)
errUpdater := func(id string, err error) { errUpdater := func(id string, err error) {
@ -207,10 +210,12 @@ func (oc *Collection) populateItems(ctx context.Context) {
wg.Add(1) wg.Add(1)
go func(item models.DriveItemable) { release := func() {
defer wg.Done() wg.Done()
defer func() { <-semaphoreCh }() <-semaphoreCh
}
go func(item models.DriveItemable) {
// Read the item // Read the item
var ( var (
itemInfo details.ItemInfo itemInfo details.ItemInfo
@ -232,7 +237,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
} }
if err != nil { if err != nil {
defer release()
errUpdater(*item.GetId(), err) errUpdater(*item.GetId(), err)
return return
} }
@ -253,8 +260,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
} }
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
defer release()
progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize) progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize)
go closer() go closer()
return progReader, nil return progReader, nil
}) })