diff --git a/src/cli/backup/backup.go b/src/cli/backup/backup.go index a4cf1f2fa..cfa4a0b61 100644 --- a/src/cli/backup/backup.go +++ b/src/cli/backup/backup.go @@ -264,14 +264,19 @@ func runBackups( // genericDeleteCommand is a helper function that all services can use // for the removal of an entry from the repository -func genericDeleteCommand(cmd *cobra.Command, bID, designation string, args []string) error { +func genericDeleteCommand( + cmd *cobra.Command, + pst path.ServiceType, + bID, designation string, + args []string, +) error { if utils.HasNoFlagsAndShownHelp(cmd) { return nil } ctx := clues.Add(cmd.Context(), "delete_backup_id", bID) - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, pst, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } @@ -289,10 +294,15 @@ func genericDeleteCommand(cmd *cobra.Command, bID, designation string, args []st // genericListCommand is a helper function that all services can use // to display the backup IDs saved within the repository -func genericListCommand(cmd *cobra.Command, bID string, service path.ServiceType, args []string) error { +func genericListCommand( + cmd *cobra.Command, + bID string, + service path.ServiceType, + args []string, +) error { ctx := cmd.Context() - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, service, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } diff --git a/src/cli/backup/exchange.go b/src/cli/backup/exchange.go index 2f64f1a00..16429fb8f 100644 --- a/src/cli/backup/exchange.go +++ b/src/cli/backup/exchange.go @@ -166,7 +166,7 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error { return err } - r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, repo.S3Overrides(cmd)) + r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.ExchangeService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } @@ -275,7 +275,7 @@ func detailsExchangeCmd(cmd *cobra.Command, args []string) error { ctx := cmd.Context() opts := utils.MakeExchangeOpts(cmd) - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.ExchangeService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } @@ -353,5 +353,5 @@ func exchangeDeleteCmd() *cobra.Command { // deletes an exchange service backup. func deleteExchangeCmd(cmd *cobra.Command, args []string) error { - return genericDeleteCommand(cmd, flags.BackupIDFV, "Exchange", args) + return genericDeleteCommand(cmd, path.ExchangeService, flags.BackupIDFV, "Exchange", args) } diff --git a/src/cli/backup/onedrive.go b/src/cli/backup/onedrive.go index 87f0a842c..7249bde5d 100644 --- a/src/cli/backup/onedrive.go +++ b/src/cli/backup/onedrive.go @@ -148,7 +148,7 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error { return err } - r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, repo.S3Overrides(cmd)) + r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.OneDriveService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } @@ -234,7 +234,7 @@ func detailsOneDriveCmd(cmd *cobra.Command, args []string) error { ctx := cmd.Context() opts := utils.MakeOneDriveOpts(cmd) - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.OneDriveService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } @@ -309,5 +309,5 @@ func oneDriveDeleteCmd() *cobra.Command { // deletes a oneDrive service backup. func deleteOneDriveCmd(cmd *cobra.Command, args []string) error { - return genericDeleteCommand(cmd, flags.BackupIDFV, "OneDrive", args) + return genericDeleteCommand(cmd, path.OneDriveService, flags.BackupIDFV, "OneDrive", args) } diff --git a/src/cli/backup/sharepoint.go b/src/cli/backup/sharepoint.go index f1f2b1928..3922bfa3d 100644 --- a/src/cli/backup/sharepoint.go +++ b/src/cli/backup/sharepoint.go @@ -163,7 +163,7 @@ func createSharePointCmd(cmd *cobra.Command, args []string) error { return err } - r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, repo.S3Overrides(cmd)) + r, acct, err := utils.AccountConnectAndWriteRepoConfig(ctx, path.SharePointService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } @@ -298,7 +298,7 @@ func sharePointDeleteCmd() *cobra.Command { // deletes a sharePoint service backup. func deleteSharePointCmd(cmd *cobra.Command, args []string) error { - return genericDeleteCommand(cmd, flags.BackupIDFV, "SharePoint", args) + return genericDeleteCommand(cmd, path.SharePointService, flags.BackupIDFV, "SharePoint", args) } // ------------------------------------------------------------------------------------------------ @@ -325,7 +325,7 @@ func detailsSharePointCmd(cmd *cobra.Command, args []string) error { ctx := cmd.Context() opts := utils.MakeSharePointOpts(cmd) - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.SharePointService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } diff --git a/src/cli/repo/repo.go b/src/cli/repo/repo.go index 0e4623561..b25096d26 100644 --- a/src/cli/repo/repo.go +++ b/src/cli/repo/repo.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/src/cli/print" "github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/pkg/control/repository" + "github.com/alcionai/corso/src/pkg/path" ) const ( @@ -122,7 +123,7 @@ func handleMaintenanceCmd(cmd *cobra.Command, args []string) error { return err } - r, _, _, err := utils.GetAccountAndConnect(ctx, S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.UnknownService, S3Overrides(cmd)) if err != nil { return print.Only(ctx, err) } diff --git a/src/cli/restore/exchange.go b/src/cli/restore/exchange.go index 19476cad7..847e09d83 100644 --- a/src/cli/restore/exchange.go +++ b/src/cli/restore/exchange.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/common/dttm" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/path" ) // called by restore.go to map subcommands to provider-specific handling. @@ -93,7 +94,7 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error { return err } - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.ExchangeService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } diff --git a/src/cli/restore/onedrive.go b/src/cli/restore/onedrive.go index 7b306de95..d19f9d891 100644 --- a/src/cli/restore/onedrive.go +++ b/src/cli/restore/onedrive.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/common/dttm" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/path" ) // called by restore.go to map subcommands to provider-specific handling. @@ -92,7 +93,7 @@ func restoreOneDriveCmd(cmd *cobra.Command, args []string) error { return err } - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.OneDriveService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } diff --git a/src/cli/restore/sharepoint.go b/src/cli/restore/sharepoint.go index a01b84850..d9fb0b95d 100644 --- a/src/cli/restore/sharepoint.go +++ b/src/cli/restore/sharepoint.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/common/dttm" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/path" ) // called by restore.go to map subcommands to provider-specific handling. @@ -99,7 +100,7 @@ func restoreSharePointCmd(cmd *cobra.Command, args []string) error { return err } - r, _, _, err := utils.GetAccountAndConnect(ctx, repo.S3Overrides(cmd)) + r, _, _, err := utils.GetAccountAndConnect(ctx, path.SharePointService, repo.S3Overrides(cmd)) if err != nil { return Only(ctx, err) } diff --git a/src/cli/utils/utils.go b/src/cli/utils/utils.go index 56564ee3e..b47071b2b 100644 --- a/src/cli/utils/utils.go +++ b/src/cli/utils/utils.go @@ -21,6 +21,7 @@ import ( func GetAccountAndConnect( ctx context.Context, + pst path.ServiceType, overrides map[string]string, ) (repository.Repository, *storage.Storage, *account.Account, error) { cfg, err := config.GetConfigRepoDetails(ctx, true, true, overrides) @@ -38,14 +39,21 @@ func GetAccountAndConnect( return nil, nil, nil, clues.Wrap(err, "connecting to the "+cfg.Storage.Provider.String()+" repository") } + // this initializes our graph api client configurations, + // including control options such as concurency limitations. + if _, err := r.ConnectToM365(ctx, pst); err != nil { + return nil, nil, nil, clues.Wrap(err, "connecting to m365") + } + return r, &cfg.Storage, &cfg.Account, nil } func AccountConnectAndWriteRepoConfig( ctx context.Context, + pst path.ServiceType, overrides map[string]string, ) (repository.Repository, *account.Account, error) { - r, stg, acc, err := GetAccountAndConnect(ctx, overrides) + r, stg, acc, err := GetAccountAndConnect(ctx, pst, overrides) if err != nil { logger.CtxErr(ctx, err).Info("getting and connecting account") return nil, nil, err diff --git a/src/cmd/factory/impl/common.go b/src/cmd/factory/impl/common.go index b18e73ca8..a92e169d7 100644 --- a/src/cmd/factory/impl/common.go +++ b/src/cmd/factory/impl/common.go @@ -113,6 +113,7 @@ func getControllerAndVerifyResourceOwner( ctx context.Context, resourceCat resource.Category, resourceOwner string, + pst path.ServiceType, ) ( *m365.Controller, account.Account, @@ -136,7 +137,7 @@ func getControllerAndVerifyResourceOwner( return nil, account.Account{}, nil, clues.Wrap(err, "finding m365 account details") } - ctrl, err := m365.NewController(ctx, acct, resourceCat) + ctrl, err := m365.NewController(ctx, acct, resourceCat, pst, control.Options{}) if err != nil { return nil, account.Account{}, nil, clues.Wrap(err, "connecting to graph api") } diff --git a/src/cmd/factory/impl/exchange.go b/src/cmd/factory/impl/exchange.go index bf8450da2..e54196e90 100644 --- a/src/cmd/factory/impl/exchange.go +++ b/src/cmd/factory/impl/exchange.go @@ -52,7 +52,7 @@ func handleExchangeEmailFactory(cmd *cobra.Command, args []string) error { return nil } - ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User) + ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.ExchangeService) if err != nil { return Only(ctx, err) } @@ -98,7 +98,7 @@ func handleExchangeCalendarEventFactory(cmd *cobra.Command, args []string) error return nil } - ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User) + ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.ExchangeService) if err != nil { return Only(ctx, err) } @@ -146,7 +146,7 @@ func handleExchangeContactFactory(cmd *cobra.Command, args []string) error { return nil } - ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User) + ctrl, _, _, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.ExchangeService) if err != nil { return Only(ctx, err) } diff --git a/src/cmd/factory/impl/onedrive.go b/src/cmd/factory/impl/onedrive.go index 750fa64a3..3d761df1c 100644 --- a/src/cmd/factory/impl/onedrive.go +++ b/src/cmd/factory/impl/onedrive.go @@ -36,7 +36,7 @@ func handleOneDriveFileFactory(cmd *cobra.Command, args []string) error { return nil } - ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User) + ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Users, User, path.OneDriveService) if err != nil { return Only(ctx, err) } diff --git a/src/cmd/factory/impl/sharepoint.go b/src/cmd/factory/impl/sharepoint.go index bdcc952f5..7d55fcc86 100644 --- a/src/cmd/factory/impl/sharepoint.go +++ b/src/cmd/factory/impl/sharepoint.go @@ -36,7 +36,7 @@ func handleSharePointLibraryFileFactory(cmd *cobra.Command, args []string) error return nil } - ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Sites, Site) + ctrl, acct, inp, err := getControllerAndVerifyResourceOwner(ctx, resource.Sites, Site, path.SharePointService) if err != nil { return Only(ctx, err) } diff --git a/src/internal/m365/backup_test.go b/src/internal/m365/backup_test.go index 03543061b..6e019a88b 100644 --- a/src/internal/m365/backup_test.go +++ b/src/internal/m365/backup_test.go @@ -66,7 +66,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() { selUsers := []string{suite.user} - ctrl := newController(ctx, suite.T(), resource.Users) + ctrl := newController(ctx, suite.T(), resource.Users, path.ExchangeService) tests := []struct { name string getSelector func(t *testing.T) selectors.Selector @@ -167,7 +167,7 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner() defer flush() owners := []string{"snuffleupagus"} - ctrl := newController(ctx, suite.T(), resource.Users) + ctrl := newController(ctx, suite.T(), resource.Users, path.ExchangeService) tests := []struct { name string getSelector func(t *testing.T) selectors.Selector @@ -253,7 +253,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() { defer flush() selSites := []string{suite.site} - ctrl := newController(ctx, suite.T(), resource.Sites) + ctrl := newController(ctx, suite.T(), resource.Sites, path.SharePointService) tests := []struct { name string expected int @@ -348,7 +348,7 @@ func (suite *SPCollectionIntgSuite) SetupSuite() { ctx, flush := tester.NewContext(suite.T()) defer flush() - suite.connector = newController(ctx, suite.T(), resource.Sites) + suite.connector = newController(ctx, suite.T(), resource.Sites, path.SharePointService) suite.user = tester.M365UserID(suite.T()) tester.LogTimeOfTest(suite.T()) @@ -362,7 +362,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() { var ( siteID = tester.M365SiteID(t) - ctrl = newController(ctx, t, resource.Sites) + ctrl = newController(ctx, t, resource.Sites, path.SharePointService) siteIDs = []string{siteID} ) @@ -409,7 +409,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() { var ( siteID = tester.M365SiteID(t) - ctrl = newController(ctx, t, resource.Sites) + ctrl = newController(ctx, t, resource.Sites, path.SharePointService) siteIDs = []string{siteID} ) diff --git a/src/internal/m365/controller.go b/src/internal/m365/controller.go index 910bf52a2..b0c8792e5 100644 --- a/src/internal/m365/controller.go +++ b/src/internal/m365/controller.go @@ -14,6 +14,8 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -51,7 +53,11 @@ func NewController( ctx context.Context, acct account.Account, rc resource.Category, + pst path.ServiceType, + co control.Options, ) (*Controller, error) { + graph.InitializeConcurrencyLimiter(ctx, pst == path.ExchangeService, co.Parallelism.ItemFetch) + creds, err := acct.M365Config() if err != nil { return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx) diff --git a/src/internal/m365/controller_test.go b/src/internal/m365/controller_test.go index 6d181eb66..9557cb988 100644 --- a/src/internal/m365/controller_test.go +++ b/src/internal/m365/controller_test.go @@ -283,7 +283,7 @@ func (suite *ControllerIntegrationSuite) SetupSuite() { ctx, flush := tester.NewContext(t) defer flush() - suite.ctrl = newController(ctx, t, resource.Users) + suite.ctrl = newController(ctx, t, resource.Users, path.ExchangeService) suite.user = tester.M365UserID(t) suite.secondaryUser = tester.SecondaryM365UserID(t) @@ -422,7 +422,7 @@ func runRestore( start := time.Now() - restoreCtrl := newController(ctx, t, config.Resource) + restoreCtrl := newController(ctx, t, config.Resource, path.ExchangeService) restoreSel := getSelectorWith(t, config.Service, config.ResourceOwners, true) deets, err := restoreCtrl.ConsumeRestoreCollections( ctx, @@ -484,7 +484,7 @@ func runBackupAndCompare( nameToID[ro] = ro } - backupCtrl := newController(ctx, t, config.Resource) + backupCtrl := newController(ctx, t, config.Resource, path.ExchangeService) backupCtrl.IDNameLookup = inMock.NewCache(idToName, nameToID) backupSel := backupSelectorForExpected(t, config.Service, expectedDests) @@ -1030,7 +1030,7 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() { restoreCfg.Location, ) - restoreCtrl := newController(ctx, t, test.resourceCat) + restoreCtrl := newController(ctx, t, test.resourceCat, path.ExchangeService) deets, err := restoreCtrl.ConsumeRestoreCollections( ctx, version.Backup, @@ -1060,7 +1060,7 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() { // Run a backup and compare its output with what we put in. - backupCtrl := newController(ctx, t, test.resourceCat) + backupCtrl := newController(ctx, t, test.resourceCat, path.ExchangeService) backupSel := backupSelectorForExpected(t, test.service, expectedDests) t.Log("Selective backup of", backupSel) @@ -1209,7 +1209,7 @@ func (suite *ControllerIntegrationSuite) TestBackup_CreatesPrefixCollections() { defer flush() var ( - backupCtrl = newController(ctx, t, test.resourceCat) + backupCtrl = newController(ctx, t, test.resourceCat, path.ExchangeService) backupSel = test.selectorFunc(t) errs = fault.New(true) start = time.Now() diff --git a/src/internal/m365/discovery/discovery_test.go b/src/internal/m365/discovery/discovery_test.go index f4ae315f3..0be5aebe2 100644 --- a/src/internal/m365/discovery/discovery_test.go +++ b/src/internal/m365/discovery/discovery_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/m365/discovery" + "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/credentials" @@ -30,6 +31,13 @@ func TestDiscoveryIntgSuite(t *testing.T) { }) } +func (suite *DiscoveryIntgSuite) SetupSuite() { + ctx, flush := tester.NewContext(suite.T()) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, true, 4) +} + func (suite *DiscoveryIntgSuite) TestUsers() { t := suite.T() diff --git a/src/internal/m365/exchange/backup.go b/src/internal/m365/exchange/backup.go index 197753ecc..d69f71225 100644 --- a/src/internal/m365/exchange/backup.go +++ b/src/internal/m365/exchange/backup.go @@ -199,9 +199,10 @@ func ProduceBackupCollections( // Turn on concurrency limiter middleware for exchange backups // unless explicitly disabled through DisableConcurrencyLimiterFN cli flag - if !ctrlOpts.ToggleFeatures.DisableConcurrencyLimiter { - graph.InitializeConcurrencyLimiter(ctrlOpts.Parallelism.ItemFetch) - } + graph.InitializeConcurrencyLimiter( + ctx, + ctrlOpts.ToggleFeatures.DisableConcurrencyLimiter, + ctrlOpts.Parallelism.ItemFetch) cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, metadata) if err != nil { diff --git a/src/internal/m365/exchange/backup_test.go b/src/internal/m365/exchange/backup_test.go index 0af604c90..8cedaf0f0 100644 --- a/src/internal/m365/exchange/backup_test.go +++ b/src/internal/m365/exchange/backup_test.go @@ -399,19 +399,26 @@ func TestBackupIntgSuite(t *testing.T) { } func (suite *BackupIntgSuite) SetupSuite() { - suite.user = tester.M365UserID(suite.T()) - suite.site = tester.M365SiteID(suite.T()) + t := suite.T() - acct := tester.NewM365Account(suite.T()) + ctx, flush := tester.NewContext(t) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, true, 4) + + suite.user = tester.M365UserID(t) + suite.site = tester.M365SiteID(t) + + acct := tester.NewM365Account(t) creds, err := acct.M365Config() - require.NoError(suite.T(), err, clues.ToCore(err)) + require.NoError(t, err, clues.ToCore(err)) suite.ac, err = api.NewClient(creds) - require.NoError(suite.T(), err, clues.ToCore(err)) + require.NoError(t, err, clues.ToCore(err)) suite.tenantID = creds.AzureTenantID - tester.LogTimeOfTest(suite.T()) + tester.LogTimeOfTest(t) } func (suite *BackupIntgSuite) TestMailFetch() { diff --git a/src/internal/m365/graph/betasdk/beta_client_test.go b/src/internal/m365/graph/betasdk/beta_client_test.go index 9e7fcab00..47845a207 100644 --- a/src/internal/m365/graph/betasdk/beta_client_test.go +++ b/src/internal/m365/graph/betasdk/beta_client_test.go @@ -26,6 +26,12 @@ func TestBetaClientSuite(t *testing.T) { func (suite *BetaClientSuite) SetupSuite() { t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, false, 4) + a := tester.NewM365Account(t) m365, err := a.M365Config() require.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index 7da508217..f886abe5f 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -20,34 +20,41 @@ import ( // "how many calls at one time" // --------------------------------------------------------------------------- -// concurrencyLimiter middleware limits the number of concurrent requests to graph API -type concurrencyLimiter struct { +// concurrencyLimiterMiddleware middleware limits the number of concurrent requests to graph API +type concurrencyLimiterMiddleware struct { semaphore chan struct{} } var ( - once sync.Once - concurrencyLim *concurrencyLimiter - maxConcurrentRequests = 4 + once sync.Once + concurrencyLimitMiddlewareSingleton = &concurrencyLimiterMiddleware{} + maxConcurrentRequests = 4 ) -func generateConcurrencyLimiter(capacity int) *concurrencyLimiter { +func generateConcurrencyLimiter(capacity int) *concurrencyLimiterMiddleware { if capacity < 1 || capacity > maxConcurrentRequests { capacity = maxConcurrentRequests } - return &concurrencyLimiter{ + return &concurrencyLimiterMiddleware{ semaphore: make(chan struct{}, capacity), } } -func InitializeConcurrencyLimiter(capacity int) { +func InitializeConcurrencyLimiter(ctx context.Context, enable bool, capacity int) { once.Do(func() { - concurrencyLim = generateConcurrencyLimiter(capacity) + switch enable { + case true: + logger.Ctx(ctx).Infow("turning on the concurrency limiter", "concurrency_limit", capacity) + concurrencyLimitMiddlewareSingleton.semaphore = generateConcurrencyLimiter(capacity).semaphore + case false: + logger.Ctx(ctx).Info("turning off the concurrency limiter") + concurrencyLimitMiddlewareSingleton = nil + } }) } -func (cl *concurrencyLimiter) Intercept( +func (cl *concurrencyLimiterMiddleware) Intercept( pipeline khttp.Pipeline, middlewareIndex int, req *http.Request, diff --git a/src/internal/m365/graph/concurrency_middleware_test.go b/src/internal/m365/graph/concurrency_middleware_test.go index f0507bc63..c6b937784 100644 --- a/src/internal/m365/graph/concurrency_middleware_test.go +++ b/src/internal/m365/graph/concurrency_middleware_test.go @@ -76,10 +76,13 @@ func (suite *ConcurrencyMWUnitTestSuite) TestConcurrencyLimiter() { func (suite *ConcurrencyMWUnitTestSuite) TestInitializeConcurrencyLimiter() { t := suite.T() - InitializeConcurrencyLimiter(2) - InitializeConcurrencyLimiter(4) + ctx, flush := tester.NewContext(t) + defer flush() - assert.Equal(t, cap(concurrencyLim.semaphore), 2, "singleton semaphore capacity changed") + InitializeConcurrencyLimiter(ctx, true, 2) + InitializeConcurrencyLimiter(ctx, false, 4) + + assert.Equal(t, cap(concurrencyLimitMiddlewareSingleton.semaphore), 2, "singleton semaphore capacity changed") } func (suite *ConcurrencyMWUnitTestSuite) TestGenerateConcurrencyLimiter() { diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index 1c85e760c..379b969a0 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -261,8 +261,8 @@ func kiotaMiddlewares( }...) // Optionally add concurrency limiter middleware if it has been initialized. - if concurrencyLim != nil { - mw = append(mw, concurrencyLim) + if concurrencyLimitMiddlewareSingleton != nil { + mw = append(mw, concurrencyLimitMiddlewareSingleton) } mw = append( diff --git a/src/internal/m365/helper_test.go b/src/internal/m365/helper_test.go index 04c0ecc81..1a71498a4 100644 --- a/src/internal/m365/helper_test.go +++ b/src/internal/m365/helper_test.go @@ -23,6 +23,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/resource" m365Stub "github.com/alcionai/corso/src/internal/m365/stub" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -1190,10 +1191,15 @@ func getSelectorWith( } } -func newController(ctx context.Context, t *testing.T, r resource.Category) *Controller { +func newController( + ctx context.Context, + t *testing.T, + r resource.Category, + pst path.ServiceType, +) *Controller { a := tester.NewM365Account(t) - controller, err := NewController(ctx, a, r) + controller, err := NewController(ctx, a, r, pst, control.Options{}) require.NoError(t, err, clues.ToCore(err)) return controller diff --git a/src/internal/m365/onedrive/item_collector_test.go b/src/internal/m365/onedrive/item_collector_test.go index a935cc802..d17c62d84 100644 --- a/src/internal/m365/onedrive/item_collector_test.go +++ b/src/internal/m365/onedrive/item_collector_test.go @@ -299,6 +299,11 @@ func TestOneDriveSuite(t *testing.T) { func (suite *OneDriveIntgSuite) SetupSuite() { t := suite.T() + ctx, flush := tester.NewContext(t) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, true, 4) + suite.userID = tester.SecondaryM365UserID(t) acct := tester.NewM365Account(t) diff --git a/src/internal/m365/onedrive_test.go b/src/internal/m365/onedrive_test.go index f93fd0826..575ee3b3f 100644 --- a/src/internal/m365/onedrive_test.go +++ b/src/internal/m365/onedrive_test.go @@ -121,7 +121,7 @@ func NewSuiteInfoImpl( rsc = resource.Sites } - ctrl := newController(ctx, t, rsc) + ctrl := newController(ctx, t, rsc, path.OneDriveService) return suiteInfoImpl{ ac: ctrl.AC, diff --git a/src/internal/m365/sharepoint/api/pages_test.go b/src/internal/m365/sharepoint/api/pages_test.go index 58406557e..d5c2b7c24 100644 --- a/src/internal/m365/sharepoint/api/pages_test.go +++ b/src/internal/m365/sharepoint/api/pages_test.go @@ -40,6 +40,11 @@ type SharePointPageSuite struct { func (suite *SharePointPageSuite) SetupSuite() { t := suite.T() + ctx, flush := tester.NewContext(t) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, true, 4) + suite.siteID = tester.M365SiteID(t) a := tester.NewM365Account(t) m365, err := a.M365Config() diff --git a/src/internal/m365/sharepoint/backup_test.go b/src/internal/m365/sharepoint/backup_test.go index ecffd5ecb..4cbeff44a 100644 --- a/src/internal/m365/sharepoint/backup_test.go +++ b/src/internal/m365/sharepoint/backup_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/common/idname/mock" + "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/onedrive" odConsts "github.com/alcionai/corso/src/internal/m365/onedrive/consts" "github.com/alcionai/corso/src/internal/tester" @@ -174,11 +175,17 @@ func TestSharePointPagesSuite(t *testing.T) { suite.Run(t, &SharePointPagesSuite{ Suite: tester.NewIntegrationSuite( t, - [][]string{tester.M365AcctCredEnvs}, - ), + [][]string{tester.M365AcctCredEnvs}), }) } +func (suite *SharePointPagesSuite) SetupSuite() { + ctx, flush := tester.NewContext(suite.T()) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, false, 4) +} + func (suite *SharePointPagesSuite) TestCollectPages() { t := suite.T() diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index ba3c32d17..3d3c78b99 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -525,6 +525,11 @@ func TestBackupOpIntegrationSuite(t *testing.T) { func (suite *BackupOpIntegrationSuite) SetupSuite() { t := suite.T() + ctx, flush := tester.NewContext(t) + defer flush() + + graph.InitializeConcurrencyLimiter(ctx, true, 4) + suite.user = tester.M365UserID(t) suite.site = tester.M365SiteID(t) @@ -1867,7 +1872,7 @@ func runDriveIncrementalTest( } for _, test := range table { suite.Run(test.name, func() { - cleanCtrl, err := m365.NewController(ctx, acct, rc) + cleanCtrl, err := m365.NewController(ctx, acct, rc, sel.PathService(), control.Options{}) require.NoError(t, err, clues.ToCore(err)) var ( @@ -1951,7 +1956,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveOwnerMigration() { ctrl, err := m365.NewController( ctx, acct, - resource.Users) + resource.Users, + path.OneDriveService, + control.Options{}) require.NoError(t, err, clues.ToCore(err)) userable, err := ctrl.AC.Users().GetByID(ctx, suite.user) diff --git a/src/internal/operations/help_test.go b/src/internal/operations/help_test.go index c1830bf25..309b4f7b5 100644 --- a/src/internal/operations/help_test.go +++ b/src/internal/operations/help_test.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/src/internal/m365" "github.com/alcionai/corso/src/internal/m365/resource" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -26,7 +27,7 @@ func ControllerWithSelector( ins idname.Cacher, onFail func(), ) (*m365.Controller, selectors.Selector) { - ctrl, err := m365.NewController(ctx, acct, cr) + ctrl, err := m365.NewController(ctx, acct, cr, sel.PathService(), control.Options{}) if !assert.NoError(t, err, clues.ToCore(err)) { if onFail != nil { onFail() diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 7faa5051d..d2107dce0 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -465,7 +465,9 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoBackup() { ctrl, err := m365.NewController( ctx, suite.acct, - resource.Users) + resource.Users, + rsel.PathService(), + control.Options{}) require.NoError(t, err, clues.ToCore(err)) ro, err := NewRestoreOperation( diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 8bb99fef1..54471149f 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -28,6 +28,7 @@ import ( rep "github.com/alcionai/corso/src/pkg/control/repository" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/storage" "github.com/alcionai/corso/src/pkg/store" @@ -78,6 +79,12 @@ type Repository interface { ) (operations.MaintenanceOperation, error) DeleteBackup(ctx context.Context, id string) error BackupGetter + // ConnectToM365 establishes graph api connections + // and initializes api client configurations. + ConnectToM365( + ctx context.Context, + pst path.ServiceType, + ) (*m365.Controller, error) } // Repository contains storage provider information. @@ -316,14 +323,14 @@ func (r repository) NewBackupWithLookup( sel selectors.Selector, ins idname.Cacher, ) (operations.BackupOperation, error) { - ctrl, err := connectToM365(ctx, sel, r.Account) + ctrl, err := connectToM365(ctx, sel.PathService(), r.Account, r.Opts) if err != nil { - return operations.BackupOperation{}, errors.Wrap(err, "connecting to m365") + return operations.BackupOperation{}, clues.Wrap(err, "connecting to m365") } ownerID, ownerName, err := ctrl.PopulateOwnerIDAndNamesFrom(ctx, sel.DiscreteOwner, ins) if err != nil { - return operations.BackupOperation{}, errors.Wrap(err, "resolving resource owner details") + return operations.BackupOperation{}, clues.Wrap(err, "resolving resource owner details") } // TODO: retrieve display name from gc @@ -348,9 +355,9 @@ func (r repository) NewRestore( sel selectors.Selector, restoreCfg control.RestoreConfig, ) (operations.RestoreOperation, error) { - ctrl, err := connectToM365(ctx, sel, r.Account) + ctrl, err := connectToM365(ctx, sel.PathService(), r.Account, r.Opts) if err != nil { - return operations.RestoreOperation{}, errors.Wrap(err, "connecting to m365") + return operations.RestoreOperation{}, clues.Wrap(err, "connecting to m365") } return operations.NewRestoreOperation( @@ -585,6 +592,18 @@ func deleteBackup( return sw.DeleteBackup(ctx, model.StableID(id)) } +func (r repository) ConnectToM365( + ctx context.Context, + pst path.ServiceType, +) (*m365.Controller, error) { + ctrl, err := connectToM365(ctx, pst, r.Account, r.Opts) + if err != nil { + return nil, clues.Wrap(err, "connecting to m365") + } + + return ctrl, nil +} + // --------------------------------------------------------------------------- // Repository ID Model // --------------------------------------------------------------------------- @@ -633,11 +652,11 @@ func newRepoID(s storage.Storage) string { // helpers // --------------------------------------------------------------------------- -// produces a graph m365. func connectToM365( ctx context.Context, - sel selectors.Selector, + pst path.ServiceType, acct account.Account, + co control.Options, ) (*m365.Controller, error) { complete := observe.MessageWithCompletion(ctx, "Connecting to M365") defer func() { @@ -647,11 +666,11 @@ func connectToM365( // retrieve data from the producer rc := resource.Users - if sel.Service == selectors.ServiceSharePoint { + if pst == path.SharePointService { rc = resource.Sites } - ctrl, err := m365.NewController(ctx, acct, rc) + ctrl, err := m365.NewController(ctx, acct, rc, pst, co) if err != nil { return nil, err } diff --git a/src/pkg/services/m365/api/events_test.go b/src/pkg/services/m365/api/events_test.go index 5b659972c..b39f7488a 100644 --- a/src/pkg/services/m365/api/events_test.go +++ b/src/pkg/services/m365/api/events_test.go @@ -223,7 +223,7 @@ type EventsAPIIntgSuite struct { its intgTesterSetup } -func TestEventsAPIntgSuite(t *testing.T) { +func TestEventsAPIIntgSuite(t *testing.T) { suite.Run(t, &EventsAPIIntgSuite{ Suite: tester.NewIntegrationSuite( t, diff --git a/src/pkg/services/m365/api/helper_test.go b/src/pkg/services/m365/api/helper_test.go index 7d0764ecd..a81ce63cb 100644 --- a/src/pkg/services/m365/api/helper_test.go +++ b/src/pkg/services/m365/api/helper_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api/mock" @@ -88,6 +89,8 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup { ctx, flush := tester.NewContext(t) defer flush() + graph.InitializeConcurrencyLimiter(ctx, true, 4) + a := tester.NewM365Account(t) creds, err := a.M365Config() require.NoError(t, err, clues.ToCore(err)) diff --git a/src/pkg/services/m365/m365_test.go b/src/pkg/services/m365/m365_test.go index e77338dea..838d908be 100644 --- a/src/pkg/services/m365/m365_test.go +++ b/src/pkg/services/m365/m365_test.go @@ -37,6 +37,8 @@ func (suite *M365IntegrationSuite) TestUsers() { ctx, flush := tester.NewContext(t) defer flush() + graph.InitializeConcurrencyLimiter(ctx, true, 4) + acct := tester.NewM365Account(suite.T()) users, err := Users(ctx, acct, fault.New(true)) @@ -84,6 +86,8 @@ func (suite *M365IntegrationSuite) TestGetUserInfo() { ctx, flush := tester.NewContext(t) defer flush() + graph.InitializeConcurrencyLimiter(ctx, true, 4) + var ( acct = tester.NewM365Account(t) uid = tester.M365UserID(t)