Add option to control parallelism getting items (#2864)

This option is only consumed by Exchange right now and will control the number of items fetched in parallel per collection. Note that the total number of parallel item fetches may be higher is kopia is concurrently uploading multiple collections.

This PR does not expose this option at the CLI layer

Manually tested to ensure it
* uses the default if options value is 0
* uses the default if options value > default
* uses option value otherwise

There's another PR open for this but it's not expected to main. We still
want to make sure the change goes in though, hence this PR.

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2023-03-20 13:04:49 -07:00 committed by GitHub
parent fa2cf046bb
commit 2176f4b831
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 7 deletions

View File

@ -197,7 +197,21 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
}
// Limit the max number of active requests to GC
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
fetchParallelism := col.ctrl.ItemFetchParallelism
if fetchParallelism < 1 || fetchParallelism > urlPrefetchChannelBufferSize {
fetchParallelism = urlPrefetchChannelBufferSize
logger.Ctx(ctx).Infow(
"fetch parallelism value not set or out of bounds, using default",
"default_parallelism",
urlPrefetchChannelBufferSize,
"requested_paralellism",
col.ctrl.ItemFetchParallelism,
)
}
logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism)
semaphoreCh := make(chan struct{}, fetchParallelism)
defer close(semaphoreCh)
// delete all removed items

View File

@ -11,6 +11,7 @@ type Options struct {
FailFast bool `json:"failFast"`
RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"`
ItemFetchParallelism int `json:"itemFetchParallelism"`
ToggleFeatures Toggles `json:"ToggleFeatures"`
}