diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index fe06c317d..9aeba1d2f 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -197,7 +197,21 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { } // Limit the max number of active requests to GC - semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) + fetchParallelism := col.ctrl.ItemFetchParallelism + if fetchParallelism < 1 || fetchParallelism > urlPrefetchChannelBufferSize { + fetchParallelism = urlPrefetchChannelBufferSize + logger.Ctx(ctx).Infow( + "fetch parallelism value not set or out of bounds, using default", + "default_parallelism", + urlPrefetchChannelBufferSize, + "requested_paralellism", + col.ctrl.ItemFetchParallelism, + ) + } + + logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism) + + semaphoreCh := make(chan struct{}, fetchParallelism) defer close(semaphoreCh) // delete all removed items diff --git a/src/pkg/control/options.go b/src/pkg/control/options.go index aecaef46e..f57ae492d 100644 --- a/src/pkg/control/options.go +++ b/src/pkg/control/options.go @@ -6,12 +6,13 @@ import ( // Options holds the optional configurations for a process type Options struct { - Collision CollisionPolicy `json:"-"` - DisableMetrics bool `json:"disableMetrics"` - FailFast bool `json:"failFast"` - RestorePermissions bool `json:"restorePermissions"` - SkipReduce bool `json:"skipReduce"` - ToggleFeatures Toggles `json:"ToggleFeatures"` + Collision CollisionPolicy `json:"-"` + DisableMetrics bool `json:"disableMetrics"` + FailFast bool `json:"failFast"` + RestorePermissions bool `json:"restorePermissions"` + SkipReduce bool `json:"skipReduce"` + ItemFetchParallelism int `json:"itemFetchParallelism"` + ToggleFeatures Toggles `json:"ToggleFeatures"` } // Defaults provides an Options with the default values set.