allow users to limit page size (#3875)

allows cli users to limit the page size of delta queries by calling a new hidden flag: --delta-page-size.
This also adds the control.Options struct to the api client, so that configurations such as this can be easily handed into, and used by, the client.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🌻 Feature

#### Test Plan

- [x] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-07-21 18:00:09 -06:00 committed by GitHub
parent 62d4c68c04
commit 0d6b08204d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 96 additions and 50 deletions

View File

@ -94,6 +94,7 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command {
flags.AddDisableDeltaFlag(c)
flags.AddEnableImmutableIDFlag(c)
flags.AddDisableConcurrencyLimiterFlag(c)
flags.AddDeltaPageSizeFlag(c)
case listCommand:
c, fs = utils.AddCommand(cmd, exchangeListCmd())
@ -175,7 +176,7 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error {
sel := exchangeBackupCreateSelectors(flags.UserFV, flags.CategoryDataFV)
ins, err := utils.UsersMap(ctx, *acct, fault.New(true))
ins, err := utils.UsersMap(ctx, *acct, utils.Control(), fault.New(true))
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 users"))
}

View File

@ -37,11 +37,11 @@ func (suite *ExchangeUnitSuite) TestAddExchangeCommands() {
expectRunE func(*cobra.Command, []string) error
}{
{
"create exchange",
createCommand,
expectUse + " " + exchangeServiceCommandCreateUseSuffix,
exchangeCreateCmd().Short,
[]string{
name: "create exchange",
use: createCommand,
expectUse: expectUse + " " + exchangeServiceCommandCreateUseSuffix,
expectShort: exchangeCreateCmd().Short,
flags: []string{
flags.UserFN,
flags.CategoryDataFN,
flags.DisableIncrementalsFN,
@ -50,28 +50,29 @@ func (suite *ExchangeUnitSuite) TestAddExchangeCommands() {
flags.FetchParallelismFN,
flags.SkipReduceFN,
flags.NoStatsFN,
flags.DeltaPageSizeFN,
},
createExchangeCmd,
expectRunE: createExchangeCmd,
},
{
"list exchange",
listCommand,
expectUse,
exchangeListCmd().Short,
[]string{
name: "list exchange",
use: listCommand,
expectUse: expectUse,
expectShort: exchangeListCmd().Short,
flags: []string{
flags.BackupFN,
flags.FailedItemsFN,
flags.SkippedItemsFN,
flags.RecoveredErrorsFN,
},
listExchangeCmd,
expectRunE: listExchangeCmd,
},
{
"details exchange",
detailsCommand,
expectUse + " " + exchangeServiceCommandDetailsUseSuffix,
exchangeDetailsCmd().Short,
[]string{
name: "details exchange",
use: detailsCommand,
expectUse: expectUse + " " + exchangeServiceCommandDetailsUseSuffix,
expectShort: exchangeDetailsCmd().Short,
flags: []string{
flags.BackupFN,
flags.ContactFN,
flags.ContactFolderFN,
@ -90,7 +91,7 @@ func (suite *ExchangeUnitSuite) TestAddExchangeCommands() {
flags.EventStartsBeforeFN,
flags.EventSubjectFN,
},
detailsExchangeCmd,
expectRunE: detailsExchangeCmd,
},
{
"delete exchange",

View File

@ -157,7 +157,7 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error {
sel := oneDriveBackupCreateSelectors(flags.UserFV)
ins, err := utils.UsersMap(ctx, *acct, fault.New(true))
ins, err := utils.UsersMap(ctx, *acct, utils.Control(), fault.New(true))
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 users"))
}

View File

@ -5,6 +5,7 @@ import (
)
const (
DeltaPageSizeFN = "delta-page-size"
DisableConcurrencyLimiterFN = "disable-concurrency-limiter"
DisableDeltaFN = "disable-delta"
DisableIncrementalsFN = "disable-incrementals"
@ -21,6 +22,7 @@ const (
)
var (
DeltaPageSizeFV int
DisableConcurrencyLimiterFV bool
DisableDeltaFV bool
DisableIncrementalsFV bool
@ -72,6 +74,18 @@ func AddSkipReduceFlag(cmd *cobra.Command) {
cobra.CheckErr(fs.MarkHidden(SkipReduceFN))
}
// AddDeltaPageSizeFlag adds a hidden flag that allows callers to reduce delta
// query page sizes below 500.
func AddDeltaPageSizeFlag(cmd *cobra.Command) {
fs := cmd.Flags()
fs.IntVar(
&DeltaPageSizeFV,
DeltaPageSizeFN,
500,
"Control quantity of items returned in paged queries. Valid range is [1-500]. Default: 500")
cobra.CheckErr(fs.MarkHidden(DeltaPageSizeFN))
}
// AddFetchParallelismFlag adds a hidden flag that allows callers to reduce call
// paralellism (ie, the corso worker pool size) from 4 to as low as 1.
func AddFetchParallelismFlag(cmd *cobra.Command) {

View File

@ -14,6 +14,12 @@ func Control() control.Options {
opt.FailureHandling = control.FailFast
}
dps := int32(flags.DeltaPageSizeFV)
if dps > 500 || dps < 1 {
dps = 500
}
opt.DeltaPageSize = dps
opt.DisableMetrics = flags.NoStatsFV
opt.RestorePermissions = flags.RestorePermissionsFV
opt.SkipReduce = flags.SkipReduceFV

View File

@ -35,6 +35,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
assert.True(t, flags.SkipReduceFV, flags.SkipReduceFN)
assert.Equal(t, 2, flags.FetchParallelismFV, flags.FetchParallelismFN)
assert.True(t, flags.DisableConcurrencyLimiterFV, flags.DisableConcurrencyLimiterFN)
assert.Equal(t, 499, flags.DeltaPageSizeFV, flags.DeltaPageSizeFN)
},
}
@ -48,6 +49,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
flags.AddSkipReduceFlag(cmd)
flags.AddFetchParallelismFlag(cmd)
flags.AddDisableConcurrencyLimiterFlag(cmd)
flags.AddDeltaPageSizeFlag(cmd)
// Test arg parsing for few args
cmd.SetArgs([]string{
@ -60,6 +62,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
"--" + flags.SkipReduceFN,
"--" + flags.FetchParallelismFN, "2",
"--" + flags.DisableConcurrencyLimiterFN,
"--" + flags.DeltaPageSizeFN, "499",
})
err := cmd.Execute()

View File

@ -48,6 +48,8 @@ var (
Destination = "destination"
RestorePermissions = true
DeltaPageSize = "deltaPageSize"
AzureClientID = "testAzureClientId"
AzureTenantID = "testAzureTenantId"
AzureClientSecret = "testAzureClientSecret"

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -15,9 +16,10 @@ import (
func UsersMap(
ctx context.Context,
acct account.Account,
co control.Options,
errs *fault.Bus,
) (idname.Cacher, error) {
au, err := makeUserAPI(acct)
au, err := makeUserAPI(acct, co)
if err != nil {
return nil, clues.Wrap(err, "constructing a graph client")
}
@ -25,13 +27,13 @@ func UsersMap(
return au.GetAllIDsAndNames(ctx, errs)
}
func makeUserAPI(acct account.Account) (api.Users, error) {
func makeUserAPI(acct account.Account, co control.Options) (api.Users, error) {
creds, err := acct.M365Config()
if err != nil {
return api.Users{}, clues.Wrap(err, "getting m365 account creds")
}
cli, err := api.NewClient(creds)
cli, err := api.NewClient(creds, co)
if err != nil {
return api.Users{}, clues.Wrap(err, "constructing api client")
}

View File

@ -82,8 +82,8 @@ var (
RudderStackDataPlaneURL string
)
func NewBus(ctx context.Context, s storage.Storage, tenID string, opts control.Options) (Bus, error) {
if opts.DisableMetrics {
func NewBus(ctx context.Context, s storage.Storage, tenID string, co control.Options) (Bus, error) {
if co.DisableMetrics {
return Bus{}, nil
}

View File

@ -57,7 +57,7 @@ func (suite *DataCollectionIntgSuite) SetupSuite() {
suite.tenantID = creds.AzureTenantID
suite.ac, err = api.NewClient(creds)
suite.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -69,7 +69,7 @@ func NewController(
return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx)
}
ac, err := api.NewClient(creds)
ac, err := api.NewClient(creds, co)
if err != nil {
return nil, clues.Wrap(err, "creating api client").WithClues(ctx)
}

View File

@ -414,7 +414,7 @@ func (suite *BackupIntgSuite) SetupSuite() {
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds)
suite.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
suite.tenantID = creds.AzureTenantID

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -698,7 +699,7 @@ func (suite *ContainerResolverSuite) SetupSuite() {
}
func (suite *ContainerResolverSuite) TestPopulate() {
ac, err := api.NewClient(suite.credentials)
ac, err := api.NewClient(suite.credentials, control.Defaults())
require.NoError(suite.T(), err, clues.ToCore(err))
eventFunc := func(t *testing.T) graph.ContainerResolver {

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -30,7 +31,7 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
its.creds = creds
its.ac, err = api.NewClient(creds)
its.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
its.userID = tconfig.GetM365UserID(ctx)

View File

@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -83,7 +84,7 @@ func (suite *MailFolderCacheIntegrationSuite) TestDeltaFetch() {
ctx, flush := tester.NewContext(t)
defer flush()
ac, err := api.NewClient(suite.credentials)
ac, err := api.NewClient(suite.credentials, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
acm := ac.Mail()

View File

@ -44,7 +44,7 @@ func (suite *RestoreIntgSuite) SetupSuite() {
require.NoError(t, err, clues.ToCore(err))
suite.credentials = m365
suite.ac, err = api.NewClient(m365)
suite.ac, err = api.NewClient(m365, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -313,7 +313,7 @@ func (suite *OneDriveIntgSuite) SetupSuite() {
suite.creds = creds
suite.ac, err = api.NewClient(creds)
suite.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -20,7 +21,7 @@ type oneDriveService struct {
}
func NewOneDriveService(credentials account.M365Config) (*oneDriveService, error) {
ac, err := api.NewClient(credentials)
ac, err := api.NewClient(credentials, control.Defaults())
if err != nil {
return nil, err
}

View File

@ -53,7 +53,7 @@ func (suite *URLCacheIntegrationSuite) SetupSuite() {
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds)
suite.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
drive, err := suite.ac.Users().GetDefaultDrive(ctx, suite.user)

View File

@ -201,7 +201,7 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
ac, err := api.NewClient(creds)
ac, err := api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
col, err := collectPages(

View File

@ -43,7 +43,7 @@ func (suite *SharePointCollectionSuite) SetupSuite() {
suite.creds = m365
ac, err := api.NewClient(m365)
ac, err := api.NewClient(m365, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
suite.ac = ac

View File

@ -1232,7 +1232,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds)
suite.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -278,7 +278,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
ac, err := api.NewClient(creds)
ac, err := api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
// generate 3 new folders with two items each.

View File

@ -585,7 +585,7 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
its.ac, err = api.NewClient(creds)
its.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
its.gockAC, err = mock.NewClient(creds)

View File

@ -7,14 +7,17 @@ import (
// Options holds the optional configurations for a process
type Options struct {
// DeltaPageSize controls the quantity of items fetched in each page
// during multi-page queries, such as graph api delta endpoints.
DeltaPageSize int32 `json:"deltaPageSize"`
DisableMetrics bool `json:"disableMetrics"`
FailureHandling FailurePolicy `json:"failureHandling"`
ItemExtensionFactory []extensions.CreateItemExtensioner `json:"-"`
Parallelism Parallelism `json:"parallelism"`
Repo repository.Options `json:"repo"`
RestorePermissions bool `json:"restorePermissions"`
SkipReduce bool `json:"skipReduce"`
ToggleFeatures Toggles `json:"toggleFeatures"`
Parallelism Parallelism `json:"parallelism"`
Repo repository.Options `json:"repo"`
ItemExtensionFactory []extensions.CreateItemExtensioner `json:"-"`
}
type Parallelism struct {
@ -39,6 +42,7 @@ const (
func Defaults() Options {
return Options{
FailureHandling: FailAfterRecovery,
DeltaPageSize: 500,
ToggleFeatures: Toggles{},
Parallelism: Parallelism{
CollectionBuffer: 4,

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path"
)
@ -36,11 +37,13 @@ type Client struct {
// arbitrary urls instead of constructing queries using the
// graph api client.
Requester graph.Requester
options control.Options
}
// NewClient produces a new exchange api client. Must be used in
// place of creating an ad-hoc client struct.
func NewClient(creds account.M365Config) (Client, error) {
func NewClient(creds account.M365Config, co control.Options) (Client, error) {
s, err := NewService(creds)
if err != nil {
return Client{}, err
@ -53,7 +56,11 @@ func NewClient(creds account.M365Config) (Client, error) {
rqr := graph.NewNoTimeoutHTTPWrapper()
return Client{creds, s, li, rqr}, nil
if co.DeltaPageSize < 1 || co.DeltaPageSize > maxDeltaPageSize {
co.DeltaPageSize = maxDeltaPageSize
}
return Client{creds, s, li, rqr, co}, nil
}
// initConcurrencyLimit ensures that the graph concurrency limiter is

View File

@ -277,7 +277,7 @@ func (c Contacts) NewContactDeltaIDsPager(
Select: idAnd(parentFolderID),
// do NOT set Top. It limits the total items received.
},
Headers: newPreferHeaders(preferPageSize(maxDeltaPageSize), preferImmutableIDs(immutableIDs)),
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
}
var builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder

View File

@ -244,7 +244,7 @@ func (c Events) NewEventDeltaIDsPager(
immutableIDs bool,
) (itemIDPager, error) {
options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{
Headers: newPreferHeaders(preferPageSize(maxDeltaPageSize), preferImmutableIDs(immutableIDs)),
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
QueryParameters: &users.ItemCalendarsItemEventsDeltaRequestBuilderGetQueryParameters{
// do NOT set Top. It limits the total items received.
},

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/mock"
)
@ -96,7 +97,7 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
its.ac, err = api.NewClient(creds)
its.ac, err = api.NewClient(creds, control.Defaults())
require.NoError(t, err, clues.ToCore(err))
its.gockAC, err = mock.NewClient(creds)

View File

@ -310,7 +310,7 @@ func (c Mail) NewMailDeltaIDsPager(
Select: idAnd("isRead"),
// do NOT set Top. It limits the total items received.
},
Headers: newPreferHeaders(preferPageSize(maxDeltaPageSize), preferImmutableIDs(immutableIDs)),
Headers: newPreferHeaders(preferPageSize(c.options.DeltaPageSize), preferImmutableIDs(immutableIDs)),
}
var builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -328,7 +329,7 @@ func makeAC(
return api.Client{}, clues.Wrap(err, "getting m365 account creds")
}
cli, err := api.NewClient(creds)
cli, err := api.NewClient(creds, control.Defaults())
if err != nil {
return api.Client{}, clues.Wrap(err, "constructing api client")
}