fix concurrency limiter init race (#3715)
initializes the concurrency limiter as part of CLI pre-run initialization to ensure we don't accidentally skip out on including the limiter in middleware. --- #### Does this PR need a docs update or release note? - [x] ⛔ No #### Type of change - [x] 🐛 Bugfix #### Issue(s) * #3695 #### Test Plan - [x] 💪 Manual - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
5949a3bcd7
commit
8b81728488
@ -264,14 +264,19 @@ func runBackups(
|
||||
|
||||
// genericDeleteCommand is a helper function that all services can use
|
||||
// for the removal of an entry from the repository
|
||||
func genericDeleteCommand(cmd *cobra.Command, bID, designation string, args []string) error {
|
||||
func genericDeleteCommand(
|
||||
cmd *cobra.Command,
|
||||
pst path.ServiceType,
|
||||
bID, designation string,
|
||||
args []string,
|
||||
) error {
|
||||
if utils.HasNoFlagsAndShownHelp(cmd) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx := clues.Add(cmd.Context(), "delete_backup_id", bID)
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, pst, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -289,10 +294,15 @@ func genericDeleteCommand(cmd *cobra.Command, bID, designation string, args []st
|
||||
|
||||
// genericListCommand is a helper function that all services can use
|
||||
// to display the backup IDs saved within the repository
|
||||
func genericListCommand(cmd *cobra.Command, bID string, service path.ServiceType, args []string) error {
|
||||
func genericListCommand(
|
||||
cmd *cobra.Command,
|
||||
bID string,
|
||||
service path.ServiceType,
|
||||
args []string,
|
||||
) error {
|
||||
ctx := cmd.Context()
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, service, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -166,7 +166,7 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, repo.S3Overrides(cmd))
|
||||
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.ExchangeService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -275,7 +275,7 @@ func detailsExchangeCmd(cmd *cobra.Command, args []string) error {
|
||||
ctx := cmd.Context()
|
||||
opts := utils.MakeExchangeOpts(cmd)
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.ExchangeService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -353,5 +353,5 @@ func exchangeDeleteCmd() *cobra.Command {
|
||||
|
||||
// deletes an exchange service backup.
|
||||
func deleteExchangeCmd(cmd *cobra.Command, args []string) error {
|
||||
return genericDeleteCommand(cmd, flags.BackupIDFV, "Exchange", args)
|
||||
return genericDeleteCommand(cmd, path.ExchangeService, flags.BackupIDFV, "Exchange", args)
|
||||
}
|
||||
|
||||
@ -148,7 +148,7 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, repo.S3Overrides(cmd))
|
||||
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.OneDriveService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -234,7 +234,7 @@ func detailsOneDriveCmd(cmd *cobra.Command, args []string) error {
|
||||
ctx := cmd.Context()
|
||||
opts := utils.MakeOneDriveOpts(cmd)
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.OneDriveService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -309,5 +309,5 @@ func oneDriveDeleteCmd() *cobra.Command {
|
||||
|
||||
// deletes a oneDrive service backup.
|
||||
func deleteOneDriveCmd(cmd *cobra.Command, args []string) error {
|
||||
return genericDeleteCommand(cmd, flags.BackupIDFV, "OneDrive", args)
|
||||
return genericDeleteCommand(cmd, path.OneDriveService, flags.BackupIDFV, "OneDrive", args)
|
||||
}
|
||||
|
||||
@ -163,7 +163,7 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, repo.S3Overrides(cmd))
|
||||
r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.SharePointService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -298,7 +298,7 @@ func sharePointDeleteCmd() *cobra.Command {
|
||||
|
||||
// deletes a sharePoint service backup.
|
||||
func deleteSharePointCmd(cmd *cobra.Command, args []string) error {
|
||||
return genericDeleteCommand(cmd, flags.BackupIDFV, "SharePoint", args)
|
||||
return genericDeleteCommand(cmd, path.SharePointService, flags.BackupIDFV, "SharePoint", args)
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
@ -325,7 +325,7 @@ func detailsSharePointCmd(cmd *cobra.Command, args []string) error {
|
||||
ctx := cmd.Context()
|
||||
opts := utils.MakeSharePointOpts(cmd)
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.SharePointService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"github.com/alcionai/corso/src/cli/print"
|
||||
"github.com/alcionai/corso/src/cli/utils"
|
||||
"github.com/alcionai/corso/src/pkg/control/repository"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -122,7 +123,7 @@ func handleMaintenanceCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.UnknownService, S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return print.Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/common/dttm"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
// called by restore.go to map subcommands to provider-specific handling.
|
||||
@ -93,7 +94,7 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.ExchangeService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/common/dttm"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
// called by restore.go to map subcommands to provider-specific handling.
|
||||
@ -92,7 +93,7 @@ func restoreOneDriveCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.OneDriveService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/common/dttm"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
// called by restore.go to map subcommands to provider-specific handling.
|
||||
@ -99,7 +100,7 @@ func restoreSharePointCmd(cmd *cobra.Command, args []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd))
|
||||
r, _, _, err := utils.GetAccountAndConnect(ctx, path.SharePointService, repo.S3Overrides(cmd))
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import (
|
||||
|
||||
func GetAccountAndConnect(
|
||||
ctx context.Context,
|
||||
pst path.ServiceType,
|
||||
overrides map[string]string,
|
||||
) (repository.Repository, *storage.Storage, *account.Account, error) {
|
||||
cfg, err := config.GetConfigRepoDetails(ctx, true, true, overrides)
|
||||
@ -38,14 +39,21 @@ func GetAccountAndConnect(
|
||||
return nil, nil, nil, clues.Wrap(err, "connecting to the "+cfg.Storage.Provider.String()+" repository")
|
||||
}
|
||||
|
||||
// this initializes our graph api client configurations,
|
||||
// including control options such as concurency limitations.
|
||||
if _, err := r.ConnectToM365(ctx, pst); err != nil {
|
||||
return nil, nil, nil, clues.Wrap(err, "connecting to m365")
|
||||
}
|
||||
|
||||
return r, &cfg.Storage, &cfg.Account, nil
|
||||
}
|
||||
|
||||
func AccountConnectAndWriteRepoConfig(
|
||||
ctx context.Context,
|
||||
pst path.ServiceType,
|
||||
overrides map[string]string,
|
||||
) (repository.Repository, *account.Account, error) {
|
||||
r, stg, acc, err := GetAccountAndConnect(ctx, overrides)
|
||||
r, stg, acc, err := GetAccountAndConnect(ctx, pst, overrides)
|
||||
if err != nil {
|
||||
logger.CtxErr(ctx, err).Info("getting and connecting account")
|
||||
return nil, nil, err
|
||||
|
||||
@ -113,6 +113,7 @@ func getControllerAndVerifyResourceOwner(
|
||||
ctx context.Context,
|
||||
resourceCat resource.Category,
|
||||
resourceOwner string,
|
||||
pst path.ServiceType,
|
||||
) (
|
||||
*m365.Controller,
|
||||
account.Account,
|
||||
@ -136,7 +137,7 @@ func getControllerAndVerifyResourceOwner(
|
||||
return nil, account.Account{}, nil, clues.Wrap(err, "finding m365 account details")
|
||||
}
|
||||
|
||||
ctrl, err := m365.NewController(ctx, acct, resourceCat)
|
||||
ctrl, err := m365.NewController(ctx, acct, resourceCat, pst, control.Options{})
|
||||
if err != nil {
|
||||
return nil, account.Account{}, nil, clues.Wrap(err, "connecting to graph api")
|
||||
}
|
||||
|
||||
@ -52,7 +52,7 @@ func handleExchangeEmailFactory(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User)
|
||||
ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.ExchangeService)
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -98,7 +98,7 @@ func handleExchangeCalendarEventFactory(cmd *cobra.Command, args []string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User)
|
||||
ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.ExchangeService)
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
@ -146,7 +146,7 @@ func handleExchangeContactFactory(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User)
|
||||
ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.ExchangeService)
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ func handleOneDriveFileFactory(cmd *cobra.Command, args []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User)
|
||||
ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.OneDriveService)
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -36,7 +36,7 @@ func handleSharePointLibraryFileFactory(cmd *cobra.Command, args []string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Sites, Site)
|
||||
ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Sites, Site, path.SharePointService)
|
||||
if err != nil {
|
||||
return Only(ctx, err)
|
||||
}
|
||||
|
||||
@ -66,7 +66,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
|
||||
|
||||
selUsers := []string{suite.user}
|
||||
|
||||
ctrl := newController(ctx, suite.T(), resource.Users)
|
||||
ctrl := newController(ctx, suite.T(), resource.Users, path.ExchangeService)
|
||||
tests := []struct {
|
||||
name string
|
||||
getSelector func(t *testing.T) selectors.Selector
|
||||
@ -167,7 +167,7 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner()
|
||||
defer flush()
|
||||
|
||||
owners := []string{"snuffleupagus"}
|
||||
ctrl := newController(ctx, suite.T(), resource.Users)
|
||||
ctrl := newController(ctx, suite.T(), resource.Users, path.ExchangeService)
|
||||
tests := []struct {
|
||||
name string
|
||||
getSelector func(t *testing.T) selectors.Selector
|
||||
@ -253,7 +253,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
|
||||
defer flush()
|
||||
|
||||
selSites := []string{suite.site}
|
||||
ctrl := newController(ctx, suite.T(), resource.Sites)
|
||||
ctrl := newController(ctx, suite.T(), resource.Sites, path.SharePointService)
|
||||
tests := []struct {
|
||||
name string
|
||||
expected int
|
||||
@ -348,7 +348,7 @@ func (suite *SPCollectionIntgSuite) SetupSuite() {
|
||||
ctx, flush := tester.NewContext(suite.T())
|
||||
defer flush()
|
||||
|
||||
suite.connector = newController(ctx, suite.T(), resource.Sites)
|
||||
suite.connector = newController(ctx, suite.T(), resource.Sites, path.SharePointService)
|
||||
suite.user = tester.M365UserID(suite.T())
|
||||
|
||||
tester.LogTimeOfTest(suite.T())
|
||||
@ -362,7 +362,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() {
|
||||
|
||||
var (
|
||||
siteID = tester.M365SiteID(t)
|
||||
ctrl = newController(ctx, t, resource.Sites)
|
||||
ctrl = newController(ctx, t, resource.Sites, path.SharePointService)
|
||||
siteIDs = []string{siteID}
|
||||
)
|
||||
|
||||
@ -409,7 +409,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() {
|
||||
|
||||
var (
|
||||
siteID = tester.M365SiteID(t)
|
||||
ctrl = newController(ctx, t, resource.Sites)
|
||||
ctrl = newController(ctx, t, resource.Sites, path.SharePointService)
|
||||
siteIDs = []string{siteID}
|
||||
)
|
||||
|
||||
|
||||
@ -14,6 +14,8 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/m365/support"
|
||||
"github.com/alcionai/corso/src/internal/operations/inject"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
|
||||
@ -51,7 +53,11 @@ func NewController(
|
||||
ctx context.Context,
|
||||
acct account.Account,
|
||||
rc resource.Category,
|
||||
pst path.ServiceType,
|
||||
co control.Options,
|
||||
) (*Controller, error) {
|
||||
graph.InitializeConcurrencyLimiter(ctx, pst == path.ExchangeService, co.Parallelism.ItemFetch)
|
||||
|
||||
creds, err := acct.M365Config()
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx)
|
||||
|
||||
@ -283,7 +283,7 @@ func (suite *ControllerIntegrationSuite) SetupSuite() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
suite.ctrl = newController(ctx, t, resource.Users)
|
||||
suite.ctrl = newController(ctx, t, resource.Users, path.ExchangeService)
|
||||
suite.user = tester.M365UserID(t)
|
||||
suite.secondaryUser = tester.SecondaryM365UserID(t)
|
||||
|
||||
@ -422,7 +422,7 @@ func runRestore(
|
||||
|
||||
start := time.Now()
|
||||
|
||||
restoreCtrl := newController(ctx, t, config.Resource)
|
||||
restoreCtrl := newController(ctx, t, config.Resource, path.ExchangeService)
|
||||
restoreSel := getSelectorWith(t, config.Service, config.ResourceOwners, true)
|
||||
deets, err := restoreCtrl.ConsumeRestoreCollections(
|
||||
ctx,
|
||||
@ -484,7 +484,7 @@ func runBackupAndCompare(
|
||||
nameToID[ro] = ro
|
||||
}
|
||||
|
||||
backupCtrl := newController(ctx, t, config.Resource)
|
||||
backupCtrl := newController(ctx, t, config.Resource, path.ExchangeService)
|
||||
backupCtrl.IDNameLookup = inMock.NewCache(idToName, nameToID)
|
||||
|
||||
backupSel := backupSelectorForExpected(t, config.Service, expectedDests)
|
||||
@ -1030,7 +1030,7 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() {
|
||||
restoreCfg.Location,
|
||||
)
|
||||
|
||||
restoreCtrl := newController(ctx, t, test.resourceCat)
|
||||
restoreCtrl := newController(ctx, t, test.resourceCat, path.ExchangeService)
|
||||
deets, err := restoreCtrl.ConsumeRestoreCollections(
|
||||
ctx,
|
||||
version.Backup,
|
||||
@ -1060,7 +1060,7 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() {
|
||||
|
||||
// Run a backup and compare its output with what we put in.
|
||||
|
||||
backupCtrl := newController(ctx, t, test.resourceCat)
|
||||
backupCtrl := newController(ctx, t, test.resourceCat, path.ExchangeService)
|
||||
backupSel := backupSelectorForExpected(t, test.service, expectedDests)
|
||||
t.Log("Selective backup of", backupSel)
|
||||
|
||||
@ -1209,7 +1209,7 @@ func (suite *ControllerIntegrationSuite) TestBackup_CreatesPrefixCollections() {
|
||||
defer flush()
|
||||
|
||||
var (
|
||||
backupCtrl = newController(ctx, t, test.resourceCat)
|
||||
backupCtrl = newController(ctx, t, test.resourceCat, path.ExchangeService)
|
||||
backupSel = test.selectorFunc(t)
|
||||
errs = fault.New(true)
|
||||
start = time.Now()
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/m365/discovery"
|
||||
"github.com/alcionai/corso/src/internal/m365/graph"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/credentials"
|
||||
@ -30,6 +31,13 @@ func TestDiscoveryIntgSuite(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *DiscoveryIntgSuite) SetupSuite() {
|
||||
ctx, flush := tester.NewContext(suite.T())
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
}
|
||||
|
||||
func (suite *DiscoveryIntgSuite) TestUsers() {
|
||||
t := suite.T()
|
||||
|
||||
|
||||
@ -199,9 +199,10 @@ func ProduceBackupCollections(
|
||||
|
||||
// Turn on concurrency limiter middleware for exchange backups
|
||||
// unless explicitly disabled through DisableConcurrencyLimiterFN cli flag
|
||||
if !ctrlOpts.ToggleFeatures.DisableConcurrencyLimiter {
|
||||
graph.InitializeConcurrencyLimiter(ctrlOpts.Parallelism.ItemFetch)
|
||||
}
|
||||
graph.InitializeConcurrencyLimiter(
|
||||
ctx,
|
||||
ctrlOpts.ToggleFeatures.DisableConcurrencyLimiter,
|
||||
ctrlOpts.Parallelism.ItemFetch)
|
||||
|
||||
cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, metadata)
|
||||
if err != nil {
|
||||
|
||||
@ -399,19 +399,26 @@ func TestBackupIntgSuite(t *testing.T) {
|
||||
}
|
||||
|
||||
func (suite *BackupIntgSuite) SetupSuite() {
|
||||
suite.user = tester.M365UserID(suite.T())
|
||||
suite.site = tester.M365SiteID(suite.T())
|
||||
t := suite.T()
|
||||
|
||||
acct := tester.NewM365Account(suite.T())
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
suite.user = tester.M365UserID(t)
|
||||
suite.site = tester.M365SiteID(t)
|
||||
|
||||
acct := tester.NewM365Account(t)
|
||||
creds, err := acct.M365Config()
|
||||
require.NoError(suite.T(), err, clues.ToCore(err))
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
suite.ac, err = api.NewClient(creds)
|
||||
require.NoError(suite.T(), err, clues.ToCore(err))
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
suite.tenantID = creds.AzureTenantID
|
||||
|
||||
tester.LogTimeOfTest(suite.T())
|
||||
tester.LogTimeOfTest(t)
|
||||
}
|
||||
|
||||
func (suite *BackupIntgSuite) TestMailFetch() {
|
||||
|
||||
@ -26,6 +26,12 @@ func TestBetaClientSuite(t *testing.T) {
|
||||
|
||||
func (suite *BetaClientSuite) SetupSuite() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, false, 4)
|
||||
|
||||
a := tester.NewM365Account(t)
|
||||
m365, err := a.M365Config()
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
@ -20,34 +20,41 @@ import (
|
||||
// "how many calls at one time"
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// concurrencyLimiter middleware limits the number of concurrent requests to graph API
|
||||
type concurrencyLimiter struct {
|
||||
// concurrencyLimiterMiddleware middleware limits the number of concurrent requests to graph API
|
||||
type concurrencyLimiterMiddleware struct {
|
||||
semaphore chan struct{}
|
||||
}
|
||||
|
||||
var (
|
||||
once sync.Once
|
||||
concurrencyLim *concurrencyLimiter
|
||||
maxConcurrentRequests = 4
|
||||
once sync.Once
|
||||
concurrencyLimitMiddlewareSingleton = &concurrencyLimiterMiddleware{}
|
||||
maxConcurrentRequests = 4
|
||||
)
|
||||
|
||||
func generateConcurrencyLimiter(capacity int) *concurrencyLimiter {
|
||||
func generateConcurrencyLimiter(capacity int) *concurrencyLimiterMiddleware {
|
||||
if capacity < 1 || capacity > maxConcurrentRequests {
|
||||
capacity = maxConcurrentRequests
|
||||
}
|
||||
|
||||
return &concurrencyLimiter{
|
||||
return &concurrencyLimiterMiddleware{
|
||||
semaphore: make(chan struct{}, capacity),
|
||||
}
|
||||
}
|
||||
|
||||
func InitializeConcurrencyLimiter(capacity int) {
|
||||
func InitializeConcurrencyLimiter(ctx context.Context, enable bool, capacity int) {
|
||||
once.Do(func() {
|
||||
concurrencyLim = generateConcurrencyLimiter(capacity)
|
||||
switch enable {
|
||||
case true:
|
||||
logger.Ctx(ctx).Infow("turning on the concurrency limiter", "concurrency_limit", capacity)
|
||||
concurrencyLimitMiddlewareSingleton.semaphore = generateConcurrencyLimiter(capacity).semaphore
|
||||
case false:
|
||||
logger.Ctx(ctx).Info("turning off the concurrency limiter")
|
||||
concurrencyLimitMiddlewareSingleton = nil
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (cl *concurrencyLimiter) Intercept(
|
||||
func (cl *concurrencyLimiterMiddleware) Intercept(
|
||||
pipeline khttp.Pipeline,
|
||||
middlewareIndex int,
|
||||
req *http.Request,
|
||||
|
||||
@ -76,10 +76,13 @@ func (suite *ConcurrencyMWUnitTestSuite) TestConcurrencyLimiter() {
|
||||
func (suite *ConcurrencyMWUnitTestSuite) TestInitializeConcurrencyLimiter() {
|
||||
t := suite.T()
|
||||
|
||||
InitializeConcurrencyLimiter(2)
|
||||
InitializeConcurrencyLimiter(4)
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
assert.Equal(t, cap(concurrencyLim.semaphore), 2, "singleton semaphore capacity changed")
|
||||
InitializeConcurrencyLimiter(ctx, true, 2)
|
||||
InitializeConcurrencyLimiter(ctx, false, 4)
|
||||
|
||||
assert.Equal(t, cap(concurrencyLimitMiddlewareSingleton.semaphore), 2, "singleton semaphore capacity changed")
|
||||
}
|
||||
|
||||
func (suite *ConcurrencyMWUnitTestSuite) TestGenerateConcurrencyLimiter() {
|
||||
|
||||
@ -261,8 +261,8 @@ func kiotaMiddlewares(
|
||||
}...)
|
||||
|
||||
// Optionally add concurrency limiter middleware if it has been initialized.
|
||||
if concurrencyLim != nil {
|
||||
mw = append(mw, concurrencyLim)
|
||||
if concurrencyLimitMiddlewareSingleton != nil {
|
||||
mw = append(mw, concurrencyLimitMiddlewareSingleton)
|
||||
}
|
||||
|
||||
mw = append(
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/m365/resource"
|
||||
m365Stub "github.com/alcionai/corso/src/internal/m365/stub"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
@ -1190,10 +1191,15 @@ func getSelectorWith(
|
||||
}
|
||||
}
|
||||
|
||||
func newController(ctx context.Context, t *testing.T, r resource.Category) *Controller {
|
||||
func newController(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
r resource.Category,
|
||||
pst path.ServiceType,
|
||||
) *Controller {
|
||||
a := tester.NewM365Account(t)
|
||||
|
||||
controller, err := NewController(ctx, a, r)
|
||||
controller, err := NewController(ctx, a, r, pst, control.Options{})
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
return controller
|
||||
|
||||
@ -299,6 +299,11 @@ func TestOneDriveSuite(t *testing.T) {
|
||||
func (suite *OneDriveIntgSuite) SetupSuite() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
suite.userID = tester.SecondaryM365UserID(t)
|
||||
|
||||
acct := tester.NewM365Account(t)
|
||||
|
||||
@ -121,7 +121,7 @@ func NewSuiteInfoImpl(
|
||||
rsc = resource.Sites
|
||||
}
|
||||
|
||||
ctrl := newController(ctx, t, rsc)
|
||||
ctrl := newController(ctx, t, rsc, path.OneDriveService)
|
||||
|
||||
return suiteInfoImpl{
|
||||
ac: ctrl.AC,
|
||||
|
||||
@ -40,6 +40,11 @@ type SharePointPageSuite struct {
|
||||
func (suite *SharePointPageSuite) SetupSuite() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
suite.siteID = tester.M365SiteID(t)
|
||||
a := tester.NewM365Account(t)
|
||||
m365, err := a.M365Config()
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/idname/mock"
|
||||
"github.com/alcionai/corso/src/internal/m365/graph"
|
||||
"github.com/alcionai/corso/src/internal/m365/onedrive"
|
||||
odConsts "github.com/alcionai/corso/src/internal/m365/onedrive/consts"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
@ -174,11 +175,17 @@ func TestSharePointPagesSuite(t *testing.T) {
|
||||
suite.Run(t, &SharePointPagesSuite{
|
||||
Suite: tester.NewIntegrationSuite(
|
||||
t,
|
||||
[][]string{tester.M365AcctCredEnvs},
|
||||
),
|
||||
[][]string{tester.M365AcctCredEnvs}),
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *SharePointPagesSuite) SetupSuite() {
|
||||
ctx, flush := tester.NewContext(suite.T())
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, false, 4)
|
||||
}
|
||||
|
||||
func (suite *SharePointPagesSuite) TestCollectPages() {
|
||||
t := suite.T()
|
||||
|
||||
|
||||
@ -525,6 +525,11 @@ func TestBackupOpIntegrationSuite(t *testing.T) {
|
||||
func (suite *BackupOpIntegrationSuite) SetupSuite() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
suite.user = tester.M365UserID(t)
|
||||
suite.site = tester.M365SiteID(t)
|
||||
|
||||
@ -1867,7 +1872,7 @@ func runDriveIncrementalTest(
|
||||
}
|
||||
for _, test := range table {
|
||||
suite.Run(test.name, func() {
|
||||
cleanCtrl, err := m365.NewController(ctx, acct, rc)
|
||||
cleanCtrl, err := m365.NewController(ctx, acct, rc, sel.PathService(), control.Options{})
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
var (
|
||||
@ -1951,7 +1956,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveOwnerMigration() {
|
||||
ctrl, err := m365.NewController(
|
||||
ctx,
|
||||
acct,
|
||||
resource.Users)
|
||||
resource.Users,
|
||||
path.OneDriveService,
|
||||
control.Options{})
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
userable, err := ctrl.AC.Users().GetByID(ctx, suite.user)
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/m365"
|
||||
"github.com/alcionai/corso/src/internal/m365/resource"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
)
|
||||
|
||||
@ -26,7 +27,7 @@ func ControllerWithSelector(
|
||||
ins idname.Cacher,
|
||||
onFail func(),
|
||||
) (*m365.Controller, selectors.Selector) {
|
||||
ctrl, err := m365.NewController(ctx, acct, cr)
|
||||
ctrl, err := m365.NewController(ctx, acct, cr, sel.PathService(), control.Options{})
|
||||
if !assert.NoError(t, err, clues.ToCore(err)) {
|
||||
if onFail != nil {
|
||||
onFail()
|
||||
|
||||
@ -465,7 +465,9 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoBackup() {
|
||||
ctrl, err := m365.NewController(
|
||||
ctx,
|
||||
suite.acct,
|
||||
resource.Users)
|
||||
resource.Users,
|
||||
rsel.PathService(),
|
||||
control.Options{})
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
ro, err := NewRestoreOperation(
|
||||
|
||||
@ -28,6 +28,7 @@ import (
|
||||
rep "github.com/alcionai/corso/src/pkg/control/repository"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
"github.com/alcionai/corso/src/pkg/storage"
|
||||
"github.com/alcionai/corso/src/pkg/store"
|
||||
@ -78,6 +79,12 @@ type Repository interface {
|
||||
) (operations.MaintenanceOperation, error)
|
||||
DeleteBackup(ctx context.Context, id string) error
|
||||
BackupGetter
|
||||
// ConnectToM365 establishes graph api connections
|
||||
// and initializes api client configurations.
|
||||
ConnectToM365(
|
||||
ctx context.Context,
|
||||
pst path.ServiceType,
|
||||
) (*m365.Controller, error)
|
||||
}
|
||||
|
||||
// Repository contains storage provider information.
|
||||
@ -316,14 +323,14 @@ func (r repository) NewBackupWithLookup(
|
||||
sel selectors.Selector,
|
||||
ins idname.Cacher,
|
||||
) (operations.BackupOperation, error) {
|
||||
ctrl, err := connectToM365(ctx, sel, r.Account)
|
||||
ctrl, err := connectToM365(ctx, sel.PathService(), r.Account, r.Opts)
|
||||
if err != nil {
|
||||
return operations.BackupOperation{}, errors.Wrap(err, "connecting to m365")
|
||||
return operations.BackupOperation{}, clues.Wrap(err, "connecting to m365")
|
||||
}
|
||||
|
||||
ownerID, ownerName, err := ctrl.PopulateOwnerIDAndNamesFrom(ctx, sel.DiscreteOwner, ins)
|
||||
if err != nil {
|
||||
return operations.BackupOperation{}, errors.Wrap(err, "resolving resource owner details")
|
||||
return operations.BackupOperation{}, clues.Wrap(err, "resolving resource owner details")
|
||||
}
|
||||
|
||||
// TODO: retrieve display name from gc
|
||||
@ -348,9 +355,9 @@ func (r repository) NewRestore(
|
||||
sel selectors.Selector,
|
||||
restoreCfg control.RestoreConfig,
|
||||
) (operations.RestoreOperation, error) {
|
||||
ctrl, err := connectToM365(ctx, sel, r.Account)
|
||||
ctrl, err := connectToM365(ctx, sel.PathService(), r.Account, r.Opts)
|
||||
if err != nil {
|
||||
return operations.RestoreOperation{}, errors.Wrap(err, "connecting to m365")
|
||||
return operations.RestoreOperation{}, clues.Wrap(err, "connecting to m365")
|
||||
}
|
||||
|
||||
return operations.NewRestoreOperation(
|
||||
@ -585,6 +592,18 @@ func deleteBackup(
|
||||
return sw.DeleteBackup(ctx, model.StableID(id))
|
||||
}
|
||||
|
||||
func (r repository) ConnectToM365(
|
||||
ctx context.Context,
|
||||
pst path.ServiceType,
|
||||
) (*m365.Controller, error) {
|
||||
ctrl, err := connectToM365(ctx, pst, r.Account, r.Opts)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "connecting to m365")
|
||||
}
|
||||
|
||||
return ctrl, nil
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Repository ID Model
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -633,11 +652,11 @@ func newRepoID(s storage.Storage) string {
|
||||
// helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// produces a graph m365.
|
||||
func connectToM365(
|
||||
ctx context.Context,
|
||||
sel selectors.Selector,
|
||||
pst path.ServiceType,
|
||||
acct account.Account,
|
||||
co control.Options,
|
||||
) (*m365.Controller, error) {
|
||||
complete := observe.MessageWithCompletion(ctx, "Connecting to M365")
|
||||
defer func() {
|
||||
@ -647,11 +666,11 @@ func connectToM365(
|
||||
|
||||
// retrieve data from the producer
|
||||
rc := resource.Users
|
||||
if sel.Service == selectors.ServiceSharePoint {
|
||||
if pst == path.SharePointService {
|
||||
rc = resource.Sites
|
||||
}
|
||||
|
||||
ctrl, err := m365.NewController(ctx, acct, rc)
|
||||
ctrl, err := m365.NewController(ctx, acct, rc, pst, co)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -223,7 +223,7 @@ type EventsAPIIntgSuite struct {
|
||||
its intgTesterSetup
|
||||
}
|
||||
|
||||
func TestEventsAPIntgSuite(t *testing.T) {
|
||||
func TestEventsAPIIntgSuite(t *testing.T) {
|
||||
suite.Run(t, &EventsAPIIntgSuite{
|
||||
Suite: tester.NewIntegrationSuite(
|
||||
t,
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||
"github.com/alcionai/corso/src/internal/m365/graph"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api/mock"
|
||||
@ -88,6 +89,8 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
a := tester.NewM365Account(t)
|
||||
creds, err := a.M365Config()
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
@ -37,6 +37,8 @@ func (suite *M365IntegrationSuite) TestUsers() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
acct := tester.NewM365Account(suite.T())
|
||||
|
||||
users, err := Users(ctx, acct, fault.New(true))
|
||||
@ -84,6 +86,8 @@ func (suite *M365IntegrationSuite) TestGetUserInfo() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
graph.InitializeConcurrencyLimiter(ctx, true, 4)
|
||||
|
||||
var (
|
||||
acct = tester.NewM365Account(t)
|
||||
uid = tester.M365UserID(t)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user