Compare commits

...

2 Commits

Author SHA1 Message Date
ryanfkeepers
5e043fbf23 fix linting, aggregate defers 2023-01-25 09:34:56 -07:00
Vaibhav Kamra
36794fd50f Fix regression in OneDrive reader loop 2023-01-24 19:28:23 -08:00

View File

@ -27,7 +27,7 @@ const (
collectionChannelBufferSize = 5
// TODO: Tune this later along with collectionChannelBufferSize
urlPrefetchChannelBufferSize = 5
urlPrefetchChannelBufferSizeDefault = 5
// Max number of retries to get doc from M365
// Seems to timeout at times because of multiple requests
@ -60,6 +60,8 @@ type Collection struct {
// should only be true if the old delta token expired
doNotMergeItems bool
urlPrefetchChannelBufferSize int
}
// itemReadFunc returns a reader for the specified item
@ -78,14 +80,15 @@ func NewCollection(
ctrlOpts control.Options,
) *Collection {
c := &Collection{
folderPath: folderPath,
driveItems: map[string]models.DriveItemable{},
driveID: driveID,
source: source,
service: service,
data: make(chan data.Stream, collectionChannelBufferSize),
statusUpdater: statusUpdater,
ctrl: ctrlOpts,
folderPath: folderPath,
driveItems: map[string]models.DriveItemable{},
driveID: driveID,
source: source,
service: service,
data: make(chan data.Stream, collectionChannelBufferSize),
statusUpdater: statusUpdater,
ctrl: ctrlOpts,
urlPrefetchChannelBufferSize: urlPrefetchChannelBufferSizeDefault,
}
// Allows tests to set a mock populator
@ -169,8 +172,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
errs error
byteCount int64
itemsRead int64
wg sync.WaitGroup
m sync.Mutex
wg sync.WaitGroup
)
// Retrieve the OneDrive folder path to set later in
@ -189,7 +192,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
defer colCloser()
defer close(folderProgress)
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
semaphoreCh := make(chan struct{}, oc.urlPrefetchChannelBufferSize)
defer close(semaphoreCh)
errUpdater := func(id string, err error) {
@ -207,10 +210,12 @@ func (oc *Collection) populateItems(ctx context.Context) {
wg.Add(1)
go func(item models.DriveItemable) {
defer wg.Done()
defer func() { <-semaphoreCh }()
release := func() {
wg.Done()
<-semaphoreCh
}
go func(item models.DriveItemable) {
// Read the item
var (
itemInfo details.ItemInfo
@ -232,7 +237,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
}
if err != nil {
defer release()
errUpdater(*item.GetId(), err)
return
}
@ -253,8 +260,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
}
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
defer release()
progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize)
go closer()
return progReader, nil
})