From 2176f4b831432d94831cc57365a451e792b3dfcb Mon Sep 17 00:00:00 2001 From: ashmrtn Date: Mon, 20 Mar 2023 13:04:49 -0700 Subject: [PATCH] Add option to control parallelism getting items (#2864) This option is only consumed by Exchange right now and will control the number of items fetched in parallel per collection. Note that the total number of parallel item fetches may be higher is kopia is concurrently uploading multiple collections. This PR does not expose this option at the CLI layer Manually tested to ensure it * uses the default if options value is 0 * uses the default if options value > default * uses option value otherwise There's another PR open for this but it's not expected to main. We still want to make sure the change goes in though, hence this PR. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Test Plan - [x] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- .../exchange/exchange_data_collection.go | 16 +++++++++++++++- src/pkg/control/options.go | 13 +++++++------ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index fe06c317d..9aeba1d2f 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -197,7 +197,21 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { } // Limit the max number of active requests to GC - semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) + fetchParallelism := col.ctrl.ItemFetchParallelism + if fetchParallelism < 1 || fetchParallelism > urlPrefetchChannelBufferSize { + fetchParallelism = urlPrefetchChannelBufferSize + logger.Ctx(ctx).Infow( + "fetch parallelism value not set or out of bounds, using default", + "default_parallelism", + urlPrefetchChannelBufferSize, + "requested_paralellism", + col.ctrl.ItemFetchParallelism, + ) + } + + logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism) + + semaphoreCh := make(chan struct{}, fetchParallelism) defer close(semaphoreCh) // delete all removed items diff --git a/src/pkg/control/options.go b/src/pkg/control/options.go index aecaef46e..f57ae492d 100644 --- a/src/pkg/control/options.go +++ b/src/pkg/control/options.go @@ -6,12 +6,13 @@ import ( // Options holds the optional configurations for a process type Options struct { - Collision CollisionPolicy `json:"-"` - DisableMetrics bool `json:"disableMetrics"` - FailFast bool `json:"failFast"` - RestorePermissions bool `json:"restorePermissions"` - SkipReduce bool `json:"skipReduce"` - ToggleFeatures Toggles `json:"ToggleFeatures"` + Collision CollisionPolicy `json:"-"` + DisableMetrics bool `json:"disableMetrics"` + FailFast bool `json:"failFast"` + RestorePermissions bool `json:"restorePermissions"` + SkipReduce bool `json:"skipReduce"` + ItemFetchParallelism int `json:"itemFetchParallelism"` + ToggleFeatures Toggles `json:"ToggleFeatures"` } // Defaults provides an Options with the default values set.