diff --git a/src/cli/backup/exchange.go b/src/cli/backup/exchange.go index beb378dbf..683ca4357 100644 --- a/src/cli/backup/exchange.go +++ b/src/cli/backup/exchange.go @@ -163,7 +163,12 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error { sel := exchangeBackupCreateSelectors(flags.UserFV, flags.CategoryDataFV) - ins, err := utils.UsersMap(ctx, *acct, utils.Control(), fault.New(true)) + ins, err := utils.UsersMap( + ctx, + *acct, + utils.Control(), + r.Counter(), + fault.New(true)) if err != nil { return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 users")) } diff --git a/src/cli/backup/helpers_test.go b/src/cli/backup/helpers_test.go index 03bea250b..66d27a762 100644 --- a/src/cli/backup/helpers_test.go +++ b/src/cli/backup/helpers_test.go @@ -22,6 +22,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/repository" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -35,13 +36,13 @@ import ( // GockClient produces a new exchange api client that can be // mocked using gock. -func gockClient(creds account.M365Config) (api.Client, error) { - s, err := gmock.NewService(creds) +func gockClient(creds account.M365Config, counter *count.Bus) (api.Client, error) { + s, err := gmock.NewService(creds, counter) if err != nil { return api.Client{}, err } - li, err := gmock.NewService(creds, graph.NoTimeout()) + li, err := gmock.NewService(creds, counter, graph.NoTimeout()) if err != nil { return api.Client{}, err } @@ -85,10 +86,13 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup { creds, err := its.acct.M365Config() require.NoError(t, err, clues.ToCore(err)) - its.ac, err = api.NewClient(creds, control.DefaultOptions()) + its.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) - its.gockAC, err = gockClient(creds) + its.gockAC, err = gockClient(creds, count.New()) require.NoError(t, err, clues.ToCore(err)) // user drive diff --git a/src/cli/backup/onedrive.go b/src/cli/backup/onedrive.go index 6500b5ef5..0614bbad7 100644 --- a/src/cli/backup/onedrive.go +++ b/src/cli/backup/onedrive.go @@ -143,7 +143,12 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error { sel := oneDriveBackupCreateSelectors(flags.UserFV) - ins, err := utils.UsersMap(ctx, *acct, utils.Control(), fault.New(true)) + ins, err := utils.UsersMap( + ctx, + *acct, + utils.Control(), + r.Counter(), + fault.New(true)) if err != nil { return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 users")) } diff --git a/src/cli/utils/users.go b/src/cli/utils/users.go index affa520fd..92d0b4fcc 100644 --- a/src/cli/utils/users.go +++ b/src/cli/utils/users.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -17,9 +18,10 @@ func UsersMap( ctx context.Context, acct account.Account, co control.Options, + counter *count.Bus, errs *fault.Bus, ) (idname.Cacher, error) { - au, err := makeUserAPI(acct, co) + au, err := makeUserAPI(acct, co, counter) if err != nil { return nil, clues.Wrap(err, "constructing a graph client") } @@ -27,13 +29,17 @@ func UsersMap( return au.GetAllIDsAndNames(ctx, errs) } -func makeUserAPI(acct account.Account, co control.Options) (api.Users, error) { +func makeUserAPI( + acct account.Account, + co control.Options, + counter *count.Bus, +) (api.Users, error) { creds, err := acct.M365Config() if err != nil { return api.Users{}, clues.Wrap(err, "getting m365 account creds") } - cli, err := api.NewClient(creds, co) + cli, err := api.NewClient(creds, co, counter) if err != nil { return api.Users{}, clues.Wrap(err, "constructing api client") } diff --git a/src/cmd/factory/impl/common.go b/src/cmd/factory/impl/common.go index 876b1dc2e..6c69e661f 100644 --- a/src/cmd/factory/impl/common.go +++ b/src/cmd/factory/impl/common.go @@ -145,7 +145,12 @@ func getControllerAndVerifyResourceOwner( return nil, account.Account{}, nil, clues.Wrap(err, "finding m365 account details") } - ctrl, err := m365.NewController(ctx, acct, pst, control.Options{}) + ctrl, err := m365.NewController( + ctx, + acct, + pst, + control.Options{}, + count.New()) if err != nil { return nil, account.Account{}, nil, clues.Wrap(err, "connecting to graph api") } diff --git a/src/cmd/sanity_test/common/common.go b/src/cmd/sanity_test/common/common.go index ec505dbd7..3008fa13a 100644 --- a/src/cmd/sanity_test/common/common.go +++ b/src/cmd/sanity_test/common/common.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/credentials" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -62,5 +63,5 @@ func GetAC() (api.Client, error) { AzureTenantID: os.Getenv(account.AzureTenantID), } - return api.NewClient(creds, control.DefaultOptions()) + return api.NewClient(creds, control.DefaultOptions(), count.New()) } diff --git a/src/internal/kopia/inject/inject.go b/src/internal/kopia/inject/inject.go index 09425bc9b..6e677f300 100644 --- a/src/internal/kopia/inject/inject.go +++ b/src/internal/kopia/inject/inject.go @@ -23,8 +23,8 @@ type ( pmr prefixmatcher.StringSetReader, tags map[string]string, buildTreeWithBase bool, - errs *fault.Bus, counter *count.Bus, + errs *fault.Bus, ) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 0febc3e68..d7d728762 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -152,8 +152,8 @@ func (w Wrapper) ConsumeBackupCollections( globalExcludeSet prefixmatcher.StringSetReader, additionalTags map[string]string, buildTreeWithBase bool, - errs *fault.Bus, counter *count.Bus, + errs *fault.Bus, ) (*BackupStats, *details.Builder, DetailsMergeInfoer, error) { if w.c == nil { return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx) diff --git a/src/internal/kopia/wrapper_scale_test.go b/src/internal/kopia/wrapper_scale_test.go index 50567d453..f3934ccda 100644 --- a/src/internal/kopia/wrapper_scale_test.go +++ b/src/internal/kopia/wrapper_scale_test.go @@ -110,8 +110,8 @@ func BenchmarkHierarchyMerge(b *testing.B) { nil, nil, true, - fault.New(true), - counter) + counter, + fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Zero(t, stats.IgnoredErrorCount) diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index a50f03962..9d365c92f 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -895,8 +895,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { nil, tags, true, - fault.New(true), - counter) + counter, + fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files") @@ -1215,8 +1215,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() { nil, tags, true, - fault.New(true), - counter) + counter, + fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files") @@ -1310,8 +1310,8 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { nil, nil, true, - fault.New(true), - count.New()) + count.New(), + fault.New(true)) require.NoError(t, err, clues.ToCore(err)) err = k.Compression(ctx, "gzip") @@ -1406,8 +1406,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { nil, nil, true, - errs, - counter) + counter, + errs) require.Error(t, err, clues.ToCore(err)) assert.Zero(t, stats.ErrorCount, "error count") assert.Zero(t, counter.Get(count.PersistenceErrors), "error count") @@ -1494,8 +1494,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() nil, nil, true, - fault.New(true), - count.New()) + count.New(), + fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, BackupStats{}, *s) @@ -1656,8 +1656,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { nil, nil, false, - fault.New(true), - counter) + counter, + fault.New(true)) require.NoError(t, err, clues.ToCore(err)) require.Zero(t, stats.ErrorCount) require.Zero(t, counter.Get(count.PersistenceErrors)) @@ -1786,8 +1786,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { excluded, nil, true, - fault.New(true), - counter) + counter, + fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, test.expectedCachedItems, stats.CachedFileCount) assert.Equal(t, test.expectedUncachedItems, stats.UncachedFileCount) diff --git a/src/internal/m365/backup.go b/src/internal/m365/backup.go index 886d66cda..204232fb0 100644 --- a/src/internal/m365/backup.go +++ b/src/internal/m365/backup.go @@ -17,6 +17,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/service/sharepoint" "github.com/alcionai/corso/src/internal/operations/inject" bupMD "github.com/alcionai/corso/src/pkg/backup/metadata" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/path" @@ -35,6 +36,7 @@ import ( func (ctrl *Controller) ProduceBackupCollections( ctx context.Context, bpc inject.BackupProducerConfig, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) { service := bpc.Selector.PathService() @@ -94,6 +96,7 @@ func (ctrl *Controller) ProduceBackupCollections( ctrl.AC, ctrl.credentials, ctrl.UpdateStatus, + counter, errs) if err != nil { return nil, nil, false, err diff --git a/src/internal/m365/backup_test.go b/src/internal/m365/backup_test.go index e17915dcc..07f2aa8f4 100644 --- a/src/internal/m365/backup_test.go +++ b/src/internal/m365/backup_test.go @@ -22,6 +22,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -61,7 +62,10 @@ func (suite *DataCollectionIntgSuite) SetupSuite() { suite.tenantID = creds.AzureTenantID - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) } @@ -251,6 +255,7 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner() collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) assert.Error(t, err, clues.ToCore(err)) assert.False(t, canUsePreviousBackup, "can use previous backup") @@ -311,6 +316,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() { suite.ac, ctrl.credentials, ctrl.UpdateStatus, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -398,6 +404,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() { cols, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -448,6 +455,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() { cols, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -534,6 +542,7 @@ func (suite *GroupsCollectionIntgSuite) TestCreateGroupsCollection_SharePoint() collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -634,6 +643,7 @@ func (suite *GroupsCollectionIntgSuite) TestCreateGroupsCollection_SharePoint_In collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") diff --git a/src/internal/m365/collection/drive/helper_test.go b/src/internal/m365/collection/drive/helper_test.go index 0c9ec8f8c..29e6e63c0 100644 --- a/src/internal/m365/collection/drive/helper_test.go +++ b/src/internal/m365/collection/drive/helper_test.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -21,7 +22,10 @@ type oneDriveService struct { } func NewOneDriveService(credentials account.M365Config) (*oneDriveService, error) { - ac, err := api.NewClient(credentials, control.DefaultOptions()) + ac, err := api.NewClient( + credentials, + control.DefaultOptions(), + count.New()) if err != nil { return nil, err } diff --git a/src/internal/m365/collection/drive/item.go b/src/internal/m365/collection/drive/item.go index 3756d0abd..6d86d10be 100644 --- a/src/internal/m365/collection/drive/item.go +++ b/src/internal/m365/collection/drive/item.go @@ -17,6 +17,7 @@ import ( "github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -191,6 +192,7 @@ func driveItemWriter( nicu NewItemContentUploader, driveID, itemID string, itemSize int64, + counter *count.Bus, ) (io.Writer, string, error) { ctx = clues.Add(ctx, "upload_item_id", itemID) @@ -199,7 +201,11 @@ func driveItemWriter( return nil, "", clues.Stack(err) } - iw := graph.NewLargeItemWriter(itemID, ptr.Val(icu.GetUploadUrl()), itemSize) + iw := graph.NewLargeItemWriter( + itemID, + ptr.Val(icu.GetUploadUrl()), + itemSize, + counter) return iw, ptr.Val(icu.GetUploadUrl()), nil } diff --git a/src/internal/m365/collection/drive/item_collector_test.go b/src/internal/m365/collection/drive/item_collector_test.go index 0e5e44ace..8f2a53055 100644 --- a/src/internal/m365/collection/drive/item_collector_test.go +++ b/src/internal/m365/collection/drive/item_collector_test.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -232,7 +233,10 @@ func (suite *OneDriveIntgSuite) SetupSuite() { suite.creds = creds - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) } diff --git a/src/internal/m365/collection/drive/item_test.go b/src/internal/m365/collection/drive/item_test.go index 7fc9e065c..bec6d6f57 100644 --- a/src/internal/m365/collection/drive/item_test.go +++ b/src/internal/m365/collection/drive/item_test.go @@ -22,6 +22,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/testdata" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -231,7 +232,8 @@ func (suite *ItemIntegrationSuite) TestItemWriter() { rh, test.driveID, ptr.Val(newItem.GetId()), - writeSize) + writeSize, + count.New()) require.NoError(t, err, clues.ToCore(err)) // Using a 32 KB buffer for the copy allows us to validate the diff --git a/src/internal/m365/collection/drive/restore.go b/src/internal/m365/collection/drive/restore.go index e5eb9c8b7..a014f29ba 100644 --- a/src/internal/m365/collection/drive/restore.go +++ b/src/internal/m365/collection/drive/restore.go @@ -788,7 +788,13 @@ func restoreFile( return "", details.ItemInfo{}, err } - w, uploadURL, err := driveItemWriter(ctx, ir, driveID, ptr.Val(newItem.GetId()), ss.Size()) + w, uploadURL, err := driveItemWriter( + ctx, + ir, + driveID, + ptr.Val(newItem.GetId()), + ss.Size(), + ctr) if err != nil { return "", details.ItemInfo{}, clues.Wrap(err, "get item upload session") } @@ -841,7 +847,11 @@ func restoreFile( // refresh the io.Writer to restart the upload // TODO: @vkamra verify if var session is the desired input - w = graph.NewLargeItemWriter(ptr.Val(newItem.GetId()), uploadURL, ss.Size()) + w = graph.NewLargeItemWriter( + ptr.Val(newItem.GetId()), + uploadURL, + ss.Size(), + ctr) } if err != nil { diff --git a/src/internal/m365/collection/drive/url_cache_test.go b/src/internal/m365/collection/drive/url_cache_test.go index 99327d8fa..6697eac00 100644 --- a/src/internal/m365/collection/drive/url_cache_test.go +++ b/src/internal/m365/collection/drive/url_cache_test.go @@ -24,6 +24,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/testdata" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api/pagers" @@ -61,7 +62,10 @@ func (suite *URLCacheIntegrationSuite) SetupSuite() { creds, err := acct.M365Config() require.NoError(t, err, clues.ToCore(err)) - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) drive, err := suite.ac.Users().GetDefaultDrive(ctx, suite.user) @@ -159,7 +163,7 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { require.Equal(t, false, props.isDeleted) // Validate download URL - c := graph.NewNoTimeoutHTTPWrapper() + c := graph.NewNoTimeoutHTTPWrapper(count.New()) resp, err := c.Request( ctx, diff --git a/src/internal/m365/collection/exchange/backup_test.go b/src/internal/m365/collection/exchange/backup_test.go index 55d3c9b17..1c4efc139 100644 --- a/src/internal/m365/collection/exchange/backup_test.go +++ b/src/internal/m365/collection/exchange/backup_test.go @@ -27,6 +27,7 @@ import ( "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -432,7 +433,10 @@ func (suite *BackupIntgSuite) SetupSuite() { creds, err := acct.M365Config() require.NoError(t, err, clues.ToCore(err)) - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) suite.tenantID = creds.AzureTenantID diff --git a/src/internal/m365/collection/exchange/container_resolver_test.go b/src/internal/m365/collection/exchange/container_resolver_test.go index 12f8c06ee..a552f20ba 100644 --- a/src/internal/m365/collection/exchange/container_resolver_test.go +++ b/src/internal/m365/collection/exchange/container_resolver_test.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -699,7 +700,10 @@ func (suite *ContainerResolverSuite) SetupSuite() { } func (suite *ContainerResolverSuite) TestPopulate() { - ac, err := api.NewClient(suite.credentials, control.DefaultOptions()) + ac, err := api.NewClient( + suite.credentials, + control.DefaultOptions(), + count.New()) require.NoError(suite.T(), err, clues.ToCore(err)) eventFunc := func(t *testing.T) graph.ContainerResolver { diff --git a/src/internal/m365/collection/exchange/helper_test.go b/src/internal/m365/collection/exchange/helper_test.go index 9b1583b9c..9d4fccb32 100644 --- a/src/internal/m365/collection/exchange/helper_test.go +++ b/src/internal/m365/collection/exchange/helper_test.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -31,7 +32,10 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup { its.creds = creds - its.ac, err = api.NewClient(creds, control.DefaultOptions()) + its.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) its.userID = tconfig.GetM365UserID(ctx) diff --git a/src/internal/m365/collection/exchange/mail_container_cache_test.go b/src/internal/m365/collection/exchange/mail_container_cache_test.go index bf1e7b52b..9fa738484 100644 --- a/src/internal/m365/collection/exchange/mail_container_cache_test.go +++ b/src/internal/m365/collection/exchange/mail_container_cache_test.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -83,7 +84,10 @@ func (suite *MailFolderCacheIntegrationSuite) TestDeltaFetch() { ctx, flush := tester.NewContext(t) defer flush() - ac, err := api.NewClient(suite.credentials, control.DefaultOptions()) + ac, err := api.NewClient( + suite.credentials, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) acm := ac.Mail() diff --git a/src/internal/m365/collection/exchange/restore_test.go b/src/internal/m365/collection/exchange/restore_test.go index 1fea2f0c7..5ea04adc6 100644 --- a/src/internal/m365/collection/exchange/restore_test.go +++ b/src/internal/m365/collection/exchange/restore_test.go @@ -44,7 +44,10 @@ func (suite *RestoreIntgSuite) SetupSuite() { require.NoError(t, err, clues.ToCore(err)) suite.credentials = m365 - suite.ac, err = api.NewClient(m365, control.DefaultOptions()) + suite.ac, err = api.NewClient( + m365, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) } diff --git a/src/internal/m365/collection/groups/backup_test.go b/src/internal/m365/collection/groups/backup_test.go index 7976bf5ca..1f2a7568d 100644 --- a/src/internal/m365/collection/groups/backup_test.go +++ b/src/internal/m365/collection/groups/backup_test.go @@ -24,6 +24,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -463,7 +464,10 @@ func (suite *BackupIntgSuite) SetupSuite() { creds, err := acct.M365Config() require.NoError(t, err, clues.ToCore(err)) - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) suite.tenantID = creds.AzureTenantID diff --git a/src/internal/m365/collection/site/backup.go b/src/internal/m365/collection/site/backup.go index a168e6dba..a8565c29f 100644 --- a/src/internal/m365/collection/site/backup.go +++ b/src/internal/m365/collection/site/backup.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -60,6 +61,7 @@ func CollectPages( ac api.Client, scope selectors.SharePointScope, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, error) { logger.Ctx(ctx).Debug("creating SharePoint Pages collections") @@ -74,7 +76,8 @@ func CollectPages( adpt, err := graph.CreateAdapter( creds.AzureTenantID, creds.AzureClientID, - creds.AzureClientSecret) + creds.AzureClientSecret, + counter) if err != nil { return nil, clues.Wrap(err, "creating azure client adapter") } diff --git a/src/internal/m365/collection/site/backup_test.go b/src/internal/m365/collection/site/backup_test.go index 46dff1a97..dc05a6583 100644 --- a/src/internal/m365/collection/site/backup_test.go +++ b/src/internal/m365/collection/site/backup_test.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -46,14 +47,18 @@ func (suite *SharePointPagesSuite) TestCollectPages() { defer flush() var ( - siteID = tconfig.M365SiteID(t) - a = tconfig.NewM365Account(t) + siteID = tconfig.M365SiteID(t) + a = tconfig.NewM365Account(t) + counter = count.New() ) creds, err := a.M365Config() require.NoError(t, err, clues.ToCore(err)) - ac, err := api.NewClient(creds, control.DefaultOptions()) + ac, err := api.NewClient( + creds, + control.DefaultOptions(), + counter) require.NoError(t, err, clues.ToCore(err)) bpc := inject.BackupProducerConfig{ @@ -71,6 +76,7 @@ func (suite *SharePointPagesSuite) TestCollectPages() { ac, sel.Lists(selectors.Any())[0], (&MockGraphService{}).UpdateStatus, + counter, fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) assert.NotEmpty(t, col) diff --git a/src/internal/m365/collection/site/collection_test.go b/src/internal/m365/collection/site/collection_test.go index 5e0420c63..9e1d9d5c8 100644 --- a/src/internal/m365/collection/site/collection_test.go +++ b/src/internal/m365/collection/site/collection_test.go @@ -22,6 +22,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/testdata" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -45,7 +46,10 @@ func (suite *SharePointCollectionSuite) SetupSuite() { suite.creds = m365 - ac, err := api.NewClient(m365, control.DefaultOptions()) + ac, err := api.NewClient( + m365, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) suite.ac = ac diff --git a/src/internal/m365/collection/site/helper_test.go b/src/internal/m365/collection/site/helper_test.go index ca953a9b4..2663696bd 100644 --- a/src/internal/m365/collection/site/helper_test.go +++ b/src/internal/m365/collection/site/helper_test.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" ) // --------------------------------------------------------------------------- @@ -50,7 +51,8 @@ func createTestService(t *testing.T, credentials account.M365Config) *graph.Serv adapter, err := graph.CreateAdapter( credentials.AzureTenantID, credentials.AzureClientID, - credentials.AzureClientSecret) + credentials.AzureClientSecret, + count.New()) require.NoError(t, err, "creating microsoft graph service for exchange", clues.ToCore(err)) return graph.NewService(adapter) diff --git a/src/internal/m365/controller.go b/src/internal/m365/controller.go index 6bcd5e27b..90dc6b841 100644 --- a/src/internal/m365/controller.go +++ b/src/internal/m365/controller.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -68,6 +69,7 @@ func NewController( acct account.Account, pst path.ServiceType, co control.Options, + counter *count.Bus, ) (*Controller, error) { graph.InitializeConcurrencyLimiter(ctx, pst == path.ExchangeService, co.Parallelism.ItemFetch) @@ -76,7 +78,7 @@ func NewController( return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx) } - ac, err := api.NewClient(creds, co) + ac, err := api.NewClient(creds, co, counter) if err != nil { return nil, clues.Wrap(err, "creating api client").WithClues(ctx) } diff --git a/src/internal/m365/controller_test.go b/src/internal/m365/controller_test.go index 2ae60d2ac..d06b9ce3b 100644 --- a/src/internal/m365/controller_test.go +++ b/src/internal/m365/controller_test.go @@ -637,6 +637,7 @@ func runBackupAndCompare( dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -1232,6 +1233,7 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() { dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") @@ -1411,6 +1413,7 @@ func (suite *ControllerIntegrationSuite) TestBackup_CreatesPrefixCollections() { dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections( ctx, bpc, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") diff --git a/src/internal/m365/graph/betasdk/beta_client_test.go b/src/internal/m365/graph/betasdk/beta_client_test.go index 5216e9d6f..6add009ab 100644 --- a/src/internal/m365/graph/betasdk/beta_client_test.go +++ b/src/internal/m365/graph/betasdk/beta_client_test.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" ) type BetaClientSuite struct { @@ -45,7 +46,8 @@ func (suite *BetaClientSuite) TestCreateBetaClient() { adpt, err := graph.CreateAdapter( suite.credentials.AzureTenantID, suite.credentials.AzureClientID, - suite.credentials.AzureClientSecret) + suite.credentials.AzureClientSecret, + count.New()) require.NoError(t, err, clues.ToCore(err)) @@ -66,7 +68,8 @@ func (suite *BetaClientSuite) TestBasicClientGetFunctionality() { adpt, err := graph.CreateAdapter( suite.credentials.AzureTenantID, suite.credentials.AzureClientID, - suite.credentials.AzureClientSecret) + suite.credentials.AzureClientSecret, + count.New()) require.NoError(t, err, clues.ToCore(err)) client := NewBetaClient(adpt) diff --git a/src/internal/m365/graph/concurrency_middleware.go b/src/internal/m365/graph/concurrency_middleware.go index c470fc1cf..60762ca32 100644 --- a/src/internal/m365/graph/concurrency_middleware.go +++ b/src/internal/m365/graph/concurrency_middleware.go @@ -291,7 +291,8 @@ func (tf *timedFence) RaiseFence(seconds time.Duration) { // throttlingMiddleware is used to ensure we don't overstep per-min request limits. type throttlingMiddleware struct { - tf *timedFence + tf *timedFence + counter *count.Bus } func (mw *throttlingMiddleware) Intercept( @@ -314,8 +315,7 @@ func (mw *throttlingMiddleware) Intercept( return resp, nil } - countBus := count.Ctx(req.Context()) - countBus.Inc(count.ThrottledAPICalls) + mw.counter.Inc(count.ThrottledAPICalls) // if all prior conditions pass, we need to add a fence that blocks // calls, globally, from progressing until the timeout retry-after diff --git a/src/internal/m365/graph/concurrency_middleware_test.go b/src/internal/m365/graph/concurrency_middleware_test.go index c6b937784..034f48e5f 100644 --- a/src/internal/m365/graph/concurrency_middleware_test.go +++ b/src/internal/m365/graph/concurrency_middleware_test.go @@ -16,6 +16,7 @@ import ( "golang.org/x/net/context" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/count" ) type ConcurrencyMWUnitTestSuite struct { @@ -307,7 +308,10 @@ func (suite *ConcurrencyMWUnitTestSuite) TestThrottlingMiddleware() { ctx, flush := tester.NewContext(t) defer flush() - tm := throttlingMiddleware{newTimedFence()} + tm := &throttlingMiddleware{ + tf: newTimedFence(), + counter: count.New(), + } req := &http.Request{} req = req.WithContext(ctx) diff --git a/src/internal/m365/graph/http_wrapper.go b/src/internal/m365/graph/http_wrapper.go index 7f5c840b2..0b5b17b5d 100644 --- a/src/internal/m365/graph/http_wrapper.go +++ b/src/internal/m365/graph/http_wrapper.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/version" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" ) @@ -36,12 +37,15 @@ type Requester interface { // and consume relatively unbound socket connections. It is important // to centralize this client to be passed downstream where api calls // can utilize it on a per-download basis. -func NewHTTPWrapper(opts ...Option) *httpWrapper { +func NewHTTPWrapper( + counter *count.Bus, + opts ...Option, +) *httpWrapper { var ( cc = populateConfig(opts...) rt = customTransport{ n: pipeline{ - middlewares: internalMiddleware(cc), + middlewares: internalMiddleware(cc, counter), transport: defaultTransport(), }, } @@ -66,9 +70,12 @@ func NewHTTPWrapper(opts ...Option) *httpWrapper { // and consume relatively unbound socket connections. It is important // to centralize this client to be passed downstream where api calls // can utilize it on a per-download basis. -func NewNoTimeoutHTTPWrapper(opts ...Option) *httpWrapper { +func NewNoTimeoutHTTPWrapper( + counter *count.Bus, + opts ...Option, +) *httpWrapper { opts = append(opts, NoTimeout()) - return NewHTTPWrapper(opts...) + return NewHTTPWrapper(counter, opts...) } // --------------------------------------------------------------------------- @@ -178,7 +185,15 @@ func defaultTransport() http.RoundTripper { return defaultTransport } -func internalMiddleware(cc *clientConfig) []khttp.Middleware { +func internalMiddleware( + cc *clientConfig, + counter *count.Bus, +) []khttp.Middleware { + throttler := &throttlingMiddleware{ + tf: newTimedFence(), + counter: counter, + } + mw := []khttp.Middleware{ &RetryMiddleware{ MaxRetries: cc.maxRetries, @@ -187,7 +202,7 @@ func internalMiddleware(cc *clientConfig) []khttp.Middleware { khttp.NewRetryHandler(), khttp.NewRedirectHandler(), &LoggingMiddleware{}, - &throttlingMiddleware{newTimedFence()}, + throttler, &RateLimiterMiddleware{}, &MetricsMiddleware{}, } diff --git a/src/internal/m365/graph/http_wrapper_test.go b/src/internal/m365/graph/http_wrapper_test.go index fb01ef5b4..604ca5e9d 100644 --- a/src/internal/m365/graph/http_wrapper_test.go +++ b/src/internal/m365/graph/http_wrapper_test.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" + "github.com/alcionai/corso/src/pkg/count" ) type HTTPWrapperIntgSuite struct { @@ -34,7 +35,7 @@ func (suite *HTTPWrapperIntgSuite) TestNewHTTPWrapper() { ctx, flush := tester.NewContext(t) defer flush() - hw := NewHTTPWrapper() + hw := NewHTTPWrapper(count.New()) resp, err := hw.Request( ctx, @@ -107,7 +108,7 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_redirectMiddleware() { }, } - hw := NewHTTPWrapper(appendMiddleware(&mwResp)) + hw := NewHTTPWrapper(count.New(), appendMiddleware(&mwResp)) resp, err := hw.Request(ctx, http.MethodGet, url, nil, nil) require.NoError(t, err, clues.ToCore(err)) @@ -175,6 +176,7 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_http2StreamErrorRetries() } hw := NewHTTPWrapper( + count.New(), appendMiddleware(&mwResp), MaxConnectionRetries(test.retries)) diff --git a/src/internal/m365/graph/middleware.go b/src/internal/m365/graph/middleware.go index d5935a5b0..4fd28c5fd 100644 --- a/src/internal/m365/graph/middleware.go +++ b/src/internal/m365/graph/middleware.go @@ -323,7 +323,9 @@ func (mw RetryMiddleware) getRetryDelay( // --------------------------------------------------------------------------- // MetricsMiddleware aggregates per-request metrics on the events bus -type MetricsMiddleware struct{} +type MetricsMiddleware struct { + counter *count.Bus +} const xmruHeader = "x-ms-resource-unit" @@ -362,8 +364,7 @@ func (mw *MetricsMiddleware) Intercept( xmrui = 1 } - countBus := count.Ctx(req.Context()) - countBus.Add(count.APICallTokensConsumed, int64(xmrui)) + mw.counter.Add(count.APICallTokensConsumed, int64(xmrui)) events.IncN(xmrui, events.APICall, xmruHeader) diff --git a/src/internal/m365/graph/middleware_test.go b/src/internal/m365/graph/middleware_test.go index 4dca30146..ddaf13cd7 100644 --- a/src/internal/m365/graph/middleware_test.go +++ b/src/internal/m365/graph/middleware_test.go @@ -25,6 +25,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" ) @@ -109,7 +110,7 @@ func mockAdapter( var ( clientOptions = msgraphsdkgo.GetDefaultClientOptions() cc = populateConfig(MinimumBackoff(10 * time.Millisecond)) - middlewares = append(kiotaMiddlewares(&clientOptions, cc), mw) + middlewares = append(kiotaMiddlewares(&clientOptions, cc, count.New()), mw) httpClient = msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) ) diff --git a/src/internal/m365/graph/mock/service.go b/src/internal/m365/graph/mock/service.go index 64f6c3d6f..4f35a24ec 100644 --- a/src/internal/m365/graph/mock/service.go +++ b/src/internal/m365/graph/mock/service.go @@ -7,13 +7,19 @@ import ( "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" ) -func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service, error) { +func NewService( + creds account.M365Config, + counter *count.Bus, + opts ...graph.Option, +) (*graph.Service, error) { a, err := CreateAdapter( creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret, + counter, opts...) if err != nil { return nil, clues.Wrap(err, "generating graph adapter") @@ -26,6 +32,7 @@ func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service, // enable interceptions via gock to make it mockable. func CreateAdapter( tenant, client, secret string, + counter *count.Bus, opts ...graph.Option, ) (*msgraphsdkgo.GraphRequestAdapter, error) { auth, err := graph.GetAuth(tenant, client, secret) @@ -33,7 +40,7 @@ func CreateAdapter( return nil, err } - httpClient, _ := graph.KiotaHTTPClient(opts...) + httpClient, _ := graph.KiotaHTTPClient(counter, opts...) // This makes sure that we are able to intercept any requests via // gock. Only necessary for testing. diff --git a/src/internal/m365/graph/service.go b/src/internal/m365/graph/service.go index 0b54b5589..d48b1577a 100644 --- a/src/internal/m365/graph/service.go +++ b/src/internal/m365/graph/service.go @@ -17,6 +17,7 @@ import ( "github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/events" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -104,6 +105,7 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) { // to create a graph api client connection. func CreateAdapter( tenant, client, secret string, + counter *count.Bus, opts ...Option, ) (abstractions.RequestAdapter, error) { auth, err := GetAuth(tenant, client, secret) @@ -111,7 +113,7 @@ func CreateAdapter( return nil, err } - httpClient, cc := KiotaHTTPClient(opts...) + httpClient, cc := KiotaHTTPClient(counter, opts...) adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient( auth, @@ -148,11 +150,14 @@ func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityA // and consume relatively unbound socket connections. It is important // to centralize this client to be passed downstream where api calls // can utilize it on a per-download basis. -func KiotaHTTPClient(opts ...Option) (*http.Client, *clientConfig) { +func KiotaHTTPClient( + counter *count.Bus, + opts ...Option, +) (*http.Client, *clientConfig) { var ( clientOptions = msgraphsdkgo.GetDefaultClientOptions() cc = populateConfig(opts...) - middlewares = kiotaMiddlewares(&clientOptions, cc) + middlewares = kiotaMiddlewares(&clientOptions, cc, counter) httpClient = msgraphgocore.GetDefaultClient(&clientOptions, middlewares...) ) @@ -271,6 +276,7 @@ func MaxConnectionRetries(max int) Option { func kiotaMiddlewares( options *msgraphgocore.GraphClientOptions, cc *clientConfig, + counter *count.Bus, ) []khttp.Middleware { mw := []khttp.Middleware{ msgraphgocore.NewGraphTelemetryHandler(options), @@ -291,9 +297,14 @@ func kiotaMiddlewares( mw = append(mw, concurrencyLimitMiddlewareSingleton) } + throttler := &throttlingMiddleware{ + tf: newTimedFence(), + counter: counter, + } + mw = append( mw, - &throttlingMiddleware{newTimedFence()}, + throttler, &RateLimiterMiddleware{}, &MetricsMiddleware{}) diff --git a/src/internal/m365/graph/service_test.go b/src/internal/m365/graph/service_test.go index fb8e31b7b..3aa800fb1 100644 --- a/src/internal/m365/graph/service_test.go +++ b/src/internal/m365/graph/service_test.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" ) type GraphIntgSuite struct { @@ -59,7 +60,8 @@ func (suite *GraphIntgSuite) TestCreateAdapter() { adpt, err := CreateAdapter( suite.fakeCredentials.AzureTenantID, suite.fakeCredentials.AzureClientID, - suite.fakeCredentials.AzureClientSecret) + suite.fakeCredentials.AzureClientSecret, + count.New()) assert.NoError(t, err, clues.ToCore(err)) assert.NotNil(t, adpt) @@ -143,7 +145,7 @@ func (suite *GraphIntgSuite) TestHTTPClient() { suite.Run(test.name, func() { t := suite.T() - cli, cc := KiotaHTTPClient(test.opts...) + cli, cc := KiotaHTTPClient(count.New(), test.opts...) assert.NotNil(t, cli) test.check(t, cli) test.checkConfig(t, cc) @@ -156,7 +158,8 @@ func (suite *GraphIntgSuite) TestSerializationEndPoint() { adpt, err := CreateAdapter( suite.fakeCredentials.AzureTenantID, suite.fakeCredentials.AzureClientID, - suite.fakeCredentials.AzureClientSecret) + suite.fakeCredentials.AzureClientSecret, + count.New()) require.NoError(t, err, clues.ToCore(err)) serv := NewService(adpt) @@ -189,6 +192,7 @@ func (suite *GraphIntgSuite) TestAdapterWrap_catchesPanic() { suite.credentials.AzureTenantID, suite.credentials.AzureClientID, suite.credentials.AzureClientSecret, + count.New(), appendMiddleware(&alwaysPanicMiddleware)) require.NoError(t, err, clues.ToCore(err)) @@ -210,13 +214,13 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() { defer flush() url := "https://graph.microsoft.com/fnords/beaux/regard" - count := 0 + retryInc := 0 // the panics should get caught and returned as errors alwaysECONNRESET := mwForceResp{ err: syscall.ECONNRESET, alternate: func(req *http.Request) (bool, *http.Response, error) { - count++ + retryInc++ return false, nil, nil }, } @@ -225,18 +229,19 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() { suite.credentials.AzureTenantID, suite.credentials.AzureClientID, suite.credentials.AzureClientSecret, + count.New(), appendMiddleware(&alwaysECONNRESET)) require.NoError(t, err, clues.ToCore(err)) // the query doesn't matter _, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil) require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) - require.Equal(t, 16, count, "number of retries") + require.Equal(t, 16, retryInc, "number of retries") - count = 0 + retryInc = 0 // the query doesn't matter _, err = NewService(adpt).Client().Users().Get(ctx, nil) require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err)) - require.Equal(t, 16, count, "number of retries") + require.Equal(t, 16, retryInc, "number of retries") } diff --git a/src/internal/m365/graph/uploadsession.go b/src/internal/m365/graph/uploadsession.go index 412e04727..a511d81b2 100644 --- a/src/internal/m365/graph/uploadsession.go +++ b/src/internal/m365/graph/uploadsession.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/clues" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" ) @@ -36,12 +37,16 @@ type largeItemWriter struct { client httpWrapper } -func NewLargeItemWriter(parentID, url string, size int64) *largeItemWriter { +func NewLargeItemWriter( + parentID, url string, + size int64, + counter *count.Bus, +) *largeItemWriter { return &largeItemWriter{ parentID: parentID, url: url, contentLength: size, - client: *NewNoTimeoutHTTPWrapper(), + client: *NewNoTimeoutHTTPWrapper(counter), } } diff --git a/src/internal/m365/graph/uploadsession_test.go b/src/internal/m365/graph/uploadsession_test.go index 74d2c71c9..8b15af334 100644 --- a/src/internal/m365/graph/uploadsession_test.go +++ b/src/internal/m365/graph/uploadsession_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/count" ) type UploadSessionSuite struct { @@ -69,7 +70,7 @@ func (suite *UploadSessionSuite) TestWriter() { defer ts.Close() - writer := NewLargeItemWriter("item", ts.URL, writeSize) + writer := NewLargeItemWriter("item", ts.URL, writeSize, count.New()) // Using a 32 KB buffer for the copy allows us to validate the // multi-part upload. `io.CopyBuffer` will only write 32 KB at diff --git a/src/internal/m365/helper_test.go b/src/internal/m365/helper_test.go index 12fa78dbd..17ea870f0 100644 --- a/src/internal/m365/helper_test.go +++ b/src/internal/m365/helper_test.go @@ -23,6 +23,7 @@ import ( m365Stub "github.com/alcionai/corso/src/internal/m365/stub" "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -1217,7 +1218,12 @@ func newController( ) *Controller { a := tconfig.NewM365Account(t) - controller, err := NewController(ctx, a, pst, control.Options{}) + controller, err := NewController( + ctx, + a, + pst, + control.Options{}, + count.New()) require.NoError(t, err, clues.ToCore(err)) return controller diff --git a/src/internal/m365/mock/connector.go b/src/internal/m365/mock/connector.go index 7f59f78e7..a6db1d5e0 100644 --- a/src/internal/m365/mock/connector.go +++ b/src/internal/m365/mock/connector.go @@ -40,6 +40,7 @@ type Controller struct { func (ctrl Controller) ProduceBackupCollections( _ context.Context, _ inject.BackupProducerConfig, + _ *count.Bus, _ *fault.Bus, ) ( []data.BackupCollection, diff --git a/src/internal/m365/service/groups/restore_test.go b/src/internal/m365/service/groups/restore_test.go index 262bc3159..c047b8a38 100644 --- a/src/internal/m365/service/groups/restore_test.go +++ b/src/internal/m365/service/groups/restore_test.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -94,7 +95,10 @@ func (suite *groupsIntegrationSuite) SetupSuite() { creds, err := acct.M365Config() require.NoError(t, err, clues.ToCore(err)) - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) suite.tenantID = creds.AzureTenantID diff --git a/src/internal/m365/service/sharepoint/api/beta_service_test.go b/src/internal/m365/service/sharepoint/api/beta_service_test.go index 69fc8ace5..e9222badb 100644 --- a/src/internal/m365/service/sharepoint/api/beta_service_test.go +++ b/src/internal/m365/service/sharepoint/api/beta_service_test.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/graph/betasdk/models" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester/tconfig" + "github.com/alcionai/corso/src/pkg/count" ) type BetaUnitSuite struct { @@ -31,7 +32,8 @@ func (suite *BetaUnitSuite) TestBetaService_Adapter() { adpt, err := graph.CreateAdapter( m365.AzureTenantID, m365.AzureClientID, - m365.AzureClientSecret) + m365.AzureClientSecret, + count.New()) require.NoError(t, err, clues.ToCore(err)) service := NewBetaService(adpt) diff --git a/src/internal/m365/service/sharepoint/api/pages_test.go b/src/internal/m365/service/sharepoint/api/pages_test.go index 0de4e3a4e..15f352f64 100644 --- a/src/internal/m365/service/sharepoint/api/pages_test.go +++ b/src/internal/m365/service/sharepoint/api/pages_test.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control/testdata" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" ) @@ -26,7 +27,8 @@ func createTestBetaService(t *testing.T, credentials account.M365Config) *api.Be adapter, err := graph.CreateAdapter( credentials.AzureTenantID, credentials.AzureClientID, - credentials.AzureClientSecret) + credentials.AzureClientSecret, + count.New()) require.NoError(t, err, clues.ToCore(err)) return api.NewBetaService(adapter) diff --git a/src/internal/m365/service/sharepoint/backup.go b/src/internal/m365/service/sharepoint/backup.go index 490baffb2..a9f2d7fdf 100644 --- a/src/internal/m365/service/sharepoint/backup.go +++ b/src/internal/m365/service/sharepoint/backup.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -25,6 +26,7 @@ func ProduceBackupCollections( ac api.Client, creds account.M365Config, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) { b, err := bpc.Selector.ToSharePointBackup() @@ -102,6 +104,7 @@ func ProduceBackupCollections( ac, scope, su, + counter, errs) if err != nil { el.AddRecoverable(ctx, err) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 32bdbb236..80cd4055a 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -204,9 +204,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx}) defer flushMetrics() - // for cases where we can't pass the counter down as part of a func call. - ctx = count.Embed(ctx, op.Counter) - // Check if the protected resource has the service enabled in order for us // to run a backup. enabled, err := op.bp.IsServiceEnabled( @@ -421,6 +418,7 @@ func (op *BackupOperation) do( mdColls, lastBackupVersion, op.Options, + op.Counter, op.Errors) if err != nil { return nil, clues.Wrap(err, "producing backup data collections") @@ -441,8 +439,8 @@ func (op *BackupOperation) do( ssmb, backupID, op.incremental && canUseMetadata && canUsePreviousBackup, - op.Errors, - op.Counter) + op.Counter, + op.Errors) if err != nil { return nil, clues.Wrap(err, "persisting collection backups") } @@ -500,6 +498,7 @@ func produceBackupDataCollections( metadata []data.RestoreCollection, lastBackupVersion int, ctrlOpts control.Options, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) { progressBar := observe.MessageWithCompletion(ctx, "Discovering items to backup") @@ -513,7 +512,7 @@ func produceBackupDataCollections( Selector: sel, } - return bp.ProduceBackupCollections(ctx, bpc, errs) + return bp.ProduceBackupCollections(ctx, bpc, counter, errs) } // --------------------------------------------------------------------------- @@ -531,8 +530,8 @@ func consumeBackupCollections( pmr prefixmatcher.StringSetReader, backupID model.StableID, isIncremental bool, - errs *fault.Bus, counter *count.Bus, + errs *fault.Bus, ) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) { ctx = clues.Add( ctx, @@ -555,8 +554,8 @@ func consumeBackupCollections( pmr, tags, isIncremental, - errs, - counter) + counter, + errs) if err != nil { if kopiaStats == nil { return nil, nil, nil, clues.Stack(err) diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 687c7dd4e..26b8612c5 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -136,8 +136,8 @@ func (mbu mockBackupConsumer) ConsumeBackupCollections( excluded prefixmatcher.StringSetReader, tags map[string]string, buildTreeWithBase bool, - errs *fault.Bus, counter *count.Bus, + errs *fault.Bus, ) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) { if mbu.checkFunc != nil { mbu.checkFunc(backupReasons, bases, cs, tags, buildTreeWithBase) @@ -529,8 +529,8 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections nil, backupID, true, - fault.New(true), - count.New()) + count.New(), + fault.New(true)) } func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems() { @@ -1439,7 +1439,10 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() { creds, err := a.M365Config() require.NoError(t, err, clues.ToCore(err)) - suite.ac, err = api.NewClient(creds, control.DefaultOptions()) + suite.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) } diff --git a/src/internal/operations/help_test.go b/src/internal/operations/help_test.go index 11706bf01..46e25a327 100644 --- a/src/internal/operations/help_test.go +++ b/src/internal/operations/help_test.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/src/internal/m365" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -25,7 +26,12 @@ func ControllerWithSelector( ins idname.Cacher, onFail func(), ) (*m365.Controller, selectors.Selector) { - ctrl, err := m365.NewController(ctx, acct, sel.PathService(), control.DefaultOptions()) + ctrl, err := m365.NewController( + ctx, + acct, + sel.PathService(), + control.DefaultOptions(), + count.New()) if !assert.NoError(t, err, clues.ToCore(err)) { if onFail != nil { onFail() diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go index 28bdad807..fb9e13528 100644 --- a/src/internal/operations/inject/inject.go +++ b/src/internal/operations/inject/inject.go @@ -23,6 +23,7 @@ type ( ProduceBackupCollections( ctx context.Context, bpc BackupProducerConfig, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) diff --git a/src/internal/operations/inject/mock/inject.go b/src/internal/operations/inject/mock/inject.go index eb87e6012..a6f931a88 100644 --- a/src/internal/operations/inject/mock/inject.go +++ b/src/internal/operations/inject/mock/inject.go @@ -11,6 +11,7 @@ import ( kinject "github.com/alcionai/corso/src/internal/kopia/inject" "github.com/alcionai/corso/src/internal/m365" "github.com/alcionai/corso/src/internal/operations/inject" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -38,6 +39,7 @@ func NewMockBackupProducer( func (mbp *mockBackupProducer) ProduceBackupCollections( context.Context, inject.BackupProducerConfig, + *count.Bus, *fault.Bus, ) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) { if mbp.injectNonRecoverableErr { diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 08df5e133..669db972c 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -136,8 +136,6 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx}) defer flushMetrics() - ctx = count.Embed(ctx, op.Counter) - cats, err := op.Selectors.AllHumanPathCategories() if err != nil { // No need to exit over this, we'll just be missing a bit of info in the diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index a22ab3998..129a6e88f 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -344,7 +344,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoBackup() { ctx, suite.acct, rsel.PathService(), - control.DefaultOptions()) + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) ro, err := NewRestoreOperation( diff --git a/src/internal/operations/test/exchange_test.go b/src/internal/operations/test/exchange_test.go index 7439c04ae..f9ca1d767 100644 --- a/src/internal/operations/test/exchange_test.go +++ b/src/internal/operations/test/exchange_test.go @@ -393,7 +393,10 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr creds, err := acct.M365Config() require.NoError(t, err, clues.ToCore(err)) - ac, err := api.NewClient(creds, control.DefaultOptions()) + ac, err := api.NewClient( + creds, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) // generate 3 new folders with two items each. diff --git a/src/internal/operations/test/helper_test.go b/src/internal/operations/test/helper_test.go index a88319523..afad482ed 100644 --- a/src/internal/operations/test/helper_test.go +++ b/src/internal/operations/test/helper_test.go @@ -52,13 +52,13 @@ import ( // GockClient produces a new exchange api client that can be // mocked using gock. -func gockClient(creds account.M365Config) (api.Client, error) { - s, err := gmock.NewService(creds) +func gockClient(creds account.M365Config, counter *count.Bus) (api.Client, error) { + s, err := gmock.NewService(creds, counter) if err != nil { return api.Client{}, err } - li, err := gmock.NewService(creds, graph.NoTimeout()) + li, err := gmock.NewService(creds, counter, graph.NoTimeout()) if err != nil { return api.Client{}, err } @@ -562,7 +562,12 @@ func ControllerWithSelector( ins idname.Cacher, onFail func(*testing.T, context.Context), ) (*m365.Controller, selectors.Selector) { - ctrl, err := m365.NewController(ctx, acct, sel.PathService(), control.DefaultOptions()) + ctrl, err := m365.NewController( + ctx, + acct, + sel.PathService(), + control.DefaultOptions(), + count.New()) if !assert.NoError(t, err, clues.ToCore(err)) { if onFail != nil { onFail(t, ctx) @@ -623,10 +628,15 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup { creds, err := a.M365Config() require.NoError(t, err, clues.ToCore(err)) - its.ac, err = api.NewClient(creds, control.DefaultOptions()) + counter := count.New() + + its.ac, err = api.NewClient( + creds, + control.DefaultOptions(), + counter) require.NoError(t, err, clues.ToCore(err)) - its.gockAC, err = gockClient(creds) + its.gockAC, err = gockClient(creds, counter) require.NoError(t, err, clues.ToCore(err)) its.user = userIDs(t, tconfig.M365UserID(t), its.ac) diff --git a/src/internal/operations/test/onedrive_test.go b/src/internal/operations/test/onedrive_test.go index 508d3313d..f3419661d 100644 --- a/src/internal/operations/test/onedrive_test.go +++ b/src/internal/operations/test/onedrive_test.go @@ -814,7 +814,12 @@ func runDriveIncrementalTest( } for _, test := range table { suite.Run(test.name, func() { - cleanCtrl, err := m365.NewController(ctx, acct, sel.PathService(), control.DefaultOptions()) + cleanCtrl, err := m365.NewController( + ctx, + acct, + sel.PathService(), + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) bod.ctrl = cleanCtrl @@ -1117,7 +1122,8 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() { ctx, acct, path.OneDriveService, - control.DefaultOptions()) + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) userable, err := ctrl.AC.Users().GetByID(ctx, suite.its.user.ID) diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 44a5eb986..ee2221cfd 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -216,8 +216,8 @@ func write( prefixmatcher.NopReader[map[string]struct{}](), nil, false, - errs, - count.New()) + count.New(), + errs) if err != nil { return "", clues.Wrap(err, "storing marshalled bytes in repository") } diff --git a/src/pkg/count/count.go b/src/pkg/count/count.go index 194529ac4..9bd89fcff 100644 --- a/src/pkg/count/count.go +++ b/src/pkg/count/count.go @@ -1,8 +1,6 @@ package count import ( - "context" - "github.com/puzpuzpuz/xsync/v2" ) @@ -116,33 +114,3 @@ func (b *Bus) TotalValues() map[string]int64 { return b.Values() } - -// --------------------------------------------------------------------------- -// context embedding -// --------------------------------------------------------------------------- - -type countKey string - -const ctxKey countKey = "corsoCounter" - -// Ctx retrieves the count.Bus embedded in the context. -func Ctx(ctx context.Context) *Bus { - l := ctx.Value(ctxKey) - if l == nil { - return New() - } - - return l.(*Bus) -} - -// Embed allows users to embed their own count.Bus within the context. -func Embed( - ctx context.Context, - bus *Bus, -) context.Context { - if bus == nil { - bus = New() - } - - return context.WithValue(ctx, ctxKey, bus) -} diff --git a/src/pkg/repository/data_providers.go b/src/pkg/repository/data_providers.go index f95f85b56..8f219b4bb 100644 --- a/src/pkg/repository/data_providers.go +++ b/src/pkg/repository/data_providers.go @@ -79,7 +79,12 @@ func connectToM365( progressBar := observe.MessageWithCompletion(ctx, "Connecting to M365") defer close(progressBar) - ctrl, err := m365.NewController(ctx, r.Account, pst, r.Opts) + ctrl, err := m365.NewController( + ctx, + r.Account, + pst, + r.Opts, + r.counter.Local()) if err != nil { return nil, clues.Wrap(err, "creating m365 client controller") } diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 283af8e56..522cd2ca6 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" ctrlRepo "github.com/alcionai/corso/src/pkg/control/repository" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/storage" @@ -57,6 +58,8 @@ type Repositoryer interface { ctx context.Context, rcOpts ctrlRepo.Retention, ) (operations.RetentionConfigOperation, error) + + Counter() *count.Bus } // Repository contains storage provider information. @@ -70,6 +73,7 @@ type repository struct { Opts control.Options Provider DataProvider // the client controller used for external user data CRUD + counter *count.Bus Bus events.Eventer dataLayer *kopia.Wrapper modelStore *kopia.ModelStore @@ -110,6 +114,7 @@ func New( Version: "v1", Account: acct, Storage: st, + counter: count.New(), Bus: bus, Opts: opts, } @@ -336,6 +341,10 @@ func (r repository) NewRetentionConfig( r.Bus) } +func (r repository) Counter() *count.Bus { + return r.counter +} + // --------------------------------------------------------------------------- // Repository ID Model // --------------------------------------------------------------------------- diff --git a/src/pkg/services/m365/api/access_test.go b/src/pkg/services/m365/api/access_test.go index 0b7f8f366..970d110a5 100644 --- a/src/pkg/services/m365/api/access_test.go +++ b/src/pkg/services/m365/api/access_test.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" ) type AccessAPIIntgSuite struct { @@ -109,7 +110,10 @@ func (suite *AccessAPIIntgSuite) TestGetToken() { ctx, flush := tester.NewContext(t) defer flush() - ac, err := NewClient(suite.its.ac.Credentials, control.DefaultOptions()) + ac, err := NewClient( + suite.its.ac.Credentials, + control.DefaultOptions(), + count.New()) require.NoError(t, err, clues.ToCore(err)) ac.Credentials = test.creds() diff --git a/src/pkg/services/m365/api/client.go b/src/pkg/services/m365/api/client.go index 23bf34dfd..f35232098 100644 --- a/src/pkg/services/m365/api/client.go +++ b/src/pkg/services/m365/api/client.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" ) @@ -39,29 +40,44 @@ type Client struct { // graph api client. Requester graph.Requester + counter *count.Bus + options control.Options } // NewClient produces a new exchange api client. Must be used in // place of creating an ad-hoc client struct. -func NewClient(creds account.M365Config, co control.Options) (Client, error) { - s, err := NewService(creds) +func NewClient( + creds account.M365Config, + co control.Options, + counter *count.Bus, +) (Client, error) { + s, err := NewService(creds, counter) if err != nil { return Client{}, err } - li, err := newLargeItemService(creds) + li, err := newLargeItemService(creds, counter) if err != nil { return Client{}, err } - rqr := graph.NewNoTimeoutHTTPWrapper() + rqr := graph.NewNoTimeoutHTTPWrapper(counter) if co.DeltaPageSize < 1 || co.DeltaPageSize > maxDeltaPageSize { co.DeltaPageSize = maxDeltaPageSize } - return Client{creds, s, li, rqr, co}, nil + cli := Client{ + Credentials: creds, + Stable: s, + LargeItem: li, + Requester: rqr, + counter: counter, + options: co, + } + + return cli, nil } // initConcurrencyLimit ensures that the graph concurrency limiter is @@ -77,15 +93,20 @@ func InitConcurrencyLimit(ctx context.Context, pst path.ServiceType) { // so that in-flight state within the adapter doesn't get clobbered. // Most calls should use the Client.Stable property instead of calling this // func, unless it is explicitly necessary. -func (c Client) Service() (graph.Servicer, error) { - return NewService(c.Credentials) +func (c Client) Service(counter *count.Bus) (graph.Servicer, error) { + return NewService(c.Credentials, counter) } -func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service, error) { +func NewService( + creds account.M365Config, + counter *count.Bus, + opts ...graph.Option, +) (*graph.Service, error) { a, err := graph.CreateAdapter( creds.AzureTenantID, creds.AzureClientID, creds.AzureClientSecret, + counter, opts...) if err != nil { return nil, clues.Wrap(err, "generating graph api adapter") @@ -94,8 +115,11 @@ func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service, return graph.NewService(a), nil } -func newLargeItemService(creds account.M365Config) (*graph.Service, error) { - a, err := NewService(creds, graph.NoTimeout()) +func newLargeItemService( + creds account.M365Config, + counter *count.Bus, +) (*graph.Service, error) { + a, err := NewService(creds, counter, graph.NoTimeout()) if err != nil { return nil, clues.Wrap(err, "generating no-timeout graph adapter") } diff --git a/src/pkg/services/m365/api/contacts.go b/src/pkg/services/m365/api/contacts.go index 36a8ab16f..89daaef92 100644 --- a/src/pkg/services/m365/api/contacts.go +++ b/src/pkg/services/m365/api/contacts.go @@ -63,7 +63,7 @@ func (c Contacts) DeleteContainer( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := NewService(c.Credentials) + srv, err := NewService(c.Credentials, c.counter) if err != nil { return graph.Stack(ctx, err) } @@ -230,7 +230,7 @@ func (c Contacts) DeleteItem( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := c.Service() + srv, err := c.Service(c.counter) if err != nil { return graph.Stack(ctx, err) } diff --git a/src/pkg/services/m365/api/drive.go b/src/pkg/services/m365/api/drive.go index 374fa545c..802c42bfc 100644 --- a/src/pkg/services/m365/api/drive.go +++ b/src/pkg/services/m365/api/drive.go @@ -239,7 +239,7 @@ func (c Drives) DeleteItem( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := c.Service() + srv, err := c.Service(c.counter) if err != nil { return graph.Wrap(ctx, err, "creating adapter to delete item permission") } @@ -309,7 +309,7 @@ func (c Drives) DeleteItemPermission( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := c.Service() + srv, err := c.Service(c.counter) if err != nil { return graph.Wrap(ctx, err, "creating adapter to delete item permission") } diff --git a/src/pkg/services/m365/api/events.go b/src/pkg/services/m365/api/events.go index 2590baa85..2403c04a8 100644 --- a/src/pkg/services/m365/api/events.go +++ b/src/pkg/services/m365/api/events.go @@ -73,7 +73,7 @@ func (c Events) DeleteContainer( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := NewService(c.Credentials) + srv, err := NewService(c.Credentials, c.counter) if err != nil { return graph.Stack(ctx, err) } @@ -475,7 +475,7 @@ func (c Events) DeleteItem( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := c.Service() + srv, err := c.Service(c.counter) if err != nil { return graph.Stack(ctx, err) } @@ -541,7 +541,7 @@ func (c Events) PostLargeAttachment( } url := ptr.Val(us.GetUploadUrl()) - w := graph.NewLargeItemWriter(parentItemID, url, size) + w := graph.NewLargeItemWriter(parentItemID, url, size, c.counter) copyBuffer := make([]byte, graph.AttachmentChunkSize) _, err = io.CopyBuffer(w, bytes.NewReader(content), copyBuffer) diff --git a/src/pkg/services/m365/api/groups.go b/src/pkg/services/m365/api/groups.go index 384a2121b..adcb1b5b3 100644 --- a/src/pkg/services/m365/api/groups.go +++ b/src/pkg/services/m365/api/groups.go @@ -42,7 +42,7 @@ func (c Groups) GetAll( ctx context.Context, errs *fault.Bus, ) ([]models.Groupable, error) { - service, err := c.Service() + service, err := c.Service(c.counter) if err != nil { return nil, err } @@ -110,7 +110,7 @@ func (c Groups) GetByID( identifier string, _ CallConfig, // matching standards ) (models.Groupable, error) { - service, err := c.Service() + service, err := c.Service(c.counter) if err != nil { return nil, err } @@ -229,7 +229,7 @@ func (c Groups) GetAllSites( return nil, clues.Wrap(err, "getting channels") } - service, err := c.Service() + service, err := c.Service(c.counter) if err != nil { return nil, graph.Stack(ctx, err) } @@ -292,7 +292,7 @@ func (c Groups) GetRootSite( ctx context.Context, identifier string, ) (models.Siteable, error) { - service, err := c.Service() + service, err := c.Service(c.counter) if err != nil { return nil, err } diff --git a/src/pkg/services/m365/api/helper_test.go b/src/pkg/services/m365/api/helper_test.go index 439b2ca54..920e07cef 100644 --- a/src/pkg/services/m365/api/helper_test.go +++ b/src/pkg/services/m365/api/helper_test.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" ) // --------------------------------------------------------------------------- @@ -27,13 +28,13 @@ import ( // GockClient produces a new exchange api client that can be // mocked using gock. -func gockClient(creds account.M365Config) (Client, error) { - s, err := gmock.NewService(creds) +func gockClient(creds account.M365Config, counter *count.Bus) (Client, error) { + s, err := gmock.NewService(creds, counter) if err != nil { return Client{}, err } - li, err := gmock.NewService(creds, graph.NoTimeout()) + li, err := gmock.NewService(creds, counter, graph.NoTimeout()) if err != nil { return Client{}, err } @@ -128,10 +129,10 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup { creds, err := a.M365Config() require.NoError(t, err, clues.ToCore(err)) - its.ac, err = NewClient(creds, control.DefaultOptions()) + its.ac, err = NewClient(creds, control.DefaultOptions(), count.New()) require.NoError(t, err, clues.ToCore(err)) - its.gockAC, err = gockClient(creds) + its.gockAC, err = gockClient(creds, count.New()) require.NoError(t, err, clues.ToCore(err)) // user drive diff --git a/src/pkg/services/m365/api/mail.go b/src/pkg/services/m365/api/mail.go index 15083ec20..beb012e6d 100644 --- a/src/pkg/services/m365/api/mail.go +++ b/src/pkg/services/m365/api/mail.go @@ -73,7 +73,7 @@ func (c Mail) DeleteContainer( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := NewService(c.Credentials) + srv, err := NewService(c.Credentials, c.counter) if err != nil { return graph.Stack(ctx, err) } @@ -442,7 +442,7 @@ func (c Mail) DeleteItem( ) error { // deletes require unique http clients // https://github.com/alcionai/corso/issues/2707 - srv, err := NewService(c.Credentials) + srv, err := NewService(c.Credentials, c.counter) if err != nil { return graph.Stack(ctx, err) } @@ -508,7 +508,7 @@ func (c Mail) PostLargeAttachment( } url := ptr.Val(us.GetUploadUrl()) - w := graph.NewLargeItemWriter(parentItemID, url, size) + w := graph.NewLargeItemWriter(parentItemID, url, size, c.counter) copyBuffer := make([]byte, graph.AttachmentChunkSize) _, err = io.CopyBuffer(w, bytes.NewReader(content), copyBuffer) diff --git a/src/pkg/services/m365/api/users.go b/src/pkg/services/m365/api/users.go index 3c03c835a..36844437a 100644 --- a/src/pkg/services/m365/api/users.go +++ b/src/pkg/services/m365/api/users.go @@ -58,7 +58,7 @@ func (c Users) GetAll( ctx context.Context, errs *fault.Bus, ) ([]models.Userable, error) { - service, err := c.Service() + service, err := c.Service(c.counter) if err != nil { return nil, err } diff --git a/src/pkg/services/m365/groups_test.go b/src/pkg/services/m365/groups_test.go index 107fc6dc8..d0a1cfd3d 100644 --- a/src/pkg/services/m365/groups_test.go +++ b/src/pkg/services/m365/groups_test.go @@ -109,7 +109,10 @@ func (suite *GroupsIntgSuite) TestGroups() { graph.InitializeConcurrencyLimiter(ctx, true, 4) - groups, err := m365.Groups(ctx, suite.acct, fault.New(true)) + groups, err := m365.Groups( + ctx, + suite.acct, + fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) assert.NotEmpty(t, groups) @@ -155,7 +158,10 @@ func (suite *GroupsIntgSuite) TestGroupsMap() { graph.InitializeConcurrencyLimiter(ctx, true, 4) - gm, err := m365.GroupsMap(ctx, suite.acct, fault.New(true)) + gm, err := m365.GroupsMap( + ctx, + suite.acct, + fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) assert.NotEmpty(t, gm) @@ -203,7 +209,10 @@ func (suite *GroupsIntgSuite) TestGroups_InvalidCredentials() { ctx, flush := tester.NewContext(t) defer flush() - groups, err := m365.Groups(ctx, test.acct(t), fault.New(true)) + groups, err := m365.Groups( + ctx, + test.acct(t), + fault.New(true)) assert.Empty(t, groups, "returned no groups") assert.NotNil(t, err) }) diff --git a/src/pkg/services/m365/m365.go b/src/pkg/services/m365/m365.go index 5e20ccda3..436f2497b 100644 --- a/src/pkg/services/m365/m365.go +++ b/src/pkg/services/m365/m365.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -36,7 +37,10 @@ func makeAC( return api.Client{}, clues.Wrap(err, "getting m365 account creds").WithClues(ctx) } - cli, err := api.NewClient(creds, control.DefaultOptions()) + cli, err := api.NewClient( + creds, + control.DefaultOptions(), + count.New()) if err != nil { return api.Client{}, clues.Wrap(err, "constructing api client").WithClues(ctx) }