move parallelism logs/checks upstream (#3003)
The fetch paralellism checks and logs occur on every item streamed from GC. This is a bit chatty, and has been moved upstream in the process for a more centralized behavior. --- #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🧹 Tech Debt/Cleanup #### Test Plan - [x] 💪 Manual - [x] ⚡ Unit test
This commit is contained in:
parent
cfe8d576b8
commit
6405c8246e
@ -46,7 +46,7 @@ func prepM365Test(
|
|||||||
vpr, cfgFP := tester.MakeTempTestConfigClone(t, force)
|
vpr, cfgFP := tester.MakeTempTestConfigClone(t, force)
|
||||||
ctx = config.SetViper(ctx, vpr)
|
ctx = config.SetViper(ctx, vpr)
|
||||||
|
|
||||||
repo, err := repository.Initialize(ctx, acct, st, control.Options{})
|
repo, err := repository.Initialize(ctx, acct, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
return acct, st, repo, vpr, recorder, cfgFP
|
return acct, st, repo, vpr, recorder, cfgFP
|
||||||
|
|||||||
@ -19,7 +19,7 @@ func Control() control.Options {
|
|||||||
opt.SkipReduce = skipReduceFV
|
opt.SkipReduce = skipReduceFV
|
||||||
opt.ToggleFeatures.DisableIncrementals = disableIncrementalsFV
|
opt.ToggleFeatures.DisableIncrementals = disableIncrementalsFV
|
||||||
opt.ToggleFeatures.ExchangeImmutableIDs = enableImmutableID
|
opt.ToggleFeatures.ExchangeImmutableIDs = enableImmutableID
|
||||||
opt.ItemFetchParallelism = fetchParallelismFV
|
opt.Parallelism.ItemFetch = fetchParallelismFV
|
||||||
|
|
||||||
return opt
|
return opt
|
||||||
}
|
}
|
||||||
|
|||||||
@ -194,7 +194,7 @@ func (suite *S3E2ESuite) TestConnectS3Cmd() {
|
|||||||
ctx = config.SetViper(ctx, vpr)
|
ctx = config.SetViper(ctx, vpr)
|
||||||
|
|
||||||
// init the repo first
|
// init the repo first
|
||||||
_, err = repository.Initialize(ctx, account.Account{}, st, control.Options{})
|
_, err = repository.Initialize(ctx, account.Account{}, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
// then test it
|
// then test it
|
||||||
|
|||||||
@ -71,7 +71,7 @@ func handleExchangeEmailFactory(cmd *cobra.Command, args []string) error {
|
|||||||
subject, body, body,
|
subject, body, body,
|
||||||
now, now, now, now)
|
now, now, now, now)
|
||||||
},
|
},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
errs)
|
errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Only(ctx, err)
|
return Only(ctx, err)
|
||||||
@ -117,7 +117,7 @@ func handleExchangeCalendarEventFactory(cmd *cobra.Command, args []string) error
|
|||||||
User, subject, body, body,
|
User, subject, body, body,
|
||||||
now, now, exchMock.NoRecurrence, exchMock.NoAttendees, false)
|
now, now, exchMock.NoRecurrence, exchMock.NoAttendees, false)
|
||||||
},
|
},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
errs)
|
errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Only(ctx, err)
|
return Only(ctx, err)
|
||||||
@ -168,7 +168,7 @@ func handleExchangeContactFactory(cmd *cobra.Command, args []string) error {
|
|||||||
"123-456-7890",
|
"123-456-7890",
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
errs)
|
errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Only(ctx, err)
|
return Only(ctx, err)
|
||||||
|
|||||||
@ -46,6 +46,10 @@ func (gc *GraphConnector) ProduceBackupCollections(
|
|||||||
diagnostics.Index("service", sels.Service.String()))
|
diagnostics.Index("service", sels.Service.String()))
|
||||||
defer end()
|
defer end()
|
||||||
|
|
||||||
|
// Limit the max number of active requests to graph from this collection.
|
||||||
|
ctrlOpts.Parallelism.ItemFetch = graph.Parallelism(sels.PathService()).
|
||||||
|
ItemOverride(ctx, ctrlOpts.Parallelism.ItemFetch)
|
||||||
|
|
||||||
err := verifyBackupInputs(sels, gc.IDNameLookup.IDs())
|
err := verifyBackupInputs(sels, gc.IDNameLookup.IDs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, clues.Stack(err).WithClues(ctx)
|
return nil, nil, clues.Stack(err).WithClues(ctx)
|
||||||
|
|||||||
@ -105,7 +105,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
|
|||||||
nil,
|
nil,
|
||||||
connector.credentials,
|
connector.credentials,
|
||||||
connector.UpdateStatus,
|
connector.UpdateStatus,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
assert.Empty(t, excludes)
|
assert.Empty(t, excludes)
|
||||||
@ -208,7 +208,7 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner()
|
|||||||
test.getSelector(t),
|
test.getSelector(t),
|
||||||
test.getSelector(t),
|
test.getSelector(t),
|
||||||
nil,
|
nil,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
assert.Error(t, err, clues.ToCore(err))
|
assert.Error(t, err, clues.ToCore(err))
|
||||||
assert.Empty(t, collections)
|
assert.Empty(t, collections)
|
||||||
@ -263,7 +263,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
|
|||||||
connector.credentials,
|
connector.credentials,
|
||||||
connector.Service,
|
connector.Service,
|
||||||
connector,
|
connector,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
// Not expecting excludes as this isn't an incremental backup.
|
// Not expecting excludes as this isn't an incremental backup.
|
||||||
@ -345,7 +345,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() {
|
|||||||
sel.Selector,
|
sel.Selector,
|
||||||
sel.Selector,
|
sel.Selector,
|
||||||
nil,
|
nil,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
require.Len(t, cols, 2) // 1 collection, 1 path prefix directory to ensure the root path exists.
|
require.Len(t, cols, 2) // 1 collection, 1 path prefix directory to ensure the root path exists.
|
||||||
@ -389,7 +389,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() {
|
|||||||
sel.Selector,
|
sel.Selector,
|
||||||
sel.Selector,
|
sel.Selector,
|
||||||
nil,
|
nil,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
assert.Less(t, 0, len(cols))
|
assert.Less(t, 0, len(cols))
|
||||||
|
|||||||
@ -271,7 +271,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
|
|||||||
ss,
|
ss,
|
||||||
test.scope,
|
test.scope,
|
||||||
DeltaPaths{},
|
DeltaPaths{},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
func(status *support.ConnectorOperationStatus) {},
|
func(status *support.ConnectorOperationStatus) {},
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
@ -342,7 +342,7 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() {
|
|||||||
ss,
|
ss,
|
||||||
test.scope,
|
test.scope,
|
||||||
DeltaPaths{},
|
DeltaPaths{},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
func(status *support.ConnectorOperationStatus) {},
|
func(status *support.ConnectorOperationStatus) {},
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
@ -373,7 +373,7 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() {
|
|||||||
ss,
|
ss,
|
||||||
test.scope,
|
test.scope,
|
||||||
dps,
|
dps,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
func(status *support.ConnectorOperationStatus) {},
|
func(status *support.ConnectorOperationStatus) {},
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
@ -420,7 +420,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression()
|
|||||||
ss,
|
ss,
|
||||||
sel.Scopes()[0],
|
sel.Scopes()[0],
|
||||||
DeltaPaths{},
|
DeltaPaths{},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
newStatusUpdater(t, &wg),
|
newStatusUpdater(t, &wg),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
@ -494,7 +494,7 @@ func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression
|
|||||||
ss,
|
ss,
|
||||||
test.scope,
|
test.scope,
|
||||||
DeltaPaths{},
|
DeltaPaths{},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
newStatusUpdater(t, &wg),
|
newStatusUpdater(t, &wg),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
@ -608,7 +608,7 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression(
|
|||||||
ss,
|
ss,
|
||||||
test.scope,
|
test.scope,
|
||||||
DeltaPaths{},
|
DeltaPaths{},
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
newStatusUpdater(t, &wg),
|
newStatusUpdater(t, &wg),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|||||||
@ -35,10 +35,6 @@ var (
|
|||||||
const (
|
const (
|
||||||
collectionChannelBufferSize = 1000
|
collectionChannelBufferSize = 1000
|
||||||
numberOfRetries = 4
|
numberOfRetries = 4
|
||||||
|
|
||||||
// Outlooks expects max 4 concurrent requests
|
|
||||||
// https://learn.microsoft.com/en-us/graph/throttling-limits#outlook-service-limits
|
|
||||||
urlPrefetchChannelBufferSize = 4
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type itemer interface {
|
type itemer interface {
|
||||||
@ -196,22 +192,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Limit the max number of active requests to GC
|
semaphoreCh := make(chan struct{}, col.ctrl.Parallelism.ItemFetch)
|
||||||
fetchParallelism := col.ctrl.ItemFetchParallelism
|
|
||||||
if fetchParallelism < 1 || fetchParallelism > urlPrefetchChannelBufferSize {
|
|
||||||
fetchParallelism = urlPrefetchChannelBufferSize
|
|
||||||
logger.Ctx(ctx).Infow(
|
|
||||||
"fetch parallelism value not set or out of bounds, using default",
|
|
||||||
"default_parallelism",
|
|
||||||
urlPrefetchChannelBufferSize,
|
|
||||||
"requested_parallellism",
|
|
||||||
col.ctrl.ItemFetchParallelism,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Ctx(ctx).Infow("fetching data with parallelism", "fetch_parallelism", fetchParallelism)
|
|
||||||
|
|
||||||
semaphoreCh := make(chan struct{}, fetchParallelism)
|
|
||||||
defer close(semaphoreCh)
|
defer close(semaphoreCh)
|
||||||
|
|
||||||
// delete all removed items
|
// delete all removed items
|
||||||
|
|||||||
@ -179,7 +179,7 @@ func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() {
|
|||||||
test.curr, test.prev, test.loc,
|
test.curr, test.prev, test.loc,
|
||||||
0,
|
0,
|
||||||
&mockItemer{}, nil,
|
&mockItemer{}, nil,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
false)
|
false)
|
||||||
assert.Equal(t, test.expect, c.State(), "collection state")
|
assert.Equal(t, test.expect, c.State(), "collection state")
|
||||||
assert.Equal(t, test.curr, c.fullPath, "full path")
|
assert.Equal(t, test.curr, c.fullPath, "full path")
|
||||||
|
|||||||
@ -838,7 +838,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
|
|||||||
test.resolver,
|
test.resolver,
|
||||||
allScope,
|
allScope,
|
||||||
test.dps,
|
test.dps,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
assert.NoError(t, err, clues.ToCore(err))
|
assert.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,12 @@
|
|||||||
package graph
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
|
)
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// item response AdditionalData
|
// item response AdditionalData
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@ -25,3 +32,84 @@ const (
|
|||||||
// given endpoint.
|
// given endpoint.
|
||||||
PreviousPathFileName = "previouspath"
|
PreviousPathFileName = "previouspath"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Runtime Configuration
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type parallelism struct {
|
||||||
|
// sets the collection buffer size before blocking.
|
||||||
|
collectionBuffer int
|
||||||
|
// sets the parallelism of item population within a collection.
|
||||||
|
item int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p parallelism) CollectionBufferSize() int {
|
||||||
|
if p.collectionBuffer == 0 {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.collectionBuffer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p parallelism) CollectionBufferOverride(ctx context.Context, override int) int {
|
||||||
|
logger.Ctx(ctx).Infow(
|
||||||
|
"collection buffer parallelism",
|
||||||
|
"default_parallelism", p.collectionBuffer,
|
||||||
|
"requested_paralellism", override)
|
||||||
|
|
||||||
|
if !isWithin(1, p.collectionBuffer, override) {
|
||||||
|
return p.collectionBuffer
|
||||||
|
}
|
||||||
|
|
||||||
|
return override
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p parallelism) ItemOverride(ctx context.Context, override int) int {
|
||||||
|
logger.Ctx(ctx).Infow(
|
||||||
|
"item-level parallelism",
|
||||||
|
"default_parallelism", p.item,
|
||||||
|
"requested_paralellism", override)
|
||||||
|
|
||||||
|
if !isWithin(1, p.item, override) {
|
||||||
|
return p.item
|
||||||
|
}
|
||||||
|
|
||||||
|
return override
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p parallelism) Item() int {
|
||||||
|
if p.item == 0 {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.item
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns low <= v <= high
|
||||||
|
// if high < low, returns low <= v
|
||||||
|
func isWithin(low, high, v int) bool {
|
||||||
|
return v >= low && (high < low || v <= high)
|
||||||
|
}
|
||||||
|
|
||||||
|
var sp = map[path.ServiceType]parallelism{
|
||||||
|
path.ExchangeService: {
|
||||||
|
collectionBuffer: 4,
|
||||||
|
item: 4,
|
||||||
|
},
|
||||||
|
path.OneDriveService: {
|
||||||
|
collectionBuffer: 5,
|
||||||
|
item: 4,
|
||||||
|
},
|
||||||
|
// sharepoint libraries are considered "onedrive" parallelism.
|
||||||
|
// this only controls lists/pages.
|
||||||
|
path.SharePointService: {
|
||||||
|
collectionBuffer: 5,
|
||||||
|
item: 4,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parallelism returns the Parallelism for the requested service.
|
||||||
|
func Parallelism(srv path.ServiceType) parallelism {
|
||||||
|
return sp[srv]
|
||||||
|
}
|
||||||
|
|||||||
40
src/internal/connector/graph/consts_test.go
Normal file
40
src/internal/connector/graph/consts_test.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package graph
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ConstsUnitSuite struct {
|
||||||
|
tester.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConstsUnitSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &ConstsUnitSuite{Suite: tester.NewUnitSuite(t)})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ConstsUnitSuite) TestIsWithin() {
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
low, high, v int
|
||||||
|
expect assert.BoolAssertionFunc
|
||||||
|
}{
|
||||||
|
{"1 < 3 < 5", 1, 5, 3, assert.True},
|
||||||
|
{"1 < 3, no high", 1, 0, 3, assert.True},
|
||||||
|
{"1 <= 1 <= 1", 1, 1, 1, assert.True},
|
||||||
|
{"1 <= 1 <= 5", 1, 5, 1, assert.True},
|
||||||
|
{"1 <= 5 <= 5", 1, 5, 5, assert.True},
|
||||||
|
{"1 <= 0 <= 2", 1, 1, 0, assert.False},
|
||||||
|
{"1 <= 3 <= 2", 1, 1, 3, assert.False},
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
suite.Run(test.name, func() {
|
||||||
|
t := suite.T()
|
||||||
|
test.expect(t, isWithin(test.low, test.high, test.v))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -28,13 +28,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// TODO: This number needs to be tuned
|
|
||||||
// Consider max open file limit `ulimit -n`, usually 1024 when setting this value
|
|
||||||
collectionChannelBufferSize = 5
|
|
||||||
|
|
||||||
// TODO: Tune this later along with collectionChannelBufferSize
|
|
||||||
urlPrefetchChannelBufferSize = 5
|
|
||||||
|
|
||||||
// Used to compare in case of OneNote files
|
// Used to compare in case of OneNote files
|
||||||
MaxOneNoteFileSize = 2 * 1024 * 1024 * 1024
|
MaxOneNoteFileSize = 2 * 1024 * 1024 * 1024
|
||||||
)
|
)
|
||||||
@ -179,7 +172,7 @@ func NewCollection(
|
|||||||
driveID: driveID,
|
driveID: driveID,
|
||||||
source: source,
|
source: source,
|
||||||
service: service,
|
service: service,
|
||||||
data: make(chan data.Stream, collectionChannelBufferSize),
|
data: make(chan data.Stream, graph.Parallelism(path.OneDriveMetadataService).CollectionBufferSize()),
|
||||||
statusUpdater: statusUpdater,
|
statusUpdater: statusUpdater,
|
||||||
ctrl: ctrlOpts,
|
ctrl: ctrlOpts,
|
||||||
state: data.StateOf(prevPath, folderPath),
|
state: data.StateOf(prevPath, folderPath),
|
||||||
@ -489,7 +482,7 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
|||||||
defer colCloser()
|
defer colCloser()
|
||||||
defer close(folderProgress)
|
defer close(folderProgress)
|
||||||
|
|
||||||
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
|
semaphoreCh := make(chan struct{}, graph.Parallelism(path.OneDriveService).Item())
|
||||||
defer close(semaphoreCh)
|
defer close(semaphoreCh)
|
||||||
|
|
||||||
for _, item := range oc.driveItems {
|
for _, item := range oc.driveItems {
|
||||||
|
|||||||
@ -116,7 +116,7 @@ func (suite *SharePointLibrariesUnitSuite) TestUpdateCollections() {
|
|||||||
testFolderMatcher{test.scope},
|
testFolderMatcher{test.scope},
|
||||||
&MockGraphService{},
|
&MockGraphService{},
|
||||||
nil,
|
nil,
|
||||||
control.Options{})
|
control.Defaults())
|
||||||
|
|
||||||
c.CollectionMap = collMap
|
c.CollectionMap = collMap
|
||||||
|
|
||||||
|
|||||||
@ -52,7 +52,7 @@ func (suite *EventsIntegrationSuite) TestNewBus() {
|
|||||||
)
|
)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
b, err := events.NewBus(ctx, s, a.ID(), control.Options{})
|
b, err := events.NewBus(ctx, s, a.ID(), control.Defaults())
|
||||||
require.NotEmpty(t, b)
|
require.NotEmpty(t, b)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
|||||||
@ -234,7 +234,10 @@ func (op *BackupOperation) do(
|
|||||||
fallbackReasons = makeFallbackReasons(op.Selectors)
|
fallbackReasons = makeFallbackReasons(op.Selectors)
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.Ctx(ctx).With("selectors", op.Selectors).Info("backing up selection")
|
logger.Ctx(ctx).With(
|
||||||
|
"control_options", op.Options,
|
||||||
|
"selectors", op.Selectors).
|
||||||
|
Info("backing up selection")
|
||||||
|
|
||||||
// should always be 1, since backups are 1:1 with resourceOwners.
|
// should always be 1, since backups are 1:1 with resourceOwners.
|
||||||
opStats.resourceCount = 1
|
opStats.resourceCount = 1
|
||||||
|
|||||||
@ -140,7 +140,7 @@ func newTestBackupOp(
|
|||||||
//revive:enable:context-as-argument
|
//revive:enable:context-as-argument
|
||||||
var (
|
var (
|
||||||
sw = store.NewKopiaStore(ms)
|
sw = store.NewKopiaStore(ms)
|
||||||
opts = control.Options{}
|
opts = control.Defaults()
|
||||||
)
|
)
|
||||||
|
|
||||||
opts.ToggleFeatures = featureToggles
|
opts.ToggleFeatures = featureToggles
|
||||||
@ -532,14 +532,16 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
||||||
kw := &kopia.Wrapper{}
|
var (
|
||||||
sw := &store.Wrapper{}
|
kw = &kopia.Wrapper{}
|
||||||
gc := &mock.GraphConnector{}
|
sw = &store.Wrapper{}
|
||||||
acct := tester.NewM365Account(suite.T())
|
gc = &mock.GraphConnector{}
|
||||||
|
acct = tester.NewM365Account(suite.T())
|
||||||
|
opts = control.Defaults()
|
||||||
|
)
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
opts control.Options
|
|
||||||
kw *kopia.Wrapper
|
kw *kopia.Wrapper
|
||||||
sw *store.Wrapper
|
sw *store.Wrapper
|
||||||
bp inject.BackupProducer
|
bp inject.BackupProducer
|
||||||
@ -547,10 +549,10 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
|||||||
targets []string
|
targets []string
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
}{
|
}{
|
||||||
{"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
|
{"good", kw, sw, gc, acct, nil, assert.NoError},
|
||||||
{"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
|
{"missing kopia", nil, sw, gc, acct, nil, assert.Error},
|
||||||
{"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
|
{"missing modelstore", kw, nil, gc, acct, nil, assert.Error},
|
||||||
{"missing backup producer", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
|
{"missing backup producer", kw, sw, nil, acct, nil, assert.Error},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
@ -561,7 +563,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
|||||||
|
|
||||||
_, err := NewBackupOperation(
|
_, err := NewBackupOperation(
|
||||||
ctx,
|
ctx,
|
||||||
test.opts,
|
opts,
|
||||||
test.kw,
|
test.kw,
|
||||||
test.sw,
|
test.sw,
|
||||||
test.bp,
|
test.bp,
|
||||||
|
|||||||
@ -451,7 +451,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() {
|
|||||||
|
|
||||||
op, err := NewBackupOperation(
|
op, err := NewBackupOperation(
|
||||||
ctx,
|
ctx,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
kw,
|
kw,
|
||||||
sw,
|
sw,
|
||||||
gc,
|
gc,
|
||||||
|
|||||||
@ -25,7 +25,7 @@ func TestOperationSuite(t *testing.T) {
|
|||||||
|
|
||||||
func (suite *OperationSuite) TestNewOperation() {
|
func (suite *OperationSuite) TestNewOperation() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
op := newOperation(control.Options{}, events.Bus{}, nil, nil)
|
op := newOperation(control.Defaults(), events.Bus{}, nil, nil)
|
||||||
assert.Greater(t, op.CreatedAt, time.Time{})
|
assert.Greater(t, op.CreatedAt, time.Time{})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,7 +45,7 @@ func (suite *OperationSuite) TestOperation_Validate() {
|
|||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate()
|
err := newOperation(control.Defaults(), events.Bus{}, test.kw, test.sw).validate()
|
||||||
test.errCheck(suite.T(), err, clues.ToCore(err))
|
test.errCheck(suite.T(), err, clues.ToCore(err))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -212,7 +212,7 @@ func (op *RestoreOperation) do(
|
|||||||
})
|
})
|
||||||
|
|
||||||
observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID))
|
observe.Message(ctx, fmt.Sprintf("Discovered %d items in backup %s to restore", len(paths), op.BackupID))
|
||||||
logger.Ctx(ctx).With("selectors", op.Selectors).Info("restoring selection")
|
logger.Ctx(ctx).With("control_options", op.Options, "selectors", op.Selectors).Info("restoring selection")
|
||||||
|
|
||||||
kopiaComplete, closer := observe.MessageWithCompletion(ctx, "Enumerating items in repository")
|
kopiaComplete, closer := observe.MessageWithCompletion(ctx, "Enumerating items in repository")
|
||||||
defer closer()
|
defer closer()
|
||||||
|
|||||||
@ -106,7 +106,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
|
|
||||||
op, err := NewRestoreOperation(
|
op, err := NewRestoreOperation(
|
||||||
ctx,
|
ctx,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
kw,
|
kw,
|
||||||
sw,
|
sw,
|
||||||
gc,
|
gc,
|
||||||
@ -213,15 +213,17 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
||||||
kw := &kopia.Wrapper{}
|
var (
|
||||||
sw := &store.Wrapper{}
|
kw = &kopia.Wrapper{}
|
||||||
gc := &mock.GraphConnector{}
|
sw = &store.Wrapper{}
|
||||||
acct := tester.NewM365Account(suite.T())
|
gc = &mock.GraphConnector{}
|
||||||
dest := tester.DefaultTestRestoreDestination()
|
acct = tester.NewM365Account(suite.T())
|
||||||
|
dest = tester.DefaultTestRestoreDestination()
|
||||||
|
opts = control.Defaults()
|
||||||
|
)
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
opts control.Options
|
|
||||||
kw *kopia.Wrapper
|
kw *kopia.Wrapper
|
||||||
sw *store.Wrapper
|
sw *store.Wrapper
|
||||||
rc inject.RestoreConsumer
|
rc inject.RestoreConsumer
|
||||||
@ -229,10 +231,10 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
|||||||
targets []string
|
targets []string
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
}{
|
}{
|
||||||
{"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
|
{"good", kw, sw, gc, acct, nil, assert.NoError},
|
||||||
{"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
|
{"missing kopia", nil, sw, gc, acct, nil, assert.Error},
|
||||||
{"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
|
{"missing modelstore", kw, nil, gc, acct, nil, assert.Error},
|
||||||
{"missing restore consumer", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
|
{"missing restore consumer", kw, sw, nil, acct, nil, assert.Error},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
@ -241,7 +243,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
|||||||
|
|
||||||
_, err := NewRestoreOperation(
|
_, err := NewRestoreOperation(
|
||||||
ctx,
|
ctx,
|
||||||
test.opts,
|
opts,
|
||||||
test.kw,
|
test.kw,
|
||||||
test.sw,
|
test.sw,
|
||||||
test.rc,
|
test.rc,
|
||||||
@ -280,7 +282,7 @@ func setupExchangeBackup(
|
|||||||
|
|
||||||
bo, err := NewBackupOperation(
|
bo, err := NewBackupOperation(
|
||||||
ctx,
|
ctx,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
kw,
|
kw,
|
||||||
sw,
|
sw,
|
||||||
gc,
|
gc,
|
||||||
@ -331,7 +333,7 @@ func setupSharePointBackup(
|
|||||||
|
|
||||||
bo, err := NewBackupOperation(
|
bo, err := NewBackupOperation(
|
||||||
ctx,
|
ctx,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
kw,
|
kw,
|
||||||
sw,
|
sw,
|
||||||
gc,
|
gc,
|
||||||
@ -475,7 +477,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoResults() {
|
|||||||
|
|
||||||
ro, err := NewRestoreOperation(
|
ro, err := NewRestoreOperation(
|
||||||
ctx,
|
ctx,
|
||||||
control.Options{},
|
control.Defaults(),
|
||||||
suite.kw,
|
suite.kw,
|
||||||
suite.sw,
|
suite.sw,
|
||||||
gc,
|
gc,
|
||||||
|
|||||||
@ -6,17 +6,24 @@ import (
|
|||||||
|
|
||||||
// Options holds the optional configurations for a process
|
// Options holds the optional configurations for a process
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Collision CollisionPolicy `json:"-"`
|
Collision CollisionPolicy `json:"-"`
|
||||||
DisableMetrics bool `json:"disableMetrics"`
|
DisableMetrics bool `json:"disableMetrics"`
|
||||||
FailureHandling FailureBehavior `json:"failureHandling"`
|
FailureHandling FailureBehavior `json:"failureHandling"`
|
||||||
ItemFetchParallelism int `json:"itemFetchParallelism"`
|
RestorePermissions bool `json:"restorePermissions"`
|
||||||
RestorePermissions bool `json:"restorePermissions"`
|
SkipReduce bool `json:"skipReduce"`
|
||||||
SkipReduce bool `json:"skipReduce"`
|
ToggleFeatures Toggles `json:"toggleFeatures"`
|
||||||
ToggleFeatures Toggles `json:"ToggleFeatures"`
|
Parallelism Parallelism `json:"parallelism"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type FailureBehavior string
|
type FailureBehavior string
|
||||||
|
|
||||||
|
type Parallelism struct {
|
||||||
|
// sets the collection buffer size before blocking.
|
||||||
|
CollectionBuffer int
|
||||||
|
// sets the parallelism of item population within a collection.
|
||||||
|
ItemFetch int
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// fails and exits the run immediately
|
// fails and exits the run immediately
|
||||||
FailFast FailureBehavior = "fail-fast"
|
FailFast FailureBehavior = "fail-fast"
|
||||||
@ -31,6 +38,10 @@ func Defaults() Options {
|
|||||||
return Options{
|
return Options{
|
||||||
FailureHandling: FailAfterRecovery,
|
FailureHandling: FailAfterRecovery,
|
||||||
ToggleFeatures: Toggles{},
|
ToggleFeatures: Toggles{},
|
||||||
|
Parallelism: Parallelism{
|
||||||
|
CollectionBuffer: 4,
|
||||||
|
ItemFetch: 4,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -54,7 +54,7 @@ func (suite *RepositoryUnitSuite) TestInitialize() {
|
|||||||
st, err := test.storage()
|
st, err := test.storage()
|
||||||
assert.NoError(t, err, clues.ToCore(err))
|
assert.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
_, err = repository.Initialize(ctx, test.account, st, control.Options{})
|
_, err = repository.Initialize(ctx, test.account, st, control.Defaults())
|
||||||
test.errCheck(t, err, clues.ToCore(err))
|
test.errCheck(t, err, clues.ToCore(err))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -88,7 +88,7 @@ func (suite *RepositoryUnitSuite) TestConnect() {
|
|||||||
st, err := test.storage()
|
st, err := test.storage()
|
||||||
assert.NoError(t, err, clues.ToCore(err))
|
assert.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
_, err = repository.Connect(ctx, test.account, st, control.Options{})
|
_, err = repository.Connect(ctx, test.account, st, control.Defaults())
|
||||||
test.errCheck(t, err, clues.ToCore(err))
|
test.errCheck(t, err, clues.ToCore(err))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -131,7 +131,7 @@ func (suite *RepositoryIntegrationSuite) TestInitialize() {
|
|||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
st := test.storage(t)
|
st := test.storage(t)
|
||||||
r, err := repository.Initialize(ctx, test.account, st, control.Options{})
|
r, err := repository.Initialize(ctx, test.account, st, control.Defaults())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
defer func() {
|
defer func() {
|
||||||
err := r.Close(ctx)
|
err := r.Close(ctx)
|
||||||
@ -153,11 +153,11 @@ func (suite *RepositoryIntegrationSuite) TestConnect() {
|
|||||||
// need to initialize the repository before we can test connecting to it.
|
// need to initialize the repository before we can test connecting to it.
|
||||||
st := tester.NewPrefixedS3Storage(t)
|
st := tester.NewPrefixedS3Storage(t)
|
||||||
|
|
||||||
_, err := repository.Initialize(ctx, account.Account{}, st, control.Options{})
|
_, err := repository.Initialize(ctx, account.Account{}, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
// now re-connect
|
// now re-connect
|
||||||
_, err = repository.Connect(ctx, account.Account{}, st, control.Options{})
|
_, err = repository.Connect(ctx, account.Account{}, st, control.Defaults())
|
||||||
assert.NoError(t, err, clues.ToCore(err))
|
assert.NoError(t, err, clues.ToCore(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -170,7 +170,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_sameID() {
|
|||||||
// need to initialize the repository before we can test connecting to it.
|
// need to initialize the repository before we can test connecting to it.
|
||||||
st := tester.NewPrefixedS3Storage(t)
|
st := tester.NewPrefixedS3Storage(t)
|
||||||
|
|
||||||
r, err := repository.Initialize(ctx, account.Account{}, st, control.Options{})
|
r, err := repository.Initialize(ctx, account.Account{}, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
oldID := r.GetID()
|
oldID := r.GetID()
|
||||||
@ -179,7 +179,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_sameID() {
|
|||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
// now re-connect
|
// now re-connect
|
||||||
r, err = repository.Connect(ctx, account.Account{}, st, control.Options{})
|
r, err = repository.Connect(ctx, account.Account{}, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
assert.Equal(t, oldID, r.GetID())
|
assert.Equal(t, oldID, r.GetID())
|
||||||
}
|
}
|
||||||
@ -195,7 +195,7 @@ func (suite *RepositoryIntegrationSuite) TestNewBackup() {
|
|||||||
// need to initialize the repository before we can test connecting to it.
|
// need to initialize the repository before we can test connecting to it.
|
||||||
st := tester.NewPrefixedS3Storage(t)
|
st := tester.NewPrefixedS3Storage(t)
|
||||||
|
|
||||||
r, err := repository.Initialize(ctx, acct, st, control.Options{})
|
r, err := repository.Initialize(ctx, acct, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
userID := tester.M365UserID(t)
|
userID := tester.M365UserID(t)
|
||||||
@ -217,7 +217,7 @@ func (suite *RepositoryIntegrationSuite) TestNewRestore() {
|
|||||||
// need to initialize the repository before we can test connecting to it.
|
// need to initialize the repository before we can test connecting to it.
|
||||||
st := tester.NewPrefixedS3Storage(t)
|
st := tester.NewPrefixedS3Storage(t)
|
||||||
|
|
||||||
r, err := repository.Initialize(ctx, acct, st, control.Options{})
|
r, err := repository.Initialize(ctx, acct, st, control.Defaults())
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
ro, err := r.NewRestore(ctx, "backup-id", selectors.Selector{DiscreteOwner: "test"}, dest)
|
ro, err := r.NewRestore(ctx, "backup-id", selectors.Selector{DiscreteOwner: "test"}, dest)
|
||||||
@ -234,7 +234,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_DisableMetrics() {
|
|||||||
// need to initialize the repository before we can test connecting to it.
|
// need to initialize the repository before we can test connecting to it.
|
||||||
st := tester.NewPrefixedS3Storage(t)
|
st := tester.NewPrefixedS3Storage(t)
|
||||||
|
|
||||||
_, err := repository.Initialize(ctx, account.Account{}, st, control.Options{})
|
_, err := repository.Initialize(ctx, account.Account{}, st, control.Defaults())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// now re-connect
|
// now re-connect
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user