remove ctx embedding for counter (#4497)

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-10-31 10:15:05 -06:00 committed by GitHub
parent 54364ed6f0
commit 5e0014307c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
73 changed files with 425 additions and 187 deletions

View File

@ -163,7 +163,12 @@ func createExchangeCmd(cmd *cobra.Command, args []string) error {
sel := exchangeBackupCreateSelectors(flags.UserFV, flags.CategoryDataFV)
ins, err := utils.UsersMap(ctx, *acct, utils.Control(), fault.New(true))
ins, err := utils.UsersMap(
ctx,
*acct,
utils.Control(),
r.Counter(),
fault.New(true))
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 users"))
}

View File

@ -22,6 +22,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/repository"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -35,13 +36,13 @@ import (
// GockClient produces a new exchange api client that can be
// mocked using gock.
func gockClient(creds account.M365Config) (api.Client, error) {
s, err := gmock.NewService(creds)
func gockClient(creds account.M365Config, counter *count.Bus) (api.Client, error) {
s, err := gmock.NewService(creds, counter)
if err != nil {
return api.Client{}, err
}
li, err := gmock.NewService(creds, graph.NoTimeout())
li, err := gmock.NewService(creds, counter, graph.NoTimeout())
if err != nil {
return api.Client{}, err
}
@ -85,10 +86,13 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
creds, err := its.acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
its.ac, err = api.NewClient(creds, control.DefaultOptions())
its.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
its.gockAC, err = gockClient(creds)
its.gockAC, err = gockClient(creds, count.New())
require.NoError(t, err, clues.ToCore(err))
// user drive

View File

@ -143,7 +143,12 @@ func createOneDriveCmd(cmd *cobra.Command, args []string) error {
sel := oneDriveBackupCreateSelectors(flags.UserFV)
ins, err := utils.UsersMap(ctx, *acct, utils.Control(), fault.New(true))
ins, err := utils.UsersMap(
ctx,
*acct,
utils.Control(),
r.Counter(),
fault.New(true))
if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to retrieve M365 users"))
}

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -17,9 +18,10 @@ func UsersMap(
ctx context.Context,
acct account.Account,
co control.Options,
counter *count.Bus,
errs *fault.Bus,
) (idname.Cacher, error) {
au, err := makeUserAPI(acct, co)
au, err := makeUserAPI(acct, co, counter)
if err != nil {
return nil, clues.Wrap(err, "constructing a graph client")
}
@ -27,13 +29,17 @@ func UsersMap(
return au.GetAllIDsAndNames(ctx, errs)
}
func makeUserAPI(acct account.Account, co control.Options) (api.Users, error) {
func makeUserAPI(
acct account.Account,
co control.Options,
counter *count.Bus,
) (api.Users, error) {
creds, err := acct.M365Config()
if err != nil {
return api.Users{}, clues.Wrap(err, "getting m365 account creds")
}
cli, err := api.NewClient(creds, co)
cli, err := api.NewClient(creds, co, counter)
if err != nil {
return api.Users{}, clues.Wrap(err, "constructing api client")
}

View File

@ -145,7 +145,12 @@ func getControllerAndVerifyResourceOwner(
return nil, account.Account{}, nil, clues.Wrap(err, "finding m365 account details")
}
ctrl, err := m365.NewController(ctx, acct, pst, control.Options{})
ctrl, err := m365.NewController(
ctx,
acct,
pst,
control.Options{},
count.New())
if err != nil {
return nil, account.Account{}, nil, clues.Wrap(err, "connecting to graph api")
}

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/credentials"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -62,5 +63,5 @@ func GetAC() (api.Client, error) {
AzureTenantID: os.Getenv(account.AzureTenantID),
}
return api.NewClient(creds, control.DefaultOptions())
return api.NewClient(creds, control.DefaultOptions(), count.New())
}

View File

@ -23,8 +23,8 @@ type (
pmr prefixmatcher.StringSetReader,
tags map[string]string,
buildTreeWithBase bool,
errs *fault.Bus,
counter *count.Bus,
errs *fault.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error)
}

View File

@ -152,8 +152,8 @@ func (w Wrapper) ConsumeBackupCollections(
globalExcludeSet prefixmatcher.StringSetReader,
additionalTags map[string]string,
buildTreeWithBase bool,
errs *fault.Bus,
counter *count.Bus,
errs *fault.Bus,
) (*BackupStats, *details.Builder, DetailsMergeInfoer, error) {
if w.c == nil {
return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx)

View File

@ -110,8 +110,8 @@ func BenchmarkHierarchyMerge(b *testing.B) {
nil,
nil,
true,
fault.New(true),
counter)
counter,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.Zero(t, stats.IgnoredErrorCount)

View File

@ -895,8 +895,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
nil,
tags,
true,
fault.New(true),
counter)
counter,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files")
@ -1215,8 +1215,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() {
nil,
tags,
true,
fault.New(true),
counter)
counter,
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files")
@ -1310,8 +1310,8 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
nil,
nil,
true,
fault.New(true),
count.New())
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
err = k.Compression(ctx, "gzip")
@ -1406,8 +1406,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
nil,
nil,
true,
errs,
counter)
counter,
errs)
require.Error(t, err, clues.ToCore(err))
assert.Zero(t, stats.ErrorCount, "error count")
assert.Zero(t, counter.Get(count.PersistenceErrors), "error count")
@ -1494,8 +1494,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections()
nil,
nil,
true,
fault.New(true),
count.New())
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, BackupStats{}, *s)
@ -1656,8 +1656,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
nil,
nil,
false,
fault.New(true),
counter)
counter,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
require.Zero(t, stats.ErrorCount)
require.Zero(t, counter.Get(count.PersistenceErrors))
@ -1786,8 +1786,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
excluded,
nil,
true,
fault.New(true),
counter)
counter,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectedCachedItems, stats.CachedFileCount)
assert.Equal(t, test.expectedUncachedItems, stats.UncachedFileCount)

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
"github.com/alcionai/corso/src/internal/operations/inject"
bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/path"
@ -35,6 +36,7 @@ import (
func (ctrl *Controller) ProduceBackupCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) {
service := bpc.Selector.PathService()
@ -94,6 +96,7 @@ func (ctrl *Controller) ProduceBackupCollections(
ctrl.AC,
ctrl.credentials,
ctrl.UpdateStatus,
counter,
errs)
if err != nil {
return nil, nil, false, err

View File

@ -22,6 +22,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -61,7 +62,10 @@ func (suite *DataCollectionIntgSuite) SetupSuite() {
suite.tenantID = creds.AzureTenantID
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
}
@ -251,6 +255,7 @@ func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner()
collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
assert.Error(t, err, clues.ToCore(err))
assert.False(t, canUsePreviousBackup, "can use previous backup")
@ -311,6 +316,7 @@ func (suite *DataCollectionIntgSuite) TestSharePointDataCollection() {
suite.ac,
ctrl.credentials,
ctrl.UpdateStatus,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -398,6 +404,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Libraries() {
cols, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -448,6 +455,7 @@ func (suite *SPCollectionIntgSuite) TestCreateSharePointCollection_Lists() {
cols, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -534,6 +542,7 @@ func (suite *GroupsCollectionIntgSuite) TestCreateGroupsCollection_SharePoint()
collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -634,6 +643,7 @@ func (suite *GroupsCollectionIntgSuite) TestCreateGroupsCollection_SharePoint_In
collections, excludes, canUsePreviousBackup, err := ctrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -21,7 +22,10 @@ type oneDriveService struct {
}
func NewOneDriveService(credentials account.M365Config) (*oneDriveService, error) {
ac, err := api.NewClient(credentials, control.DefaultOptions())
ac, err := api.NewClient(
credentials,
control.DefaultOptions(),
count.New())
if err != nil {
return nil, err
}

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -191,6 +192,7 @@ func driveItemWriter(
nicu NewItemContentUploader,
driveID, itemID string,
itemSize int64,
counter *count.Bus,
) (io.Writer, string, error) {
ctx = clues.Add(ctx, "upload_item_id", itemID)
@ -199,7 +201,11 @@ func driveItemWriter(
return nil, "", clues.Stack(err)
}
iw := graph.NewLargeItemWriter(itemID, ptr.Val(icu.GetUploadUrl()), itemSize)
iw := graph.NewLargeItemWriter(
itemID,
ptr.Val(icu.GetUploadUrl()),
itemSize,
counter)
return iw, ptr.Val(icu.GetUploadUrl()), nil
}

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -232,7 +233,10 @@ func (suite *OneDriveIntgSuite) SetupSuite() {
suite.creds = creds
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -22,6 +22,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -231,7 +232,8 @@ func (suite *ItemIntegrationSuite) TestItemWriter() {
rh,
test.driveID,
ptr.Val(newItem.GetId()),
writeSize)
writeSize,
count.New())
require.NoError(t, err, clues.ToCore(err))
// Using a 32 KB buffer for the copy allows us to validate the

View File

@ -788,7 +788,13 @@ func restoreFile(
return "", details.ItemInfo{}, err
}
w, uploadURL, err := driveItemWriter(ctx, ir, driveID, ptr.Val(newItem.GetId()), ss.Size())
w, uploadURL, err := driveItemWriter(
ctx,
ir,
driveID,
ptr.Val(newItem.GetId()),
ss.Size(),
ctr)
if err != nil {
return "", details.ItemInfo{}, clues.Wrap(err, "get item upload session")
}
@ -841,7 +847,11 @@ func restoreFile(
// refresh the io.Writer to restart the upload
// TODO: @vkamra verify if var session is the desired input
w = graph.NewLargeItemWriter(ptr.Val(newItem.GetId()), uploadURL, ss.Size())
w = graph.NewLargeItemWriter(
ptr.Val(newItem.GetId()),
uploadURL,
ss.Size(),
ctr)
}
if err != nil {

View File

@ -24,6 +24,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
@ -61,7 +62,10 @@ func (suite *URLCacheIntegrationSuite) SetupSuite() {
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
drive, err := suite.ac.Users().GetDefaultDrive(ctx, suite.user)
@ -159,7 +163,7 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
require.Equal(t, false, props.isDeleted)
// Validate download URL
c := graph.NewNoTimeoutHTTPWrapper()
c := graph.NewNoTimeoutHTTPWrapper(count.New())
resp, err := c.Request(
ctx,

View File

@ -27,6 +27,7 @@ import (
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -432,7 +433,10 @@ func (suite *BackupIntgSuite) SetupSuite() {
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
suite.tenantID = creds.AzureTenantID

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -699,7 +700,10 @@ func (suite *ContainerResolverSuite) SetupSuite() {
}
func (suite *ContainerResolverSuite) TestPopulate() {
ac, err := api.NewClient(suite.credentials, control.DefaultOptions())
ac, err := api.NewClient(
suite.credentials,
control.DefaultOptions(),
count.New())
require.NoError(suite.T(), err, clues.ToCore(err))
eventFunc := func(t *testing.T) graph.ContainerResolver {

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -31,7 +32,10 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
its.creds = creds
its.ac, err = api.NewClient(creds, control.DefaultOptions())
its.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
its.userID = tconfig.GetM365UserID(ctx)

View File

@ -13,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -83,7 +84,10 @@ func (suite *MailFolderCacheIntegrationSuite) TestDeltaFetch() {
ctx, flush := tester.NewContext(t)
defer flush()
ac, err := api.NewClient(suite.credentials, control.DefaultOptions())
ac, err := api.NewClient(
suite.credentials,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
acm := ac.Mail()

View File

@ -44,7 +44,10 @@ func (suite *RestoreIntgSuite) SetupSuite() {
require.NoError(t, err, clues.ToCore(err))
suite.credentials = m365
suite.ac, err = api.NewClient(m365, control.DefaultOptions())
suite.ac, err = api.NewClient(
m365,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -24,6 +24,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -463,7 +464,10 @@ func (suite *BackupIntgSuite) SetupSuite() {
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
suite.tenantID = creds.AzureTenantID

View File

@ -13,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -60,6 +61,7 @@ func CollectPages(
ac api.Client,
scope selectors.SharePointScope,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
logger.Ctx(ctx).Debug("creating SharePoint Pages collections")
@ -74,7 +76,8 @@ func CollectPages(
adpt, err := graph.CreateAdapter(
creds.AzureTenantID,
creds.AzureClientID,
creds.AzureClientSecret)
creds.AzureClientSecret,
counter)
if err != nil {
return nil, clues.Wrap(err, "creating azure client adapter")
}

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -48,12 +49,16 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
var (
siteID = tconfig.M365SiteID(t)
a = tconfig.NewM365Account(t)
counter = count.New()
)
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
ac, err := api.NewClient(creds, control.DefaultOptions())
ac, err := api.NewClient(
creds,
control.DefaultOptions(),
counter)
require.NoError(t, err, clues.ToCore(err))
bpc := inject.BackupProducerConfig{
@ -71,6 +76,7 @@ func (suite *SharePointPagesSuite) TestCollectPages() {
ac,
sel.Lists(selectors.Any())[0],
(&MockGraphService{}).UpdateStatus,
counter,
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))
assert.NotEmpty(t, col)

View File

@ -22,6 +22,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -45,7 +46,10 @@ func (suite *SharePointCollectionSuite) SetupSuite() {
suite.creds = m365
ac, err := api.NewClient(m365, control.DefaultOptions())
ac, err := api.NewClient(
m365,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
suite.ac = ac

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
)
// ---------------------------------------------------------------------------
@ -50,7 +51,8 @@ func createTestService(t *testing.T, credentials account.M365Config) *graph.Serv
adapter, err := graph.CreateAdapter(
credentials.AzureTenantID,
credentials.AzureClientID,
credentials.AzureClientSecret)
credentials.AzureClientSecret,
count.New())
require.NoError(t, err, "creating microsoft graph service for exchange", clues.ToCore(err))
return graph.NewService(adapter)

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -68,6 +69,7 @@ func NewController(
acct account.Account,
pst path.ServiceType,
co control.Options,
counter *count.Bus,
) (*Controller, error) {
graph.InitializeConcurrencyLimiter(ctx, pst == path.ExchangeService, co.Parallelism.ItemFetch)
@ -76,7 +78,7 @@ func NewController(
return nil, clues.Wrap(err, "retrieving m365 account configuration").WithClues(ctx)
}
ac, err := api.NewClient(creds, co)
ac, err := api.NewClient(creds, co, counter)
if err != nil {
return nil, clues.Wrap(err, "creating api client").WithClues(ctx)
}

View File

@ -637,6 +637,7 @@ func runBackupAndCompare(
dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -1232,6 +1233,7 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() {
dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")
@ -1411,6 +1413,7 @@ func (suite *ControllerIntegrationSuite) TestBackup_CreatesPrefixCollections() {
dcs, excludes, canUsePreviousBackup, err := backupCtrl.ProduceBackupCollections(
ctx,
bpc,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")

View File

@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
)
type BetaClientSuite struct {
@ -45,7 +46,8 @@ func (suite *BetaClientSuite) TestCreateBetaClient() {
adpt, err := graph.CreateAdapter(
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret)
suite.credentials.AzureClientSecret,
count.New())
require.NoError(t, err, clues.ToCore(err))
@ -66,7 +68,8 @@ func (suite *BetaClientSuite) TestBasicClientGetFunctionality() {
adpt, err := graph.CreateAdapter(
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret)
suite.credentials.AzureClientSecret,
count.New())
require.NoError(t, err, clues.ToCore(err))
client := NewBetaClient(adpt)

View File

@ -292,6 +292,7 @@ func (tf *timedFence) RaiseFence(seconds time.Duration) {
// throttlingMiddleware is used to ensure we don't overstep per-min request limits.
type throttlingMiddleware struct {
tf *timedFence
counter *count.Bus
}
func (mw *throttlingMiddleware) Intercept(
@ -314,8 +315,7 @@ func (mw *throttlingMiddleware) Intercept(
return resp, nil
}
countBus := count.Ctx(req.Context())
countBus.Inc(count.ThrottledAPICalls)
mw.counter.Inc(count.ThrottledAPICalls)
// if all prior conditions pass, we need to add a fence that blocks
// calls, globally, from progressing until the timeout retry-after

View File

@ -16,6 +16,7 @@ import (
"golang.org/x/net/context"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/count"
)
type ConcurrencyMWUnitTestSuite struct {
@ -307,7 +308,10 @@ func (suite *ConcurrencyMWUnitTestSuite) TestThrottlingMiddleware() {
ctx, flush := tester.NewContext(t)
defer flush()
tm := throttlingMiddleware{newTimedFence()}
tm := &throttlingMiddleware{
tf: newTimedFence(),
counter: count.New(),
}
req := &http.Request{}
req = req.WithContext(ctx)

View File

@ -13,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
)
@ -36,12 +37,15 @@ type Requester interface {
// and consume relatively unbound socket connections. It is important
// to centralize this client to be passed downstream where api calls
// can utilize it on a per-download basis.
func NewHTTPWrapper(opts ...Option) *httpWrapper {
func NewHTTPWrapper(
counter *count.Bus,
opts ...Option,
) *httpWrapper {
var (
cc = populateConfig(opts...)
rt = customTransport{
n: pipeline{
middlewares: internalMiddleware(cc),
middlewares: internalMiddleware(cc, counter),
transport: defaultTransport(),
},
}
@ -66,9 +70,12 @@ func NewHTTPWrapper(opts ...Option) *httpWrapper {
// and consume relatively unbound socket connections. It is important
// to centralize this client to be passed downstream where api calls
// can utilize it on a per-download basis.
func NewNoTimeoutHTTPWrapper(opts ...Option) *httpWrapper {
func NewNoTimeoutHTTPWrapper(
counter *count.Bus,
opts ...Option,
) *httpWrapper {
opts = append(opts, NoTimeout())
return NewHTTPWrapper(opts...)
return NewHTTPWrapper(counter, opts...)
}
// ---------------------------------------------------------------------------
@ -178,7 +185,15 @@ func defaultTransport() http.RoundTripper {
return defaultTransport
}
func internalMiddleware(cc *clientConfig) []khttp.Middleware {
func internalMiddleware(
cc *clientConfig,
counter *count.Bus,
) []khttp.Middleware {
throttler := &throttlingMiddleware{
tf: newTimedFence(),
counter: counter,
}
mw := []khttp.Middleware{
&RetryMiddleware{
MaxRetries: cc.maxRetries,
@ -187,7 +202,7 @@ func internalMiddleware(cc *clientConfig) []khttp.Middleware {
khttp.NewRetryHandler(),
khttp.NewRedirectHandler(),
&LoggingMiddleware{},
&throttlingMiddleware{newTimedFence()},
throttler,
&RateLimiterMiddleware{},
&MetricsMiddleware{},
}

View File

@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/count"
)
type HTTPWrapperIntgSuite struct {
@ -34,7 +35,7 @@ func (suite *HTTPWrapperIntgSuite) TestNewHTTPWrapper() {
ctx, flush := tester.NewContext(t)
defer flush()
hw := NewHTTPWrapper()
hw := NewHTTPWrapper(count.New())
resp, err := hw.Request(
ctx,
@ -107,7 +108,7 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_redirectMiddleware() {
},
}
hw := NewHTTPWrapper(appendMiddleware(&mwResp))
hw := NewHTTPWrapper(count.New(), appendMiddleware(&mwResp))
resp, err := hw.Request(ctx, http.MethodGet, url, nil, nil)
require.NoError(t, err, clues.ToCore(err))
@ -175,6 +176,7 @@ func (suite *HTTPWrapperUnitSuite) TestNewHTTPWrapper_http2StreamErrorRetries()
}
hw := NewHTTPWrapper(
count.New(),
appendMiddleware(&mwResp),
MaxConnectionRetries(test.retries))

View File

@ -323,7 +323,9 @@ func (mw RetryMiddleware) getRetryDelay(
// ---------------------------------------------------------------------------
// MetricsMiddleware aggregates per-request metrics on the events bus
type MetricsMiddleware struct{}
type MetricsMiddleware struct {
counter *count.Bus
}
const xmruHeader = "x-ms-resource-unit"
@ -362,8 +364,7 @@ func (mw *MetricsMiddleware) Intercept(
xmrui = 1
}
countBus := count.Ctx(req.Context())
countBus.Add(count.APICallTokensConsumed, int64(xmrui))
mw.counter.Add(count.APICallTokensConsumed, int64(xmrui))
events.IncN(xmrui, events.APICall, xmruHeader)

View File

@ -25,6 +25,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
)
@ -109,7 +110,7 @@ func mockAdapter(
var (
clientOptions = msgraphsdkgo.GetDefaultClientOptions()
cc = populateConfig(MinimumBackoff(10 * time.Millisecond))
middlewares = append(kiotaMiddlewares(&clientOptions, cc), mw)
middlewares = append(kiotaMiddlewares(&clientOptions, cc, count.New()), mw)
httpClient = msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
)

View File

@ -7,13 +7,19 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
)
func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service, error) {
func NewService(
creds account.M365Config,
counter *count.Bus,
opts ...graph.Option,
) (*graph.Service, error) {
a, err := CreateAdapter(
creds.AzureTenantID,
creds.AzureClientID,
creds.AzureClientSecret,
counter,
opts...)
if err != nil {
return nil, clues.Wrap(err, "generating graph adapter")
@ -26,6 +32,7 @@ func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service,
// enable interceptions via gock to make it mockable.
func CreateAdapter(
tenant, client, secret string,
counter *count.Bus,
opts ...graph.Option,
) (*msgraphsdkgo.GraphRequestAdapter, error) {
auth, err := graph.GetAuth(tenant, client, secret)
@ -33,7 +40,7 @@ func CreateAdapter(
return nil, err
}
httpClient, _ := graph.KiotaHTTPClient(opts...)
httpClient, _ := graph.KiotaHTTPClient(counter, opts...)
// This makes sure that we are able to intercept any requests via
// gock. Only necessary for testing.

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -104,6 +105,7 @@ func (s Service) Serialize(object serialization.Parsable) ([]byte, error) {
// to create a graph api client connection.
func CreateAdapter(
tenant, client, secret string,
counter *count.Bus,
opts ...Option,
) (abstractions.RequestAdapter, error) {
auth, err := GetAuth(tenant, client, secret)
@ -111,7 +113,7 @@ func CreateAdapter(
return nil, err
}
httpClient, cc := KiotaHTTPClient(opts...)
httpClient, cc := KiotaHTTPClient(counter, opts...)
adpt, err := msgraphsdkgo.NewGraphRequestAdapterWithParseNodeFactoryAndSerializationWriterFactoryAndHttpClient(
auth,
@ -148,11 +150,14 @@ func GetAuth(tenant string, client string, secret string) (*kauth.AzureIdentityA
// and consume relatively unbound socket connections. It is important
// to centralize this client to be passed downstream where api calls
// can utilize it on a per-download basis.
func KiotaHTTPClient(opts ...Option) (*http.Client, *clientConfig) {
func KiotaHTTPClient(
counter *count.Bus,
opts ...Option,
) (*http.Client, *clientConfig) {
var (
clientOptions = msgraphsdkgo.GetDefaultClientOptions()
cc = populateConfig(opts...)
middlewares = kiotaMiddlewares(&clientOptions, cc)
middlewares = kiotaMiddlewares(&clientOptions, cc, counter)
httpClient = msgraphgocore.GetDefaultClient(&clientOptions, middlewares...)
)
@ -271,6 +276,7 @@ func MaxConnectionRetries(max int) Option {
func kiotaMiddlewares(
options *msgraphgocore.GraphClientOptions,
cc *clientConfig,
counter *count.Bus,
) []khttp.Middleware {
mw := []khttp.Middleware{
msgraphgocore.NewGraphTelemetryHandler(options),
@ -291,9 +297,14 @@ func kiotaMiddlewares(
mw = append(mw, concurrencyLimitMiddlewareSingleton)
}
throttler := &throttlingMiddleware{
tf: newTimedFence(),
counter: counter,
}
mw = append(
mw,
&throttlingMiddleware{newTimedFence()},
throttler,
&RateLimiterMiddleware{},
&MetricsMiddleware{})

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
)
type GraphIntgSuite struct {
@ -59,7 +60,8 @@ func (suite *GraphIntgSuite) TestCreateAdapter() {
adpt, err := CreateAdapter(
suite.fakeCredentials.AzureTenantID,
suite.fakeCredentials.AzureClientID,
suite.fakeCredentials.AzureClientSecret)
suite.fakeCredentials.AzureClientSecret,
count.New())
assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, adpt)
@ -143,7 +145,7 @@ func (suite *GraphIntgSuite) TestHTTPClient() {
suite.Run(test.name, func() {
t := suite.T()
cli, cc := KiotaHTTPClient(test.opts...)
cli, cc := KiotaHTTPClient(count.New(), test.opts...)
assert.NotNil(t, cli)
test.check(t, cli)
test.checkConfig(t, cc)
@ -156,7 +158,8 @@ func (suite *GraphIntgSuite) TestSerializationEndPoint() {
adpt, err := CreateAdapter(
suite.fakeCredentials.AzureTenantID,
suite.fakeCredentials.AzureClientID,
suite.fakeCredentials.AzureClientSecret)
suite.fakeCredentials.AzureClientSecret,
count.New())
require.NoError(t, err, clues.ToCore(err))
serv := NewService(adpt)
@ -189,6 +192,7 @@ func (suite *GraphIntgSuite) TestAdapterWrap_catchesPanic() {
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret,
count.New(),
appendMiddleware(&alwaysPanicMiddleware))
require.NoError(t, err, clues.ToCore(err))
@ -210,13 +214,13 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() {
defer flush()
url := "https://graph.microsoft.com/fnords/beaux/regard"
count := 0
retryInc := 0
// the panics should get caught and returned as errors
alwaysECONNRESET := mwForceResp{
err: syscall.ECONNRESET,
alternate: func(req *http.Request) (bool, *http.Response, error) {
count++
retryInc++
return false, nil, nil
},
}
@ -225,18 +229,19 @@ func (suite *GraphIntgSuite) TestAdapterWrap_retriesConnectionClose() {
suite.credentials.AzureTenantID,
suite.credentials.AzureClientID,
suite.credentials.AzureClientSecret,
count.New(),
appendMiddleware(&alwaysECONNRESET))
require.NoError(t, err, clues.ToCore(err))
// the query doesn't matter
_, err = users.NewItemCalendarsItemEventsDeltaRequestBuilder(url, adpt).Get(ctx, nil)
require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err))
require.Equal(t, 16, count, "number of retries")
require.Equal(t, 16, retryInc, "number of retries")
count = 0
retryInc = 0
// the query doesn't matter
_, err = NewService(adpt).Client().Users().Get(ctx, nil)
require.ErrorIs(t, err, syscall.ECONNRESET, clues.ToCore(err))
require.Equal(t, 16, count, "number of retries")
require.Equal(t, 16, retryInc, "number of retries")
}

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
)
@ -36,12 +37,16 @@ type largeItemWriter struct {
client httpWrapper
}
func NewLargeItemWriter(parentID, url string, size int64) *largeItemWriter {
func NewLargeItemWriter(
parentID, url string,
size int64,
counter *count.Bus,
) *largeItemWriter {
return &largeItemWriter{
parentID: parentID,
url: url,
contentLength: size,
client: *NewNoTimeoutHTTPWrapper(),
client: *NewNoTimeoutHTTPWrapper(counter),
}
}

View File

@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/count"
)
type UploadSessionSuite struct {
@ -69,7 +70,7 @@ func (suite *UploadSessionSuite) TestWriter() {
defer ts.Close()
writer := NewLargeItemWriter("item", ts.URL, writeSize)
writer := NewLargeItemWriter("item", ts.URL, writeSize, count.New())
// Using a 32 KB buffer for the copy allows us to validate the
// multi-part upload. `io.CopyBuffer` will only write 32 KB at

View File

@ -23,6 +23,7 @@ import (
m365Stub "github.com/alcionai/corso/src/internal/m365/stub"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -1217,7 +1218,12 @@ func newController(
) *Controller {
a := tconfig.NewM365Account(t)
controller, err := NewController(ctx, a, pst, control.Options{})
controller, err := NewController(
ctx,
a,
pst,
control.Options{},
count.New())
require.NoError(t, err, clues.ToCore(err))
return controller

View File

@ -40,6 +40,7 @@ type Controller struct {
func (ctrl Controller) ProduceBackupCollections(
_ context.Context,
_ inject.BackupProducerConfig,
_ *count.Bus,
_ *fault.Bus,
) (
[]data.BackupCollection,

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -94,7 +95,10 @@ func (suite *groupsIntegrationSuite) SetupSuite() {
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
suite.tenantID = creds.AzureTenantID

View File

@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph/betasdk/models"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/count"
)
type BetaUnitSuite struct {
@ -31,7 +32,8 @@ func (suite *BetaUnitSuite) TestBetaService_Adapter() {
adpt, err := graph.CreateAdapter(
m365.AzureTenantID,
m365.AzureClientID,
m365.AzureClientSecret)
m365.AzureClientSecret,
count.New())
require.NoError(t, err, clues.ToCore(err))
service := NewBetaService(adpt)

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control/testdata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
)
@ -26,7 +27,8 @@ func createTestBetaService(t *testing.T, credentials account.M365Config) *api.Be
adapter, err := graph.CreateAdapter(
credentials.AzureTenantID,
credentials.AzureClientID,
credentials.AzureClientSecret)
credentials.AzureClientSecret,
count.New())
require.NoError(t, err, clues.ToCore(err))
return api.NewBetaService(adapter)

View File

@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -25,6 +26,7 @@ func ProduceBackupCollections(
ac api.Client,
creds account.M365Config,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
b, err := bpc.Selector.ToSharePointBackup()
@ -102,6 +104,7 @@ func ProduceBackupCollections(
ac,
scope,
su,
counter,
errs)
if err != nil {
el.AddRecoverable(ctx, err)

View File

@ -204,9 +204,6 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
defer flushMetrics()
// for cases where we can't pass the counter down as part of a func call.
ctx = count.Embed(ctx, op.Counter)
// Check if the protected resource has the service enabled in order for us
// to run a backup.
enabled, err := op.bp.IsServiceEnabled(
@ -421,6 +418,7 @@ func (op *BackupOperation) do(
mdColls,
lastBackupVersion,
op.Options,
op.Counter,
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "producing backup data collections")
@ -441,8 +439,8 @@ func (op *BackupOperation) do(
ssmb,
backupID,
op.incremental && canUseMetadata && canUsePreviousBackup,
op.Errors,
op.Counter)
op.Counter,
op.Errors)
if err != nil {
return nil, clues.Wrap(err, "persisting collection backups")
}
@ -500,6 +498,7 @@ func produceBackupDataCollections(
metadata []data.RestoreCollection,
lastBackupVersion int,
ctrlOpts control.Options,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) {
progressBar := observe.MessageWithCompletion(ctx, "Discovering items to backup")
@ -513,7 +512,7 @@ func produceBackupDataCollections(
Selector: sel,
}
return bp.ProduceBackupCollections(ctx, bpc, errs)
return bp.ProduceBackupCollections(ctx, bpc, counter, errs)
}
// ---------------------------------------------------------------------------
@ -531,8 +530,8 @@ func consumeBackupCollections(
pmr prefixmatcher.StringSetReader,
backupID model.StableID,
isIncremental bool,
errs *fault.Bus,
counter *count.Bus,
errs *fault.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) {
ctx = clues.Add(
ctx,
@ -555,8 +554,8 @@ func consumeBackupCollections(
pmr,
tags,
isIncremental,
errs,
counter)
counter,
errs)
if err != nil {
if kopiaStats == nil {
return nil, nil, nil, clues.Stack(err)

View File

@ -136,8 +136,8 @@ func (mbu mockBackupConsumer) ConsumeBackupCollections(
excluded prefixmatcher.StringSetReader,
tags map[string]string,
buildTreeWithBase bool,
errs *fault.Bus,
counter *count.Bus,
errs *fault.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) {
if mbu.checkFunc != nil {
mbu.checkFunc(backupReasons, bases, cs, tags, buildTreeWithBase)
@ -529,8 +529,8 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections
nil,
backupID,
true,
fault.New(true),
count.New())
count.New(),
fault.New(true))
}
func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems() {
@ -1439,7 +1439,10 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds, control.DefaultOptions())
suite.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
}

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/m365"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/selectors"
)
@ -25,7 +26,12 @@ func ControllerWithSelector(
ins idname.Cacher,
onFail func(),
) (*m365.Controller, selectors.Selector) {
ctrl, err := m365.NewController(ctx, acct, sel.PathService(), control.DefaultOptions())
ctrl, err := m365.NewController(
ctx,
acct,
sel.PathService(),
control.DefaultOptions(),
count.New())
if !assert.NoError(t, err, clues.ToCore(err)) {
if onFail != nil {
onFail()

View File

@ -23,6 +23,7 @@ type (
ProduceBackupCollections(
ctx context.Context,
bpc BackupProducerConfig,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error)

View File

@ -11,6 +11,7 @@ import (
kinject "github.com/alcionai/corso/src/internal/kopia/inject"
"github.com/alcionai/corso/src/internal/m365"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -38,6 +39,7 @@ func NewMockBackupProducer(
func (mbp *mockBackupProducer) ProduceBackupCollections(
context.Context,
inject.BackupProducerConfig,
*count.Bus,
*fault.Bus,
) ([]data.BackupCollection, prefixmatcher.StringSetReader, bool, error) {
if mbp.injectNonRecoverableErr {

View File

@ -136,8 +136,6 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
defer flushMetrics()
ctx = count.Embed(ctx, op.Counter)
cats, err := op.Selectors.AllHumanPathCategories()
if err != nil {
// No need to exit over this, we'll just be missing a bit of info in the

View File

@ -344,7 +344,8 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoBackup() {
ctx,
suite.acct,
rsel.PathService(),
control.DefaultOptions())
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
ro, err := NewRestoreOperation(

View File

@ -393,7 +393,10 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
ac, err := api.NewClient(creds, control.DefaultOptions())
ac, err := api.NewClient(
creds,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
// generate 3 new folders with two items each.

View File

@ -52,13 +52,13 @@ import (
// GockClient produces a new exchange api client that can be
// mocked using gock.
func gockClient(creds account.M365Config) (api.Client, error) {
s, err := gmock.NewService(creds)
func gockClient(creds account.M365Config, counter *count.Bus) (api.Client, error) {
s, err := gmock.NewService(creds, counter)
if err != nil {
return api.Client{}, err
}
li, err := gmock.NewService(creds, graph.NoTimeout())
li, err := gmock.NewService(creds, counter, graph.NoTimeout())
if err != nil {
return api.Client{}, err
}
@ -562,7 +562,12 @@ func ControllerWithSelector(
ins idname.Cacher,
onFail func(*testing.T, context.Context),
) (*m365.Controller, selectors.Selector) {
ctrl, err := m365.NewController(ctx, acct, sel.PathService(), control.DefaultOptions())
ctrl, err := m365.NewController(
ctx,
acct,
sel.PathService(),
control.DefaultOptions(),
count.New())
if !assert.NoError(t, err, clues.ToCore(err)) {
if onFail != nil {
onFail(t, ctx)
@ -623,10 +628,15 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
its.ac, err = api.NewClient(creds, control.DefaultOptions())
counter := count.New()
its.ac, err = api.NewClient(
creds,
control.DefaultOptions(),
counter)
require.NoError(t, err, clues.ToCore(err))
its.gockAC, err = gockClient(creds)
its.gockAC, err = gockClient(creds, counter)
require.NoError(t, err, clues.ToCore(err))
its.user = userIDs(t, tconfig.M365UserID(t), its.ac)

View File

@ -814,7 +814,12 @@ func runDriveIncrementalTest(
}
for _, test := range table {
suite.Run(test.name, func() {
cleanCtrl, err := m365.NewController(ctx, acct, sel.PathService(), control.DefaultOptions())
cleanCtrl, err := m365.NewController(
ctx,
acct,
sel.PathService(),
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
bod.ctrl = cleanCtrl
@ -1117,7 +1122,8 @@ func (suite *OneDriveBackupIntgSuite) TestBackup_Run_oneDriveOwnerMigration() {
ctx,
acct,
path.OneDriveService,
control.DefaultOptions())
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
userable, err := ctrl.AC.Users().GetByID(ctx, suite.its.user.ID)

View File

@ -216,8 +216,8 @@ func write(
prefixmatcher.NopReader[map[string]struct{}](),
nil,
false,
errs,
count.New())
count.New(),
errs)
if err != nil {
return "", clues.Wrap(err, "storing marshalled bytes in repository")
}

View File

@ -1,8 +1,6 @@
package count
import (
"context"
"github.com/puzpuzpuz/xsync/v2"
)
@ -116,33 +114,3 @@ func (b *Bus) TotalValues() map[string]int64 {
return b.Values()
}
// ---------------------------------------------------------------------------
// context embedding
// ---------------------------------------------------------------------------
type countKey string
const ctxKey countKey = "corsoCounter"
// Ctx retrieves the count.Bus embedded in the context.
func Ctx(ctx context.Context) *Bus {
l := ctx.Value(ctxKey)
if l == nil {
return New()
}
return l.(*Bus)
}
// Embed allows users to embed their own count.Bus within the context.
func Embed(
ctx context.Context,
bus *Bus,
) context.Context {
if bus == nil {
bus = New()
}
return context.WithValue(ctx, ctxKey, bus)
}

View File

@ -79,7 +79,12 @@ func connectToM365(
progressBar := observe.MessageWithCompletion(ctx, "Connecting to M365")
defer close(progressBar)
ctrl, err := m365.NewController(ctx, r.Account, pst, r.Opts)
ctrl, err := m365.NewController(
ctx,
r.Account,
pst,
r.Opts,
r.counter.Local())
if err != nil {
return nil, clues.Wrap(err, "creating m365 client controller")
}

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
ctrlRepo "github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/storage"
@ -57,6 +58,8 @@ type Repositoryer interface {
ctx context.Context,
rcOpts ctrlRepo.Retention,
) (operations.RetentionConfigOperation, error)
Counter() *count.Bus
}
// Repository contains storage provider information.
@ -70,6 +73,7 @@ type repository struct {
Opts control.Options
Provider DataProvider // the client controller used for external user data CRUD
counter *count.Bus
Bus events.Eventer
dataLayer *kopia.Wrapper
modelStore *kopia.ModelStore
@ -110,6 +114,7 @@ func New(
Version: "v1",
Account: acct,
Storage: st,
counter: count.New(),
Bus: bus,
Opts: opts,
}
@ -336,6 +341,10 @@ func (r repository) NewRetentionConfig(
r.Bus)
}
func (r repository) Counter() *count.Bus {
return r.counter
}
// ---------------------------------------------------------------------------
// Repository ID Model
// ---------------------------------------------------------------------------

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
)
type AccessAPIIntgSuite struct {
@ -109,7 +110,10 @@ func (suite *AccessAPIIntgSuite) TestGetToken() {
ctx, flush := tester.NewContext(t)
defer flush()
ac, err := NewClient(suite.its.ac.Credentials, control.DefaultOptions())
ac, err := NewClient(
suite.its.ac.Credentials,
control.DefaultOptions(),
count.New())
require.NoError(t, err, clues.ToCore(err))
ac.Credentials = test.creds()

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
)
@ -39,29 +40,44 @@ type Client struct {
// graph api client.
Requester graph.Requester
counter *count.Bus
options control.Options
}
// NewClient produces a new exchange api client. Must be used in
// place of creating an ad-hoc client struct.
func NewClient(creds account.M365Config, co control.Options) (Client, error) {
s, err := NewService(creds)
func NewClient(
creds account.M365Config,
co control.Options,
counter *count.Bus,
) (Client, error) {
s, err := NewService(creds, counter)
if err != nil {
return Client{}, err
}
li, err := newLargeItemService(creds)
li, err := newLargeItemService(creds, counter)
if err != nil {
return Client{}, err
}
rqr := graph.NewNoTimeoutHTTPWrapper()
rqr := graph.NewNoTimeoutHTTPWrapper(counter)
if co.DeltaPageSize < 1 || co.DeltaPageSize > maxDeltaPageSize {
co.DeltaPageSize = maxDeltaPageSize
}
return Client{creds, s, li, rqr, co}, nil
cli := Client{
Credentials: creds,
Stable: s,
LargeItem: li,
Requester: rqr,
counter: counter,
options: co,
}
return cli, nil
}
// initConcurrencyLimit ensures that the graph concurrency limiter is
@ -77,15 +93,20 @@ func InitConcurrencyLimit(ctx context.Context, pst path.ServiceType) {
// so that in-flight state within the adapter doesn't get clobbered.
// Most calls should use the Client.Stable property instead of calling this
// func, unless it is explicitly necessary.
func (c Client) Service() (graph.Servicer, error) {
return NewService(c.Credentials)
func (c Client) Service(counter *count.Bus) (graph.Servicer, error) {
return NewService(c.Credentials, counter)
}
func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service, error) {
func NewService(
creds account.M365Config,
counter *count.Bus,
opts ...graph.Option,
) (*graph.Service, error) {
a, err := graph.CreateAdapter(
creds.AzureTenantID,
creds.AzureClientID,
creds.AzureClientSecret,
counter,
opts...)
if err != nil {
return nil, clues.Wrap(err, "generating graph api adapter")
@ -94,8 +115,11 @@ func NewService(creds account.M365Config, opts ...graph.Option) (*graph.Service,
return graph.NewService(a), nil
}
func newLargeItemService(creds account.M365Config) (*graph.Service, error) {
a, err := NewService(creds, graph.NoTimeout())
func newLargeItemService(
creds account.M365Config,
counter *count.Bus,
) (*graph.Service, error) {
a, err := NewService(creds, counter, graph.NoTimeout())
if err != nil {
return nil, clues.Wrap(err, "generating no-timeout graph adapter")
}

View File

@ -63,7 +63,7 @@ func (c Contacts) DeleteContainer(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := NewService(c.Credentials)
srv, err := NewService(c.Credentials, c.counter)
if err != nil {
return graph.Stack(ctx, err)
}
@ -230,7 +230,7 @@ func (c Contacts) DeleteItem(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := c.Service()
srv, err := c.Service(c.counter)
if err != nil {
return graph.Stack(ctx, err)
}

View File

@ -239,7 +239,7 @@ func (c Drives) DeleteItem(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := c.Service()
srv, err := c.Service(c.counter)
if err != nil {
return graph.Wrap(ctx, err, "creating adapter to delete item permission")
}
@ -309,7 +309,7 @@ func (c Drives) DeleteItemPermission(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := c.Service()
srv, err := c.Service(c.counter)
if err != nil {
return graph.Wrap(ctx, err, "creating adapter to delete item permission")
}

View File

@ -73,7 +73,7 @@ func (c Events) DeleteContainer(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := NewService(c.Credentials)
srv, err := NewService(c.Credentials, c.counter)
if err != nil {
return graph.Stack(ctx, err)
}
@ -475,7 +475,7 @@ func (c Events) DeleteItem(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := c.Service()
srv, err := c.Service(c.counter)
if err != nil {
return graph.Stack(ctx, err)
}
@ -541,7 +541,7 @@ func (c Events) PostLargeAttachment(
}
url := ptr.Val(us.GetUploadUrl())
w := graph.NewLargeItemWriter(parentItemID, url, size)
w := graph.NewLargeItemWriter(parentItemID, url, size, c.counter)
copyBuffer := make([]byte, graph.AttachmentChunkSize)
_, err = io.CopyBuffer(w, bytes.NewReader(content), copyBuffer)

View File

@ -42,7 +42,7 @@ func (c Groups) GetAll(
ctx context.Context,
errs *fault.Bus,
) ([]models.Groupable, error) {
service, err := c.Service()
service, err := c.Service(c.counter)
if err != nil {
return nil, err
}
@ -110,7 +110,7 @@ func (c Groups) GetByID(
identifier string,
_ CallConfig, // matching standards
) (models.Groupable, error) {
service, err := c.Service()
service, err := c.Service(c.counter)
if err != nil {
return nil, err
}
@ -229,7 +229,7 @@ func (c Groups) GetAllSites(
return nil, clues.Wrap(err, "getting channels")
}
service, err := c.Service()
service, err := c.Service(c.counter)
if err != nil {
return nil, graph.Stack(ctx, err)
}
@ -292,7 +292,7 @@ func (c Groups) GetRootSite(
ctx context.Context,
identifier string,
) (models.Siteable, error) {
service, err := c.Service()
service, err := c.Service(c.counter)
if err != nil {
return nil, err
}

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
)
// ---------------------------------------------------------------------------
@ -27,13 +28,13 @@ import (
// GockClient produces a new exchange api client that can be
// mocked using gock.
func gockClient(creds account.M365Config) (Client, error) {
s, err := gmock.NewService(creds)
func gockClient(creds account.M365Config, counter *count.Bus) (Client, error) {
s, err := gmock.NewService(creds, counter)
if err != nil {
return Client{}, err
}
li, err := gmock.NewService(creds, graph.NoTimeout())
li, err := gmock.NewService(creds, counter, graph.NoTimeout())
if err != nil {
return Client{}, err
}
@ -128,10 +129,10 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
its.ac, err = NewClient(creds, control.DefaultOptions())
its.ac, err = NewClient(creds, control.DefaultOptions(), count.New())
require.NoError(t, err, clues.ToCore(err))
its.gockAC, err = gockClient(creds)
its.gockAC, err = gockClient(creds, count.New())
require.NoError(t, err, clues.ToCore(err))
// user drive

View File

@ -73,7 +73,7 @@ func (c Mail) DeleteContainer(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := NewService(c.Credentials)
srv, err := NewService(c.Credentials, c.counter)
if err != nil {
return graph.Stack(ctx, err)
}
@ -442,7 +442,7 @@ func (c Mail) DeleteItem(
) error {
// deletes require unique http clients
// https://github.com/alcionai/corso/issues/2707
srv, err := NewService(c.Credentials)
srv, err := NewService(c.Credentials, c.counter)
if err != nil {
return graph.Stack(ctx, err)
}
@ -508,7 +508,7 @@ func (c Mail) PostLargeAttachment(
}
url := ptr.Val(us.GetUploadUrl())
w := graph.NewLargeItemWriter(parentItemID, url, size)
w := graph.NewLargeItemWriter(parentItemID, url, size, c.counter)
copyBuffer := make([]byte, graph.AttachmentChunkSize)
_, err = io.CopyBuffer(w, bytes.NewReader(content), copyBuffer)

View File

@ -58,7 +58,7 @@ func (c Users) GetAll(
ctx context.Context,
errs *fault.Bus,
) ([]models.Userable, error) {
service, err := c.Service()
service, err := c.Service(c.counter)
if err != nil {
return nil, err
}

View File

@ -109,7 +109,10 @@ func (suite *GroupsIntgSuite) TestGroups() {
graph.InitializeConcurrencyLimiter(ctx, true, 4)
groups, err := m365.Groups(ctx, suite.acct, fault.New(true))
groups, err := m365.Groups(
ctx,
suite.acct,
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))
assert.NotEmpty(t, groups)
@ -155,7 +158,10 @@ func (suite *GroupsIntgSuite) TestGroupsMap() {
graph.InitializeConcurrencyLimiter(ctx, true, 4)
gm, err := m365.GroupsMap(ctx, suite.acct, fault.New(true))
gm, err := m365.GroupsMap(
ctx,
suite.acct,
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))
assert.NotEmpty(t, gm)
@ -203,7 +209,10 @@ func (suite *GroupsIntgSuite) TestGroups_InvalidCredentials() {
ctx, flush := tester.NewContext(t)
defer flush()
groups, err := m365.Groups(ctx, test.acct(t), fault.New(true))
groups, err := m365.Groups(
ctx,
test.acct(t),
fault.New(true))
assert.Empty(t, groups, "returned no groups")
assert.NotNil(t, err)
})

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -36,7 +37,10 @@ func makeAC(
return api.Client{}, clues.Wrap(err, "getting m365 account creds").WithClues(ctx)
}
cli, err := api.NewClient(creds, control.DefaultOptions())
cli, err := api.NewClient(
creds,
control.DefaultOptions(),
count.New())
if err != nil {
return api.Client{}, clues.Wrap(err, "constructing api client").WithClues(ctx)
}