Add option to control parallelism getting items

This option is only consumed by Exchange right now and will control the
number of items fetched in parallel per collection. Note that the total
number of parallel item fetches may be higher is kopia is concurrently
uploading multiple collections.
This commit is contained in:
Ashlie Martinez 2023-03-16 18:12:24 -07:00
parent 1b6ad654e8
commit 43ccb4bccb
2 changed files with 22 additions and 7 deletions

View File

@ -197,7 +197,21 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
}
// Limit the max number of active requests to GC
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
fetchParallelism := col.ctrl.ItemFetchParallelism
if fetchParallelism == 0 || fetchParallelism > urlPrefetchChannelBufferSize {
fetchParallelism = urlPrefetchChannelBufferSize
logger.Ctx(ctx).Infow(
"fetch parallelism value not set or out of bounds, using default",
"default_parallelism",
urlPrefetchChannelBufferSize,
"requested_paralellism",
col.ctrl.ItemFetchParallelism,
)
}
logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism)
semaphoreCh := make(chan struct{}, fetchParallelism)
defer close(semaphoreCh)
// delete all removed items

View File

@ -6,12 +6,13 @@ import (
// Options holds the optional configurations for a process
type Options struct {
Collision CollisionPolicy `json:"-"`
DisableMetrics bool `json:"disableMetrics"`
FailFast bool `json:"failFast"`
RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"`
ToggleFeatures Toggles `json:"ToggleFeatures"`
Collision CollisionPolicy `json:"-"`
DisableMetrics bool `json:"disableMetrics"`
FailFast bool `json:"failFast"`
RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"`
ItemFetchParallelism int `json:"itemFetchParallelism"`
ToggleFeatures Toggles `json:"ToggleFeatures"`
}
// Defaults provides an Options with the default values set.