Compare commits
2 Commits
main
...
fix_onedri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5e043fbf23 | ||
|
|
36794fd50f |
@ -27,7 +27,7 @@ const (
|
||||
collectionChannelBufferSize = 5
|
||||
|
||||
// TODO: Tune this later along with collectionChannelBufferSize
|
||||
urlPrefetchChannelBufferSize = 5
|
||||
urlPrefetchChannelBufferSizeDefault = 5
|
||||
|
||||
// Max number of retries to get doc from M365
|
||||
// Seems to timeout at times because of multiple requests
|
||||
@ -60,6 +60,8 @@ type Collection struct {
|
||||
|
||||
// should only be true if the old delta token expired
|
||||
doNotMergeItems bool
|
||||
|
||||
urlPrefetchChannelBufferSize int
|
||||
}
|
||||
|
||||
// itemReadFunc returns a reader for the specified item
|
||||
@ -78,14 +80,15 @@ func NewCollection(
|
||||
ctrlOpts control.Options,
|
||||
) *Collection {
|
||||
c := &Collection{
|
||||
folderPath: folderPath,
|
||||
driveItems: map[string]models.DriveItemable{},
|
||||
driveID: driveID,
|
||||
source: source,
|
||||
service: service,
|
||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
||||
statusUpdater: statusUpdater,
|
||||
ctrl: ctrlOpts,
|
||||
folderPath: folderPath,
|
||||
driveItems: map[string]models.DriveItemable{},
|
||||
driveID: driveID,
|
||||
source: source,
|
||||
service: service,
|
||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
||||
statusUpdater: statusUpdater,
|
||||
ctrl: ctrlOpts,
|
||||
urlPrefetchChannelBufferSize: urlPrefetchChannelBufferSizeDefault,
|
||||
}
|
||||
|
||||
// Allows tests to set a mock populator
|
||||
@ -169,8 +172,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
errs error
|
||||
byteCount int64
|
||||
itemsRead int64
|
||||
wg sync.WaitGroup
|
||||
m sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
|
||||
// Retrieve the OneDrive folder path to set later in
|
||||
@ -189,7 +192,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
defer colCloser()
|
||||
defer close(folderProgress)
|
||||
|
||||
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
|
||||
semaphoreCh := make(chan struct{}, oc.urlPrefetchChannelBufferSize)
|
||||
defer close(semaphoreCh)
|
||||
|
||||
errUpdater := func(id string, err error) {
|
||||
@ -207,10 +210,12 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go func(item models.DriveItemable) {
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphoreCh }()
|
||||
release := func() {
|
||||
wg.Done()
|
||||
<-semaphoreCh
|
||||
}
|
||||
|
||||
go func(item models.DriveItemable) {
|
||||
// Read the item
|
||||
var (
|
||||
itemInfo details.ItemInfo
|
||||
@ -232,7 +237,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
defer release()
|
||||
errUpdater(*item.GetId(), err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -253,8 +260,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
}
|
||||
|
||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||
defer release()
|
||||
|
||||
progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize)
|
||||
go closer()
|
||||
|
||||
return progReader, nil
|
||||
})
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user