Compare commits
2 Commits
main
...
fix_onedri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5e043fbf23 | ||
|
|
36794fd50f |
@ -27,7 +27,7 @@ const (
|
|||||||
collectionChannelBufferSize = 5
|
collectionChannelBufferSize = 5
|
||||||
|
|
||||||
// TODO: Tune this later along with collectionChannelBufferSize
|
// TODO: Tune this later along with collectionChannelBufferSize
|
||||||
urlPrefetchChannelBufferSize = 5
|
urlPrefetchChannelBufferSizeDefault = 5
|
||||||
|
|
||||||
// Max number of retries to get doc from M365
|
// Max number of retries to get doc from M365
|
||||||
// Seems to timeout at times because of multiple requests
|
// Seems to timeout at times because of multiple requests
|
||||||
@ -60,6 +60,8 @@ type Collection struct {
|
|||||||
|
|
||||||
// should only be true if the old delta token expired
|
// should only be true if the old delta token expired
|
||||||
doNotMergeItems bool
|
doNotMergeItems bool
|
||||||
|
|
||||||
|
urlPrefetchChannelBufferSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
// itemReadFunc returns a reader for the specified item
|
// itemReadFunc returns a reader for the specified item
|
||||||
@ -78,14 +80,15 @@ func NewCollection(
|
|||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
) *Collection {
|
) *Collection {
|
||||||
c := &Collection{
|
c := &Collection{
|
||||||
folderPath: folderPath,
|
folderPath: folderPath,
|
||||||
driveItems: map[string]models.DriveItemable{},
|
driveItems: map[string]models.DriveItemable{},
|
||||||
driveID: driveID,
|
driveID: driveID,
|
||||||
source: source,
|
source: source,
|
||||||
service: service,
|
service: service,
|
||||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
data: make(chan data.Stream, collectionChannelBufferSize),
|
||||||
statusUpdater: statusUpdater,
|
statusUpdater: statusUpdater,
|
||||||
ctrl: ctrlOpts,
|
ctrl: ctrlOpts,
|
||||||
|
urlPrefetchChannelBufferSize: urlPrefetchChannelBufferSizeDefault,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allows tests to set a mock populator
|
// Allows tests to set a mock populator
|
||||||
@ -169,8 +172,8 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
|||||||
errs error
|
errs error
|
||||||
byteCount int64
|
byteCount int64
|
||||||
itemsRead int64
|
itemsRead int64
|
||||||
wg sync.WaitGroup
|
|
||||||
m sync.Mutex
|
m sync.Mutex
|
||||||
|
wg sync.WaitGroup
|
||||||
)
|
)
|
||||||
|
|
||||||
// Retrieve the OneDrive folder path to set later in
|
// Retrieve the OneDrive folder path to set later in
|
||||||
@ -189,7 +192,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
|||||||
defer colCloser()
|
defer colCloser()
|
||||||
defer close(folderProgress)
|
defer close(folderProgress)
|
||||||
|
|
||||||
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
|
semaphoreCh := make(chan struct{}, oc.urlPrefetchChannelBufferSize)
|
||||||
defer close(semaphoreCh)
|
defer close(semaphoreCh)
|
||||||
|
|
||||||
errUpdater := func(id string, err error) {
|
errUpdater := func(id string, err error) {
|
||||||
@ -207,10 +210,12 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
|||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func(item models.DriveItemable) {
|
release := func() {
|
||||||
defer wg.Done()
|
wg.Done()
|
||||||
defer func() { <-semaphoreCh }()
|
<-semaphoreCh
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(item models.DriveItemable) {
|
||||||
// Read the item
|
// Read the item
|
||||||
var (
|
var (
|
||||||
itemInfo details.ItemInfo
|
itemInfo details.ItemInfo
|
||||||
@ -232,7 +237,9 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
defer release()
|
||||||
errUpdater(*item.GetId(), err)
|
errUpdater(*item.GetId(), err)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,8 +260,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
|
||||||
|
defer release()
|
||||||
|
|
||||||
progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize)
|
progReader, closer := observe.ItemProgress(ctx, itemData, observe.ItemBackupMsg, itemName, itemSize)
|
||||||
go closer()
|
go closer()
|
||||||
|
|
||||||
return progReader, nil
|
return progReader, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user