From 6405c8246e312efb679ee92d86b6fa8a747bad5b Mon Sep 17 00:00:00 2001 From: Keepers Date: Fri, 21 Apr 2023 14:23:41 -0600 Subject: [PATCH] move parallelism logs/checks upstream (#3003) The fetch paralellism checks and logs occur on every item streamed from GC. This is a bit chatty, and has been moved upstream in the process for a more centralized behavior. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :broom: Tech Debt/Cleanup #### Test Plan - [x] :muscle: Manual - [x] :zap: Unit test --- src/cli/backup/help_e2e_test.go | 2 +- src/cli/options/options.go | 2 +- src/cli/repo/s3_e2e_test.go | 2 +- src/cmd/factory/impl/exchange.go | 6 +- src/internal/connector/data_collections.go | 4 + .../connector/data_collections_test.go | 10 +-- .../exchange/data_collections_test.go | 12 +-- .../exchange/exchange_data_collection.go | 21 +---- .../exchange/exchange_data_collection_test.go | 2 +- .../exchange/service_iterators_test.go | 2 +- src/internal/connector/graph/consts.go | 88 +++++++++++++++++++ src/internal/connector/graph/consts_test.go | 40 +++++++++ src/internal/connector/onedrive/collection.go | 11 +-- .../sharepoint/data_collections_test.go | 2 +- src/internal/events/events_test.go | 2 +- src/internal/operations/backup.go | 5 +- .../operations/backup_integration_test.go | 24 ++--- src/internal/operations/backup_test.go | 2 +- src/internal/operations/operation_test.go | 4 +- src/internal/operations/restore.go | 2 +- src/internal/operations/restore_test.go | 32 +++---- src/pkg/control/options.go | 25 ++++-- src/pkg/repository/repository_test.go | 20 ++--- 23 files changed, 222 insertions(+), 98 deletions(-) create mode 100644 src/internal/connector/graph/consts_test.go diff --git a/src/cli/backup/help_e2e_test.go b/src/cli/backup/help_e2e_test.go index 9fec46934..1a5356652 100644 --- a/src/cli/backup/help_e2e_test.go +++ b/src/cli/backup/help_e2e_test.go @@ -46,7 +46,7 @@ func prepM365Test( vpr, cfgFP := tester.MakeTempTestConfigClone(t, force) ctx = config.SetViper(ctx, vpr) - repo, err := repository.Initialize(ctx, acct, st, control.Options{}) + repo, err := repository.Initialize(ctx, acct, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) return acct, st, repo, vpr, recorder, cfgFP diff --git a/src/cli/options/options.go b/src/cli/options/options.go index 626ad2115..20f233b60 100644 --- a/src/cli/options/options.go +++ b/src/cli/options/options.go @@ -19,7 +19,7 @@ func Control() control.Options { opt.SkipReduce = skipReduceFV opt.ToggleFeatures.DisableIncrementals = disableIncrementalsFV opt.ToggleFeatures.ExchangeImmutableIDs = enableImmutableID - opt.ItemFetchParallelism = fetchParallelismFV + opt.Parallelism.ItemFetch = fetchParallelismFV return opt } diff --git a/src/cli/repo/s3_e2e_test.go b/src/cli/repo/s3_e2e_test.go index 4266be8f0..d5e6c992e 100644 --- a/src/cli/repo/s3_e2e_test.go +++ b/src/cli/repo/s3_e2e_test.go @@ -194,7 +194,7 @@ func (suite *S3E2ESuite) TestConnectS3Cmd() { ctx = config.SetViper(ctx, vpr) // init the repo first - _, err = repository.Initialize(ctx, account.Account{}, st, control.Options{}) + _, err = repository.Initialize(ctx, account.Account{}, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) // then test it diff --git a/src/cmd/factory/impl/exchange.go b/src/cmd/factory/impl/exchange.go index a28fe3389..930296365 100644 --- a/src/cmd/factory/impl/exchange.go +++ b/src/cmd/factory/impl/exchange.go @@ -71,7 +71,7 @@ func handleExchangeEmailFactory(cmd *cobra.Command, args []string) error { subject, body, body, now, now, now, now) }, - control.Options{}, + control.Defaults(), errs) if err != nil { return Only(ctx, err) @@ -117,7 +117,7 @@ func handleExchangeCalendarEventFactory(cmd *cobra.Command, args []string) error User, subject, body, body, now, now, exchMock.NoRecurrence, exchMock.NoAttendees, false) }, - control.Options{}, + control.Defaults(), errs) if err != nil { return Only(ctx, err) @@ -168,7 +168,7 @@ func handleExchangeContactFactory(cmd *cobra.Command, args []string) error { "123-456-7890", ) }, - control.Options{}, + control.Defaults(), errs) if err != nil { return Only(ctx, err) diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index 9a51b8f3f..e88048fc9 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -46,6 +46,10 @@ func (gc *GraphConnector) ProduceBackupCollections( diagnostics.Index("service", sels.Service.String())) defer end() + // Limit the max number of active requests to graph from this collection. + ctrlOpts.Parallelism.ItemFetch = graph.Parallelism(sels.PathService()). + ItemOverride(ctx, ctrlOpts.Parallelism.ItemFetch) + err := verifyBackupInputs(sels, gc.IDNameLookup.IDs()) if err != nil { return nil, nil, clues.Stack(err).WithClues(ctx) diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index 9bfd88dc0..caeb1103b 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -105,7 +105,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() { nil, connector.credentials, connector.UpdateStatus, - control.Options{}, + control.Defaults(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Empty(t, excludes) @@ -208,7 +208,7 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner() test.getSelector(t), test.getSelector(t), nil, - control.Options{}, + control.Defaults(), fault.New(true)) assert.Error(t, err, clues.ToCore(err)) assert.Empty(t, collections) @@ -263,7 +263,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() { connector.credentials, connector.Service, connector, - control.Options{}, + control.Defaults(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) // Not expecting excludes as this isn't an incremental backup. @@ -345,7 +345,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() { sel.Selector, sel.Selector, nil, - control.Options{}, + control.Defaults(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) require.Len(t, cols, 2) // 1 collection, 1 path prefix directory to ensure the root path exists. @@ -389,7 +389,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() { sel.Selector, sel.Selector, nil, - control.Options{}, + control.Defaults(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Less(t, 0, len(cols)) diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index d69948c69..3ff3e5c2c 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -271,7 +271,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() { ss, test.scope, DeltaPaths{}, - control.Options{}, + control.Defaults(), func(status *support.ConnectorOperationStatus) {}, fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -342,7 +342,7 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() { ss, test.scope, DeltaPaths{}, - control.Options{}, + control.Defaults(), func(status *support.ConnectorOperationStatus) {}, fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -373,7 +373,7 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() { ss, test.scope, dps, - control.Options{}, + control.Defaults(), func(status *support.ConnectorOperationStatus) {}, fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -420,7 +420,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression() ss, sel.Scopes()[0], DeltaPaths{}, - control.Options{}, + control.Defaults(), newStatusUpdater(t, &wg), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -494,7 +494,7 @@ func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression ss, test.scope, DeltaPaths{}, - control.Options{}, + control.Defaults(), newStatusUpdater(t, &wg), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -608,7 +608,7 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression( ss, test.scope, DeltaPaths{}, - control.Options{}, + control.Defaults(), newStatusUpdater(t, &wg), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 2c45175da..97a89e3f5 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -35,10 +35,6 @@ var ( const ( collectionChannelBufferSize = 1000 numberOfRetries = 4 - - // Outlooks expects max 4 concurrent requests - // https://learn.microsoft.com/en-us/graph/throttling-limits#outlook-service-limits - urlPrefetchChannelBufferSize = 4 ) type itemer interface { @@ -196,22 +192,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { }() } - // Limit the max number of active requests to GC - fetchParallelism := col.ctrl.ItemFetchParallelism - if fetchParallelism < 1 || fetchParallelism > urlPrefetchChannelBufferSize { - fetchParallelism = urlPrefetchChannelBufferSize - logger.Ctx(ctx).Infow( - "fetch parallelism value not set or out of bounds, using default", - "default_parallelism", - urlPrefetchChannelBufferSize, - "requested_parallellism", - col.ctrl.ItemFetchParallelism, - ) - } - - logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism) - - semaphoreCh := make(chan struct{}, fetchParallelism) + semaphoreCh := make(chan struct{}, col.ctrl.Parallelism.ItemFetch) defer close(semaphoreCh) // delete all removed items diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index 94d08fcef..ae9a7d3ce 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -179,7 +179,7 @@ func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() { test.curr, test.prev, test.loc, 0, &mockItemer{}, nil, - control.Options{}, + control.Defaults(), false) assert.Equal(t, test.expect, c.State(), "collection state") assert.Equal(t, test.curr, c.fullPath, "full path") diff --git a/src/internal/connector/exchange/service_iterators_test.go b/src/internal/connector/exchange/service_iterators_test.go index a752883d1..17814e95a 100644 --- a/src/internal/connector/exchange/service_iterators_test.go +++ b/src/internal/connector/exchange/service_iterators_test.go @@ -838,7 +838,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre test.resolver, allScope, test.dps, - control.Options{}, + control.Defaults(), fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/connector/graph/consts.go b/src/internal/connector/graph/consts.go index 14dac934f..32a549e8c 100644 --- a/src/internal/connector/graph/consts.go +++ b/src/internal/connector/graph/consts.go @@ -1,5 +1,12 @@ package graph +import ( + "context" + + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + // --------------------------------------------------------------------------- // item response AdditionalData // --------------------------------------------------------------------------- @@ -25,3 +32,84 @@ const ( // given endpoint. PreviousPathFileName = "previouspath" ) + +// --------------------------------------------------------------------------- +// Runtime Configuration +// --------------------------------------------------------------------------- + +type parallelism struct { + // sets the collection buffer size before blocking. + collectionBuffer int + // sets the parallelism of item population within a collection. + item int +} + +func (p parallelism) CollectionBufferSize() int { + if p.collectionBuffer == 0 { + return 1 + } + + return p.collectionBuffer +} + +func (p parallelism) CollectionBufferOverride(ctx context.Context, override int) int { + logger.Ctx(ctx).Infow( + "collection buffer parallelism", + "default_parallelism", p.collectionBuffer, + "requested_paralellism", override) + + if !isWithin(1, p.collectionBuffer, override) { + return p.collectionBuffer + } + + return override +} + +func (p parallelism) ItemOverride(ctx context.Context, override int) int { + logger.Ctx(ctx).Infow( + "item-level parallelism", + "default_parallelism", p.item, + "requested_paralellism", override) + + if !isWithin(1, p.item, override) { + return p.item + } + + return override +} + +func (p parallelism) Item() int { + if p.item == 0 { + return 1 + } + + return p.item +} + +// returns low <= v <= high +// if high < low, returns low <= v +func isWithin(low, high, v int) bool { + return v >= low && (high < low || v <= high) +} + +var sp = map[path.ServiceType]parallelism{ + path.ExchangeService: { + collectionBuffer: 4, + item: 4, + }, + path.OneDriveService: { + collectionBuffer: 5, + item: 4, + }, + // sharepoint libraries are considered "onedrive" parallelism. + // this only controls lists/pages. + path.SharePointService: { + collectionBuffer: 5, + item: 4, + }, +} + +// Parallelism returns the Parallelism for the requested service. +func Parallelism(srv path.ServiceType) parallelism { + return sp[srv] +} diff --git a/src/internal/connector/graph/consts_test.go b/src/internal/connector/graph/consts_test.go new file mode 100644 index 000000000..84f8b694e --- /dev/null +++ b/src/internal/connector/graph/consts_test.go @@ -0,0 +1,40 @@ +package graph + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/tester" +) + +type ConstsUnitSuite struct { + tester.Suite +} + +func TestConstsUnitSuite(t *testing.T) { + suite.Run(t, &ConstsUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *ConstsUnitSuite) TestIsWithin() { + table := []struct { + name string + low, high, v int + expect assert.BoolAssertionFunc + }{ + {"1 < 3 < 5", 1, 5, 3, assert.True}, + {"1 < 3, no high", 1, 0, 3, assert.True}, + {"1 <= 1 <= 1", 1, 1, 1, assert.True}, + {"1 <= 1 <= 5", 1, 5, 1, assert.True}, + {"1 <= 5 <= 5", 1, 5, 5, assert.True}, + {"1 <= 0 <= 2", 1, 1, 0, assert.False}, + {"1 <= 3 <= 2", 1, 1, 3, assert.False}, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + test.expect(t, isWithin(test.low, test.high, test.v)) + }) + } +} diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index aef3dd7ab..893b0a0bd 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -28,13 +28,6 @@ import ( ) const ( - // TODO: This number needs to be tuned - // Consider max open file limit `ulimit -n`, usually 1024 when setting this value - collectionChannelBufferSize = 5 - - // TODO: Tune this later along with collectionChannelBufferSize - urlPrefetchChannelBufferSize = 5 - // Used to compare in case of OneNote files MaxOneNoteFileSize = 2 * 1024 * 1024 * 1024 ) @@ -179,7 +172,7 @@ func NewCollection( driveID: driveID, source: source, service: service, - data: make(chan data.Stream, collectionChannelBufferSize), + data: make(chan data.Stream, graph.Parallelism(path.OneDriveMetadataService).CollectionBufferSize()), statusUpdater: statusUpdater, ctrl: ctrlOpts, state: data.StateOf(prevPath, folderPath), @@ -489,7 +482,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { defer colCloser() defer close(folderProgress) - semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) + semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item()) defer close(semaphoreCh) for _, item := range oc.driveItems { diff --git a/src/internal/connector/sharepoint/data_collections_test.go b/src/internal/connector/sharepoint/data_collections_test.go index e787aea41..14d406428 100644 --- a/src/internal/connector/sharepoint/data_collections_test.go +++ b/src/internal/connector/sharepoint/data_collections_test.go @@ -116,7 +116,7 @@ func (suite *SharePointLibrariesUnitSuite) TestUpdateCollections() { testFolderMatcher{test.scope}, &MockGraphService{}, nil, - control.Options{}) + control.Defaults()) c.CollectionMap = collMap diff --git a/src/internal/events/events_test.go b/src/internal/events/events_test.go index 46363a695..3d44690e9 100644 --- a/src/internal/events/events_test.go +++ b/src/internal/events/events_test.go @@ -52,7 +52,7 @@ func (suite *EventsIntegrationSuite) TestNewBus() { ) require.NoError(t, err, clues.ToCore(err)) - b, err := events.NewBus(ctx, s, a.ID(), control.Options{}) + b, err := events.NewBus(ctx, s, a.ID(), control.Defaults()) require.NotEmpty(t, b) require.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 366aab60f..091d404cd 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -234,7 +234,10 @@ func (op *BackupOperation) do( fallbackReasons = makeFallbackReasons(op.Selectors) ) - logger.Ctx(ctx).With("selectors", op.Selectors).Info("backing up selection") + logger.Ctx(ctx).With( + "control_options", op.Options, + "selectors", op.Selectors). + Info("backing up selection") // should always be 1, since backups are 1:1 with resourceOwners. opStats.resourceCount = 1 diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index fcf404dcf..0c52ee153 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -140,7 +140,7 @@ func newTestBackupOp( //revive:enable:context-as-argument var ( sw = store.NewKopiaStore(ms) - opts = control.Options{} + opts = control.Defaults() ) opts.ToggleFeatures = featureToggles @@ -532,14 +532,16 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() { } func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { - kw := &kopia.Wrapper{} - sw := &store.Wrapper{} - gc := &mock.GraphConnector{} - acct := tester.NewM365Account(suite.T()) + var ( + kw = &kopia.Wrapper{} + sw = &store.Wrapper{} + gc = &mock.GraphConnector{} + acct = tester.NewM365Account(suite.T()) + opts = control.Defaults() + ) table := []struct { name string - opts control.Options kw *kopia.Wrapper sw *store.Wrapper bp inject.BackupProducer @@ -547,10 +549,10 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { targets []string errCheck assert.ErrorAssertionFunc }{ - {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, - {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, - {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, - {"missing backup producer", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, + {"good", kw, sw, gc, acct, nil, assert.NoError}, + {"missing kopia", nil, sw, gc, acct, nil, assert.Error}, + {"missing modelstore", kw, nil, gc, acct, nil, assert.Error}, + {"missing backup producer", kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -561,7 +563,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { _, err := NewBackupOperation( ctx, - test.opts, + opts, test.kw, test.sw, test.bp, diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 3e9e36805..40cbfb627 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -451,7 +451,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { op, err := NewBackupOperation( ctx, - control.Options{}, + control.Defaults(), kw, sw, gc, diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index 27cf6185f..e95f942b5 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -25,7 +25,7 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(control.Options{}, events.Bus{}, nil, nil) + op := newOperation(control.Defaults(), events.Bus{}, nil, nil) assert.Greater(t, op.CreatedAt, time.Time{}) } @@ -45,7 +45,7 @@ func (suite *OperationSuite) TestOperation_Validate() { } for _, test := range table { suite.Run(test.name, func() { - err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate() + err := newOperation(control.Defaults(), events.Bus{}, test.kw, test.sw).validate() test.errCheck(suite.T(), err, clues.ToCore(err)) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index b5d3caf64..aa632de92 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -212,7 +212,7 @@ func (op *RestoreOperation) do( }) observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID)) - logger.Ctx(ctx).With("selectors", op.Selectors).Info("restoring selection") + logger.Ctx(ctx).With("control_options", op.Options, "selectors", op.Selectors).Info("restoring selection") kopiaComplete, closer := observe.MessageWithCompletion(ctx, "Enumerating items in repository") defer closer() diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 623367a75..57129e63c 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -106,7 +106,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { op, err := NewRestoreOperation( ctx, - control.Options{}, + control.Defaults(), kw, sw, gc, @@ -213,15 +213,17 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() { } func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { - kw := &kopia.Wrapper{} - sw := &store.Wrapper{} - gc := &mock.GraphConnector{} - acct := tester.NewM365Account(suite.T()) - dest := tester.DefaultTestRestoreDestination() + var ( + kw = &kopia.Wrapper{} + sw = &store.Wrapper{} + gc = &mock.GraphConnector{} + acct = tester.NewM365Account(suite.T()) + dest = tester.DefaultTestRestoreDestination() + opts = control.Defaults() + ) table := []struct { name string - opts control.Options kw *kopia.Wrapper sw *store.Wrapper rc inject.RestoreConsumer @@ -229,10 +231,10 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { targets []string errCheck assert.ErrorAssertionFunc }{ - {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, - {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, - {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, - {"missing restore consumer", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, + {"good", kw, sw, gc, acct, nil, assert.NoError}, + {"missing kopia", nil, sw, gc, acct, nil, assert.Error}, + {"missing modelstore", kw, nil, gc, acct, nil, assert.Error}, + {"missing restore consumer", kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -241,7 +243,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { _, err := NewRestoreOperation( ctx, - test.opts, + opts, test.kw, test.sw, test.rc, @@ -280,7 +282,7 @@ func setupExchangeBackup( bo, err := NewBackupOperation( ctx, - control.Options{}, + control.Defaults(), kw, sw, gc, @@ -331,7 +333,7 @@ func setupSharePointBackup( bo, err := NewBackupOperation( ctx, - control.Options{}, + control.Defaults(), kw, sw, gc, @@ -475,7 +477,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoResults() { ro, err := NewRestoreOperation( ctx, - control.Options{}, + control.Defaults(), suite.kw, suite.sw, gc, diff --git a/src/pkg/control/options.go b/src/pkg/control/options.go index 62a8027af..dc547cbb8 100644 --- a/src/pkg/control/options.go +++ b/src/pkg/control/options.go @@ -6,17 +6,24 @@ import ( // Options holds the optional configurations for a process type Options struct { - Collision CollisionPolicy `json:"-"` - DisableMetrics bool `json:"disableMetrics"` - FailureHandling FailureBehavior `json:"failureHandling"` - ItemFetchParallelism int `json:"itemFetchParallelism"` - RestorePermissions bool `json:"restorePermissions"` - SkipReduce bool `json:"skipReduce"` - ToggleFeatures Toggles `json:"ToggleFeatures"` + Collision CollisionPolicy `json:"-"` + DisableMetrics bool `json:"disableMetrics"` + FailureHandling FailureBehavior `json:"failureHandling"` + RestorePermissions bool `json:"restorePermissions"` + SkipReduce bool `json:"skipReduce"` + ToggleFeatures Toggles `json:"toggleFeatures"` + Parallelism Parallelism `json:"parallelism"` } type FailureBehavior string +type Parallelism struct { + // sets the collection buffer size before blocking. + CollectionBuffer int + // sets the parallelism of item population within a collection. + ItemFetch int +} + const ( // fails and exits the run immediately FailFast FailureBehavior = "fail-fast" @@ -31,6 +38,10 @@ func Defaults() Options { return Options{ FailureHandling: FailAfterRecovery, ToggleFeatures: Toggles{}, + Parallelism: Parallelism{ + CollectionBuffer: 4, + ItemFetch: 4, + }, } } diff --git a/src/pkg/repository/repository_test.go b/src/pkg/repository/repository_test.go index 3d6c9979f..68053a841 100644 --- a/src/pkg/repository/repository_test.go +++ b/src/pkg/repository/repository_test.go @@ -54,7 +54,7 @@ func (suite *RepositoryUnitSuite) TestInitialize() { st, err := test.storage() assert.NoError(t, err, clues.ToCore(err)) - _, err = repository.Initialize(ctx, test.account, st, control.Options{}) + _, err = repository.Initialize(ctx, test.account, st, control.Defaults()) test.errCheck(t, err, clues.ToCore(err)) }) } @@ -88,7 +88,7 @@ func (suite *RepositoryUnitSuite) TestConnect() { st, err := test.storage() assert.NoError(t, err, clues.ToCore(err)) - _, err = repository.Connect(ctx, test.account, st, control.Options{}) + _, err = repository.Connect(ctx, test.account, st, control.Defaults()) test.errCheck(t, err, clues.ToCore(err)) }) } @@ -131,7 +131,7 @@ func (suite *RepositoryIntegrationSuite) TestInitialize() { t := suite.T() st := test.storage(t) - r, err := repository.Initialize(ctx, test.account, st, control.Options{}) + r, err := repository.Initialize(ctx, test.account, st, control.Defaults()) if err == nil { defer func() { err := r.Close(ctx) @@ -153,11 +153,11 @@ func (suite *RepositoryIntegrationSuite) TestConnect() { // need to initialize the repository before we can test connecting to it. st := tester.NewPrefixedS3Storage(t) - _, err := repository.Initialize(ctx, account.Account{}, st, control.Options{}) + _, err := repository.Initialize(ctx, account.Account{}, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) // now re-connect - _, err = repository.Connect(ctx, account.Account{}, st, control.Options{}) + _, err = repository.Connect(ctx, account.Account{}, st, control.Defaults()) assert.NoError(t, err, clues.ToCore(err)) } @@ -170,7 +170,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_sameID() { // need to initialize the repository before we can test connecting to it. st := tester.NewPrefixedS3Storage(t) - r, err := repository.Initialize(ctx, account.Account{}, st, control.Options{}) + r, err := repository.Initialize(ctx, account.Account{}, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) oldID := r.GetID() @@ -179,7 +179,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_sameID() { require.NoError(t, err, clues.ToCore(err)) // now re-connect - r, err = repository.Connect(ctx, account.Account{}, st, control.Options{}) + r, err = repository.Connect(ctx, account.Account{}, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, oldID, r.GetID()) } @@ -195,7 +195,7 @@ func (suite *RepositoryIntegrationSuite) TestNewBackup() { // need to initialize the repository before we can test connecting to it. st := tester.NewPrefixedS3Storage(t) - r, err := repository.Initialize(ctx, acct, st, control.Options{}) + r, err := repository.Initialize(ctx, acct, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) userID := tester.M365UserID(t) @@ -217,7 +217,7 @@ func (suite *RepositoryIntegrationSuite) TestNewRestore() { // need to initialize the repository before we can test connecting to it. st := tester.NewPrefixedS3Storage(t) - r, err := repository.Initialize(ctx, acct, st, control.Options{}) + r, err := repository.Initialize(ctx, acct, st, control.Defaults()) require.NoError(t, err, clues.ToCore(err)) ro, err := r.NewRestore(ctx, "backup-id", selectors.Selector{DiscreteOwner: "test"}, dest) @@ -234,7 +234,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_DisableMetrics() { // need to initialize the repository before we can test connecting to it. st := tester.NewPrefixedS3Storage(t) - _, err := repository.Initialize(ctx, account.Account{}, st, control.Options{}) + _, err := repository.Initialize(ctx, account.Account{}, st, control.Defaults()) require.NoError(t, err) // now re-connect