From 9ecefd2569d3b332d1081cb87d9714fb3db5641c Mon Sep 17 00:00:00 2001
From: Keepers
Date: Thu, 16 Nov 2023 16:22:00 -0700
Subject: [PATCH] use counter in exchange backup enum (#4661)

Propagate the count bus more widely to record runtime stats.

---

#### Does this PR need a docs update or release note?

- [x] :no_entry: No

#### Type of change

- [x] :robot: Supportability/Tests

#### Test Plan

- [x] :zap: Unit test
- [x] :green_heart: E2E

---
 src/internal/data/collection_test.go          |  8 +-
 src/internal/data/implementations.go          | 26 +++++-
 src/internal/data/item.go                     |  7 ++
 src/internal/data/item_test.go                |  7 +-
 src/internal/m365/backup.go                   | 19 ++--
 src/internal/m365/backup_test.go              |  3 +-
 .../m365/collection/drive/collection.go       | 44 ++++++++-
 .../m365/collection/drive/collection_test.go  | 18 ++--
 .../m365/collection/drive/collections.go      | 93 +++++++++++++++----
 .../m365/collection/drive/collections_test.go | 22 +++--
 src/internal/m365/collection/drive/debug.go   |  4 +-
 .../collection/drive/item_collector_test.go   |  3 +-
 .../m365/collection/drive/url_cache.go        |  9 +-
 .../m365/collection/drive/url_cache_test.go   |  4 +
 .../m365/collection/exchange/backup.go        | 41 +++++---
 .../m365/collection/exchange/backup_test.go   | 13 ++-
 .../m365/collection/exchange/collection.go    | 34 ++++++-
 .../collection/exchange/collection_test.go    | 23 +++--
 src/internal/m365/collection/groups/backup.go | 39 ++++++--
 .../m365/collection/groups/backup_test.go     |  3 +
 .../m365/collection/groups/collection.go      | 32 ++++---
 .../m365/collection/groups/collection_test.go |  7 +-
 src/internal/m365/collection/site/backup.go   |  4 +-
 src/internal/m365/debug.go                    |  5 +-
 src/internal/m365/service/exchange/backup.go  | 11 ++-
 src/internal/m365/service/groups/backup.go    | 74 ++++++++++-----
 src/internal/m365/service/onedrive/backup.go  | 23 +++--
 .../m365/service/onedrive/backup_test.go      |  3 +-
 .../m365/service/sharepoint/backup.go         |  2 +
 .../m365/service/sharepoint/backup_test.go    |  5 +-
 src/internal/operations/backup.go             |  4 +-
 src/internal/operations/backup_test.go        |  6 +-
 src/internal/operations/maintenance_test.go   |  3 +-
 src/internal/operations/restore.go            |  2 +
 src/pkg/count/count.go                        | 56 +++++++----
 src/pkg/count/count_test.go                   | 83 ++++++++++++++++-
 src/pkg/count/keys.go                         | 93 +++++++++++++++----
 .../services/m365/api/graph/collections.go    |  9 +-
 .../m365/api/graph/collections_test.go        |  3 +-
 .../m365/api/graph/metadata_collection.go     |  9 +-
 .../api/graph/metadata_collection_test.go     |  9 +-
 41 files changed, 676 insertions(+), 187 deletions(-)

diff --git a/src/internal/data/collection_test.go b/src/internal/data/collection_test.go
index 92b0eae77..743e5bcbe 100644
--- a/src/internal/data/collection_test.go
+++ b/src/internal/data/collection_test.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/alcionai/corso/src/internal/tester"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/path"
 )
 
@@ -66,7 +67,7 @@ func (suite *CollectionSuite) TestStateOf() {
 	}
 	for _, test := range table {
 		suite.Run(test.name, func() {
-			state := StateOf(test.prev, test.curr)
+			state := StateOf(test.prev, test.curr, count.New())
 			assert.Equal(suite.T(), test.expect, state)
 		})
 	}
@@ -142,7 +143,8 @@ func (suite *CollectionSuite) TestNewBaseCollection() {
 				test.previous,
 				loc,
 				control.Options{},
-				test.doNotMerge)
+				test.doNotMerge,
+				count.New())
 
 			assert.Equal(t, test.expectCurrent, b.FullPath(), "full path")
 			assert.Equal(t, test.expectPrev, b.PreviousPath(), "previous path")
@@ -160,7 +162,7 @@ func (suite *CollectionSuite)
TestNewTombstoneCollection() { fooP, err := path.Build("t", "u", path.ExchangeService, path.EmailCategory, false, "foo") require.NoError(t, err, clues.ToCore(err)) - c := NewTombstoneCollection(fooP, control.Options{}) + c := NewTombstoneCollection(fooP, control.Options{}, count.New()) assert.Nil(t, c.FullPath(), "full path") assert.Equal(t, fooP, c.PreviousPath(), "previous path") assert.Nil(t, c.LocationPath(), "location path") diff --git a/src/internal/data/implementations.go b/src/internal/data/implementations.go index c95eb339e..0d121f729 100644 --- a/src/internal/data/implementations.go +++ b/src/internal/data/implementations.go @@ -6,6 +6,7 @@ import ( "github.com/alcionai/clues" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -38,19 +39,27 @@ func (c NoFetchRestoreCollection) FetchItemByName(context.Context, string) (Item // StateOf lets us figure out the state of the collection from the // previous and current path -func StateOf(prev, curr path.Path) CollectionState { +func StateOf( + prev, curr path.Path, + counter *count.Bus, +) CollectionState { if curr == nil || len(curr.String()) == 0 { + counter.Inc(count.CollectionTombstoned) return DeletedState } if prev == nil || len(prev.String()) == 0 { + counter.Inc(count.CollectionNew) return NewState } if curr.String() != prev.String() { + counter.Inc(count.CollectionMoved) return MovedState } + counter.Inc(count.CollectionNotMoved) + return NotMovedState } @@ -63,6 +72,7 @@ func NewBaseCollection( location *path.Builder, ctrlOpts control.Options, doNotMergeItems bool, + counter *count.Bus, ) BaseCollection { return BaseCollection{ opts: ctrlOpts, @@ -70,7 +80,8 @@ func NewBaseCollection( fullPath: curr, locationPath: location, prevPath: prev, - state: StateOf(prev, curr), + state: StateOf(prev, curr, counter), + Counter: counter.Local(), } } @@ -98,6 +109,8 @@ type BaseCollection struct { // doNotMergeItems should only be true if the old delta token expired. doNotMergeItems bool + + Counter *count.Bus } // FullPath returns the BaseCollection's fullPath []string @@ -145,9 +158,16 @@ func (col BaseCollection) Opts() control.Options { func NewTombstoneCollection( prev path.Path, opts control.Options, + counter *count.Bus, ) *tombstoneCollection { return &tombstoneCollection{ - BaseCollection: NewBaseCollection(nil, prev, nil, opts, false), + BaseCollection: NewBaseCollection( + nil, + prev, + nil, + opts, + false, + counter), } } diff --git a/src/internal/data/item.go b/src/internal/data/item.go index 3cdfaaef6..50ceb0235 100644 --- a/src/internal/data/item.go +++ b/src/internal/data/item.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" ) @@ -136,6 +137,7 @@ func NewLazyItem( itemGetter ItemDataGetter, itemID string, modTime time.Time, + counter *count.Bus, errs *fault.Bus, ) *lazyItem { return &lazyItem{ @@ -143,6 +145,7 @@ func NewLazyItem( id: itemID, itemGetter: itemGetter, modTime: modTime, + counter: counter, errs: errs, } } @@ -157,6 +160,7 @@ type lazyItem struct { ctx context.Context mu sync.Mutex id string + counter *count.Bus errs *fault.Bus itemGetter ItemDataGetter @@ -203,6 +207,7 @@ func (i *lazyItem) ToReader() io.ReadCloser { // etc.) and the item isn't enumerated in that set. 
if delInFlight { logger.Ctx(i.ctx).Info("item not found") + i.counter.Inc(count.LazyDeletedInFlight) i.delInFlight = true format.DelInFlight = true @@ -232,6 +237,7 @@ func NewLazyItemWithInfo( itemGetter ItemDataGetter, itemID string, modTime time.Time, + counter *count.Bus, errs *fault.Bus, ) *lazyItemWithInfo { return &lazyItemWithInfo{ @@ -240,6 +246,7 @@ func NewLazyItemWithInfo( itemGetter, itemID, modTime, + counter, errs), } } diff --git a/src/internal/data/item_test.go b/src/internal/data/item_test.go index 16dc8b117..9a7136484 100644 --- a/src/internal/data/item_test.go +++ b/src/internal/data/item_test.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" ) @@ -74,6 +75,7 @@ func (suite *ItemUnitSuite) TestUnindexedLazyItem() { nil, "foo", time.Time{}, + count.New(), fault.New(true)) var item data.Item = lazy @@ -296,6 +298,7 @@ func (suite *ItemUnitSuite) TestLazyItem() { test.mid, id, now, + count.New(), errs) assert.Equal(t, id, item.ID(), "ID") @@ -354,7 +357,7 @@ func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() { mid := &mockItemDataGetter{delInFlight: true} defer mid.check(t, true) - item := data.NewLazyItemWithInfo(ctx, mid, id, now, errs) + item := data.NewLazyItemWithInfo(ctx, mid, id, now, count.New(), errs) assert.Equal(t, id, item.ID(), "ID") assert.False(t, item.Deleted(), "deleted") @@ -400,7 +403,7 @@ func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() { mid := &mockItemDataGetter{} defer mid.check(t, false) - item := data.NewLazyItemWithInfo(ctx, mid, id, now, errs) + item := data.NewLazyItemWithInfo(ctx, mid, id, now, count.New(), errs) assert.Equal(t, id, item.ID(), "ID") assert.False(t, item.Deleted(), "deleted") diff --git a/src/internal/m365/backup.go b/src/internal/m365/backup.go index 5794eee54..a9b6527f0 100644 --- a/src/internal/m365/backup.go +++ b/src/internal/m365/backup.go @@ -58,37 +58,39 @@ func (ctrl *Controller) ProduceBackupCollections( var ( colls []data.BackupCollection - ssmb *prefixmatcher.StringSetMatcher + excludeItems *prefixmatcher.StringSetMatcher canUsePreviousBackup bool ) switch service { case path.ExchangeService: - colls, ssmb, canUsePreviousBackup, err = exchange.ProduceBackupCollections( + colls, excludeItems, canUsePreviousBackup, err = exchange.ProduceBackupCollections( ctx, bpc, ctrl.AC, - ctrl.credentials.AzureTenantID, + ctrl.credentials, ctrl.UpdateStatus, + counter, errs) if err != nil { return nil, nil, false, err } case path.OneDriveService: - colls, ssmb, canUsePreviousBackup, err = onedrive.ProduceBackupCollections( + colls, excludeItems, canUsePreviousBackup, err = onedrive.ProduceBackupCollections( ctx, bpc, ctrl.AC, - ctrl.credentials.AzureTenantID, + ctrl.credentials, ctrl.UpdateStatus, + counter, errs) if err != nil { return nil, nil, false, err } case path.SharePointService: - colls, ssmb, canUsePreviousBackup, err = sharepoint.ProduceBackupCollections( + colls, excludeItems, canUsePreviousBackup, err = sharepoint.ProduceBackupCollections( ctx, bpc, ctrl.AC, @@ -101,12 +103,13 @@ func (ctrl *Controller) ProduceBackupCollections( } case path.GroupsService: - colls, ssmb, err = groups.ProduceBackupCollections( + colls, excludeItems, err = groups.ProduceBackupCollections( ctx, bpc, ctrl.AC, ctrl.credentials, ctrl.UpdateStatus, + counter, errs) if err != nil { return nil, nil, false, err @@ 
-132,7 +135,7 @@ func (ctrl *Controller) ProduceBackupCollections( } } - return colls, ssmb, canUsePreviousBackup, nil + return colls, excludeItems, canUsePreviousBackup, nil } func (ctrl *Controller) IsServiceEnabled( diff --git a/src/internal/m365/backup_test.go b/src/internal/m365/backup_test.go index 07f2aa8f4..bc1293157 100644 --- a/src/internal/m365/backup_test.go +++ b/src/internal/m365/backup_test.go @@ -143,8 +143,9 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() { ctx, bpc, suite.ac, - suite.tenantID, + suite.ac.Credentials, ctrl.UpdateStatus, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.True(t, canUsePreviousBackup, "can use previous backup") diff --git a/src/internal/m365/collection/drive/collection.go b/src/internal/m365/collection/drive/collection.go index d06859450..b416648aa 100644 --- a/src/internal/m365/collection/drive/collection.go +++ b/src/internal/m365/collection/drive/collection.go @@ -1,4 +1,3 @@ -// Package drive provides support for retrieving M365 Drive objects package drive import ( @@ -22,6 +21,7 @@ import ( "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/extensions" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" @@ -88,6 +88,8 @@ type Collection struct { doNotMergeItems bool urlCache getItemPropertyer + + counter *count.Bus } func pathToLocation(p path.Path) (*path.Builder, error) { @@ -115,6 +117,7 @@ func NewCollection( isPackageOrChildOfPackage bool, doNotMergeItems bool, urlCache getItemPropertyer, + counter *count.Bus, ) (*Collection, error) { // TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need // to be changed as we won't be able to extract path information from the @@ -140,7 +143,8 @@ func NewCollection( ctrlOpts, isPackageOrChildOfPackage, doNotMergeItems, - urlCache) + urlCache, + counter) c.locPath = locPath c.prevLocPath = prevLocPath @@ -159,6 +163,7 @@ func newColl( isPackageOrChildOfPackage bool, doNotMergeItems bool, urlCache getItemPropertyer, + counter *count.Bus, ) *Collection { dataCh := make(chan data.Item, graph.Parallelism(path.OneDriveMetadataService).CollectionBufferSize()) @@ -172,10 +177,11 @@ func newColl( data: dataCh, statusUpdater: statusUpdater, ctrl: ctrlOpts, - state: data.StateOf(prevPath, currPath), + state: data.StateOf(prevPath, currPath, counter), isPackageOrChildOfPackage: isPackageOrChildOfPackage, doNotMergeItems: doNotMergeItems, urlCache: urlCache, + counter: counter, } return c @@ -228,7 +234,7 @@ func (oc Collection) PreviousPath() path.Path { func (oc *Collection) SetFullPath(curPath path.Path) { oc.folderPath = curPath - oc.state = data.StateOf(oc.prevPath, curPath) + oc.state = data.StateOf(oc.prevPath, curPath, oc.counter) } func (oc Collection) LocationPath() *path.Builder { @@ -263,7 +269,13 @@ func (oc *Collection) getDriveItemContent( itemName = ptr.Val(item.GetName()) ) - itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID) + itemData, err := downloadContent( + ctx, + oc.handler, + oc.urlCache, + item, + oc.driveID, + oc.counter) if err != nil { if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) { logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware") @@ -334,6 +346,7 @@ func downloadContent( uc 
getItemPropertyer, item models.DriveItemable, driveID string, + counter *count.Bus, ) (io.ReadCloser, error) { itemID := ptr.Val(item.GetId()) ctx = clues.Add(ctx, "item_id", itemID) @@ -359,6 +372,7 @@ func downloadContent( // to preserve existing behavior. Fallback to refetching the item using the // API. logger.CtxErr(ctx, err).Info("url cache miss: refetching from API") + counter.Inc(count.ItemDownloadURLRefetch) di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId())) if err != nil { @@ -428,7 +442,9 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) { // `details.OneDriveInfo` parentPath, err := path.GetDriveFolderPath(oc.folderPath) if err != nil { + logger.CtxErr(ctx, err).Info("getting drive folder path") oc.reportAsCompleted(ctx, 0, 0, 0) + return } @@ -559,10 +575,15 @@ func (oc *Collection) streamDriveItem( if isFile { atomic.AddInt64(&stats.itemsFound, 1) + if oc.counter.Inc(count.StreamItemsFound)%1000 == 0 { + logger.Ctx(ctx).Infow("item stream progress", "stats", oc.counter.Values()) + } + metaFileName = itemID metaSuffix = metadata.MetaFileSuffix } else { atomic.AddInt64(&stats.dirsFound, 1) + oc.counter.Inc(count.StreamDirsFound) // metaFileName not set for directories so we get just ".dirmeta" metaSuffix = metadata.DirMetaFileSuffix @@ -588,6 +609,15 @@ func (oc *Collection) streamDriveItem( ctx = clues.Add(ctx, "item_info", itemInfo) + // Drive content download requests are also rate limited by graph api. + // Ensure that this request goes through the drive limiter & not the default + // limiter. + ctx = graph.BindRateLimiterConfig( + ctx, + graph.LimiterCfg{ + Service: path.OneDriveService, + }) + if isFile { dataSuffix := metadata.DataFileSuffix @@ -607,6 +637,7 @@ func (oc *Collection) streamDriveItem( }, itemID+dataSuffix, itemInfo.Modified(), + oc.counter, errs) } @@ -640,11 +671,14 @@ func (oc *Collection) streamDriveItem( // Item read successfully, add to collection if isFile { + oc.counter.Inc(count.StreamItemsAdded) atomic.AddInt64(&stats.itemsRead, 1) } else { + oc.counter.Inc(count.StreamDirsAdded) atomic.AddInt64(&stats.dirsRead, 1) } + oc.counter.Add(count.StreamBytesAdded, itemSize) atomic.AddInt64(&stats.byteCount, itemSize) } diff --git a/src/internal/m365/collection/drive/collection_test.go b/src/internal/m365/collection/drive/collection_test.go index a6ecd2002..6ee30143a 100644 --- a/src/internal/m365/collection/drive/collection_test.go +++ b/src/internal/m365/collection/drive/collection_test.go @@ -29,6 +29,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/extensions" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -215,7 +216,8 @@ func (suite *CollectionUnitSuite) TestCollection() { control.Options{ToggleFeatures: control.Toggles{}}, false, true, - nil) + nil, + count.New()) require.NoError(t, err, clues.ToCore(err)) require.NotNil(t, coll) assert.Equal(t, folderPath, coll.FullPath()) @@ -337,7 +339,8 @@ func (suite *CollectionUnitSuite) TestCollectionReadError() { control.Options{ToggleFeatures: control.Toggles{}}, false, true, - nil) + nil, + count.New()) require.NoError(t, err, clues.ToCore(err)) stubItem := odTD.NewStubDriveItem( @@ -415,7 +418,8 @@ func (suite *CollectionUnitSuite) TestCollectionReadUnauthorizedErrorRetry() { control.Options{ToggleFeatures: control.Toggles{}}, false, true, - nil) + 
nil, + count.New()) require.NoError(t, err, clues.ToCore(err)) coll.Add(stubItem) @@ -471,7 +475,8 @@ func (suite *CollectionUnitSuite) TestCollectionPermissionBackupLatestModTime() control.Options{ToggleFeatures: control.Toggles{}}, false, true, - nil) + nil, + count.New()) require.NoError(t, err, clues.ToCore(err)) mtime := time.Now().AddDate(0, -1, 0) @@ -814,7 +819,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { mbh.GetResps = resps mbh.GetErrs = test.getErr - r, err := downloadContent(ctx, mbh, test.muc, item, driveID) + r, err := downloadContent(ctx, mbh, test.muc, item, driveID, count.New()) test.expect(t, r) test.expectErr(t, err, clues.ToCore(err)) }) @@ -1002,7 +1007,8 @@ func (suite *CollectionUnitSuite) TestItemExtensions() { opts, false, true, - nil) + nil, + count.New()) require.NoError(t, err, clues.ToCore(err)) stubItem := odTD.NewStubDriveItem( diff --git a/src/internal/m365/collection/drive/collections.go b/src/internal/m365/collection/drive/collections.go index a26e01411..ce93a9fa8 100644 --- a/src/internal/m365/collection/drive/collections.go +++ b/src/internal/m365/collection/drive/collections.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" bupMD "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/logger" @@ -51,6 +52,8 @@ type Collections struct { NumItems int NumFiles int NumContainers int + + counter *count.Bus } func NewCollections( @@ -59,6 +62,7 @@ func NewCollections( protectedResource idname.Provider, statusUpdater support.StatusUpdater, ctrlOpts control.Options, + counter *count.Bus, ) *Collections { return &Collections{ handler: bh, @@ -67,6 +71,7 @@ func NewCollections( CollectionMap: map[string]map[string]*Collection{}, statusUpdater: statusUpdater, ctrl: ctrlOpts, + counter: counter, } } @@ -79,6 +84,7 @@ func (c *Collections) resetStats() { func deserializeAndValidateMetadata( ctx context.Context, cols []data.RestoreCollection, + counter *count.Bus, fb *fault.Bus, ) (map[string]string, map[string]map[string]string, bool, error) { deltas, prevs, canUse, err := DeserializeMetadata(ctx, cols) @@ -117,7 +123,7 @@ func deserializeAndValidateMetadata( } } - alertIfPrevPathsHaveCollisions(ctx, prevs, fb) + alertIfPrevPathsHaveCollisions(ctx, prevs, counter, fb) return deltas, prevs, canUse, nil } @@ -125,6 +131,7 @@ func deserializeAndValidateMetadata( func alertIfPrevPathsHaveCollisions( ctx context.Context, prevs map[string]map[string]string, + counter *count.Bus, fb *fault.Bus, ) { for driveID, folders := range prevs { @@ -150,6 +157,8 @@ func alertIfPrevPathsHaveCollisions( "collision_drive_id": driveID, "collision_prev_path": prev, })) + + counter.Inc(count.PreviousPathMetadataCollision) } prevPathCollisions[prev] = fid @@ -274,7 +283,11 @@ func (c *Collections) Get( ssmb *prefixmatcher.StringSetMatchBuilder, errs *fault.Bus, ) ([]data.BackupCollection, bool, error) { - deltasByDriveID, prevPathsByDriveID, canUsePrevBackup, err := deserializeAndValidateMetadata(ctx, prevMetadata, errs) + deltasByDriveID, prevPathsByDriveID, canUsePrevBackup, err := deserializeAndValidateMetadata( + ctx, + prevMetadata, + c.counter, + errs) if err != nil { return nil, false, err } @@ -295,6 +308,9 @@ func (c *Collections) Get( return nil, false, err } + c.counter.Add(count.Drives, int64(len(drives))) + 
c.counter.Add(count.PrevDeltas, int64(len(deltasByDriveID))) + var ( driveIDToDeltaLink = map[string]string{} driveIDToPrevPaths = map[string]map[string]string{} @@ -303,6 +319,7 @@ func (c *Collections) Get( for _, d := range drives { var ( + cl = c.counter.Local() driveID = ptr.Val(d.GetId()) driveName = ptr.Val(d.GetName()) ictx = clues.Add( @@ -322,6 +339,8 @@ func (c *Collections) Get( packagePaths = map[string]struct{}{} ) + ictx = clues.AddLabelCounter(ictx, cl.PlainAdder()) + delete(driveTombstones, driveID) if _, ok := driveIDToPrevPaths[driveID]; !ok { @@ -332,6 +351,7 @@ func (c *Collections) Get( c.CollectionMap[driveID] = map[string]*Collection{} } + cl.Add(count.PrevPaths, int64(len(oldPrevPaths))) logger.Ctx(ictx).Infow( "previous metadata for drive", "count_old_prev_paths", len(oldPrevPaths)) @@ -344,6 +364,7 @@ func (c *Collections) Get( excludedItemIDs, packagePaths, prevDeltaLink, + cl.Local(), errs) if err != nil { return nil, false, clues.Stack(err) @@ -373,6 +394,8 @@ func (c *Collections) Get( numDriveItems := c.NumItems - numPrevItems numPrevItems = c.NumItems + cl.Add(count.NewPrevPaths, int64(len(newPrevPaths))) + // Attach an url cache to the drive if the number of discovered items is // below the threshold. Attaching cache to larger drives can cause // performance issues since cache delta queries start taking up majority of @@ -387,6 +410,7 @@ func (c *Collections) Get( prevDeltaLink, urlCacheRefreshInterval, c.handler, + cl, errs) if err != nil { return nil, false, clues.Stack(err) @@ -446,7 +470,8 @@ func (c *Collections) Get( c.ctrl, false, true, - nil) + nil, + cl.Local()) if err != nil { return nil, false, clues.WrapWC(ictx, err, "making collection") } @@ -464,11 +489,13 @@ func (c *Collections) Get( } } + c.counter.Add(count.DriveTombstones, int64(len(driveTombstones))) + // generate tombstones for drives that were removed. for driveID := range driveTombstones { prevDrivePath, err := c.handler.PathPrefix(c.tenantID, driveID) if err != nil { - return nil, false, clues.WrapWC(ctx, err, "making drive tombstone for previous path") + return nil, false, clues.WrapWC(ctx, err, "making drive tombstone for previous path").Label(count.BadPathPrefix) } coll, err := NewCollection( @@ -481,7 +508,8 @@ func (c *Collections) Get( c.ctrl, false, true, - nil) + nil, + c.counter.Local()) if err != nil { return nil, false, clues.WrapWC(ctx, err, "making drive tombstone") } @@ -489,7 +517,7 @@ func (c *Collections) Get( collections = append(collections, coll) } - alertIfPrevPathsHaveCollisions(ctx, driveIDToPrevPaths, errs) + alertIfPrevPathsHaveCollisions(ctx, driveIDToPrevPaths, c.counter, errs) // add metadata collections pathPrefix, err := c.handler.MetadataPathPrefix(c.tenantID) @@ -508,7 +536,8 @@ func (c *Collections) Get( graph.NewMetadataEntry(bupMD.PreviousPathFileName, driveIDToPrevPaths), graph.NewMetadataEntry(bupMD.DeltaURLsFileName, driveIDToDeltaLink), }, - c.statusUpdater) + c.statusUpdater, + count.New()) if err != nil { // Technically it's safe to continue here because the logic for starting an @@ -563,13 +592,17 @@ func updateCollectionPaths( } func (c *Collections) handleDelete( + ctx context.Context, itemID, driveID string, oldPrevPaths, currPrevPaths, newPrevPaths map[string]string, isFolder bool, excluded map[string]struct{}, invalidPrevDelta bool, + counter *count.Bus, ) error { if !isFolder { + counter.Inc(count.DeleteItemMarker) + // Try to remove the item from the Collection if an entry exists for this // item. 
This handles cases where an item was created and deleted during the // same delta query. @@ -597,6 +630,8 @@ func (c *Collections) handleDelete( return nil } + counter.Inc(count.DeleteFolderMarker) + var prevPath path.Path prevPathStr, ok := oldPrevPaths[itemID] @@ -605,11 +640,12 @@ func (c *Collections) handleDelete( prevPath, err = path.FromDataLayerPath(prevPathStr, false) if err != nil { - return clues.Wrap(err, "invalid previous path"). + return clues.WrapWC(ctx, err, "invalid previous path"). With( "drive_id", driveID, "item_id", itemID, - "path_string", prevPathStr) + "path_string", prevPathStr). + Label(count.BadPrevPath) } } @@ -644,7 +680,8 @@ func (c *Collections) handleDelete( false, // DoNotMerge is not checked for deleted items. false, - nil) + nil, + counter.Local()) if err != nil { return clues.Wrap(err, "making collection").With( "drive_id", driveID, @@ -716,6 +753,7 @@ func (c *Collections) PopulateDriveCollections( excludedItemIDs map[string]struct{}, topLevelPackages map[string]struct{}, prevDeltaLink string, + counter *count.Bus, errs *fault.Bus, ) (pagers.DeltaUpdate, map[string]string, error) { var ( @@ -755,7 +793,11 @@ func (c *Collections) PopulateDriveCollections( break } + counter.Inc(count.PagesEnumerated) + if reset { + counter.Inc(count.PagerResets) + ctx = clues.Add(ctx, "delta_reset_occurred", true) newPrevPaths = map[string]string{} currPrevPaths = map[string]string{} @@ -783,6 +825,7 @@ func (c *Collections) PopulateDriveCollections( excludedItemIDs, topLevelPackages, invalidPrevDelta, + counter, el) if err != nil { el.AddRecoverable(ctx, clues.Stack(err)) @@ -795,6 +838,8 @@ func (c *Collections) PopulateDriveCollections( return du, nil, clues.Stack(err) } + logger.Ctx(ctx).Infow("populated collection", "stats", counter.Values()) + return du, newPrevPaths, el.Failure() } @@ -807,6 +852,7 @@ func (c *Collections) processItem( excludedItemIDs map[string]struct{}, topLevelPackages map[string]struct{}, invalidPrevDelta bool, + counter *count.Bus, skipper fault.AddSkipper, ) error { var ( @@ -831,6 +877,7 @@ func (c *Collections) processItem( skipper.AddSkip(ctx, skip) logger.Ctx(ctx).Infow("malware detected", "item_details", addtl) + counter.Inc(count.Malware) return nil } @@ -838,6 +885,7 @@ func (c *Collections) processItem( // Deleted file or folder. if item.GetDeleted() != nil { err := c.handleDelete( + ctx, itemID, driveID, oldPrevPaths, @@ -845,20 +893,22 @@ func (c *Collections) processItem( newPrevPaths, isFolder, excludedItemIDs, - invalidPrevDelta) + invalidPrevDelta, + counter) return clues.StackWC(ctx, err).OrNil() } collectionPath, err := c.getCollectionPath(driveID, item) if err != nil { - return clues.StackWC(ctx, err). - Label(fault.LabelForceNoBackupCreation) + return clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation, count.BadCollPath) } // Skip items that don't match the folder selectors we were given. if shouldSkip(ctx, collectionPath, c.handler, driveName) { + counter.Inc(count.SkippedContainers) logger.Ctx(ctx).Debugw("path not selected", "skipped_path", collectionPath.String()) + return nil } @@ -872,7 +922,8 @@ func (c *Collections) processItem( prevPath, err = path.FromDataLayerPath(prevPathStr, false) if err != nil { return clues.WrapWC(ctx, err, "invalid previous path"). - With("prev_path_string", path.LoggableDir(prevPathStr)) + With("prev_path_string", path.LoggableDir(prevPathStr)). + Label(count.BadPrevPath) } } else if item.GetRoot() != nil { // Root doesn't move or get renamed. 
@@ -899,9 +950,12 @@ func (c *Collections) processItem( isPackage := item.GetPackageEscaped() != nil if isPackage { + counter.Inc(count.Packages) // mark this path as a package type for all other collections. // any subfolder should get marked as a childOfPackage below. topLevelPackages[collectionPath.String()] = struct{}{} + } else { + counter.Inc(count.Folders) } childOfPackage := filters. @@ -944,7 +998,8 @@ func (c *Collections) processItem( c.ctrl, isPackage || childOfPackage, invalidPrevDelta || collPathAlreadyExists, - nil) + nil, + counter.Local()) if err != nil { return clues.StackWC(ctx, err) } @@ -966,9 +1021,11 @@ func (c *Collections) processItem( } case item.GetFile() != nil: + counter.Inc(count.Files) + // Deletions are handled above so this is just moves/renames. if len(ptr.Val(item.GetParentReference().GetId())) == 0 { - return clues.NewWC(ctx, "file without parent ID") + return clues.NewWC(ctx, "file without parent ID").Label(count.MissingParent) } // Get the collection for this item. @@ -977,7 +1034,7 @@ func (c *Collections) processItem( collection, ok := c.CollectionMap[driveID][parentID] if !ok { - return clues.NewWC(ctx, "item seen before parent folder") + return clues.NewWC(ctx, "item seen before parent folder").Label(count.ItemBeforeParent) } // This will only kick in if the file was moved multiple times @@ -1021,7 +1078,7 @@ func (c *Collections) processItem( default: return clues.NewWC(ctx, "item is neither folder nor file"). - Label(fault.LabelForceNoBackupCreation) + Label(fault.LabelForceNoBackupCreation, count.UnknownItemType) } return nil diff --git a/src/internal/m365/collection/drive/collections_test.go b/src/internal/m365/collection/drive/collections_test.go index f2e866c65..4065b39d3 100644 --- a/src/internal/m365/collection/drive/collections_test.go +++ b/src/internal/m365/collection/drive/collections_test.go @@ -25,6 +25,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" bupMD "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -1158,7 +1159,8 @@ func (suite *CollectionsUnitSuite) TestPopulateDriveCollections() { tenant, idname.NewProvider(user, user), nil, - control.Options{ToggleFeatures: control.Toggles{}}) + control.Options{ToggleFeatures: control.Toggles{}}, + count.New()) c.CollectionMap[driveID] = map[string]*Collection{} @@ -1170,6 +1172,7 @@ func (suite *CollectionsUnitSuite) TestPopulateDriveCollections() { excludes, test.topLevelPackages, "prevdelta", + count.New(), errs) test.expect(t, err, clues.ToCore(err)) assert.ElementsMatch( @@ -1618,7 +1621,8 @@ func (suite *CollectionsUnitSuite) TestDeserializeMetadata() { mc, err := graph.MakeMetadataCollection( pathPrefix, c(), - func(*support.ControllerOperationStatus) {}) + func(*support.ControllerOperationStatus) {}, + count.New()) require.NoError(t, err, clues.ToCore(err)) cols = append(cols, dataMock.NewUnversionedRestoreCollection( @@ -1628,7 +1632,7 @@ func (suite *CollectionsUnitSuite) TestDeserializeMetadata() { fb := fault.New(true) - deltas, paths, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, cols, fb) + deltas, paths, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, cols, count.New(), fb) test.errCheck(t, err) assert.Equal(t, test.canUsePreviousBackup, canUsePreviousBackup, "can use previous backup") @@ -1656,7 +1660,7 @@ 
func (suite *CollectionsUnitSuite) TestDeserializeMetadata_ReadFailure() { fc := failingColl{} - _, _, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, []data.RestoreCollection{fc}, fault.New(true)) + _, _, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, []data.RestoreCollection{fc}, count.New(), fault.New(true)) require.NoError(t, err) require.False(t, canUsePreviousBackup) } @@ -3427,7 +3431,8 @@ func (suite *CollectionsUnitSuite) TestGet() { tenant, idname.NewProvider(user, user), func(*support.ControllerOperationStatus) {}, - control.Options{ToggleFeatures: control.Toggles{}}) + control.Options{ToggleFeatures: control.Toggles{}}, + count.New()) prevDelta := "prev-delta" @@ -3447,7 +3452,8 @@ func (suite *CollectionsUnitSuite) TestGet() { bupMD.PreviousPathFileName, test.previousPaths), }, - func(*support.ControllerOperationStatus) {}) + func(*support.ControllerOperationStatus) {}, + count.New()) assert.NoError(t, err, "creating metadata collection", clues.ToCore(err)) prevMetadata := []data.RestoreCollection{ @@ -3484,6 +3490,7 @@ func (suite *CollectionsUnitSuite) TestGet() { t, data.NoFetchRestoreCollection{Collection: baseCol}), }, + count.New(), errs) if !assert.NoError(t, err, "deserializing metadata", clues.ToCore(err)) { continue @@ -3616,7 +3623,8 @@ func (suite *CollectionsUnitSuite) TestAddURLCacheToDriveCollections() { tenant, idname.NewProvider(user, user), func(*support.ControllerOperationStatus) {}, - control.Options{ToggleFeatures: control.Toggles{}}) + control.Options{ToggleFeatures: control.Toggles{}}, + count.New()) errs := fault.New(true) delList := prefixmatcher.NewStringSetBuilder() diff --git a/src/internal/m365/collection/drive/debug.go b/src/internal/m365/collection/drive/debug.go index 7a4f5c46c..e6708dd76 100644 --- a/src/internal/m365/collection/drive/debug.go +++ b/src/internal/m365/collection/drive/debug.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/corso/src/internal/data" bupMD "github.com/alcionai/corso/src/pkg/backup/metadata" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/store" ) @@ -14,8 +15,9 @@ import ( func DeserializeMetadataFiles( ctx context.Context, colls []data.RestoreCollection, + counter *count.Bus, ) ([]store.MetadataFile, error) { - deltas, prevs, _, err := deserializeAndValidateMetadata(ctx, colls, fault.New(true)) + deltas, prevs, _, err := deserializeAndValidateMetadata(ctx, colls, counter, fault.New(true)) files := []store.MetadataFile{ { diff --git a/src/internal/m365/collection/drive/item_collector_test.go b/src/internal/m365/collection/drive/item_collector_test.go index 8a199a507..957722705 100644 --- a/src/internal/m365/collection/drive/item_collector_test.go +++ b/src/internal/m365/collection/drive/item_collector_test.go @@ -284,7 +284,8 @@ func (suite *OneDriveIntgSuite) TestOneDriveNewCollections() { service.updateStatus, control.Options{ ToggleFeatures: control.Toggles{}, - }) + }, + count.New()) ssmb := prefixmatcher.NewStringSetBuilder() diff --git a/src/internal/m365/collection/drive/url_cache.go b/src/internal/m365/collection/drive/url_cache.go index be5c49098..ba082ac9c 100644 --- a/src/internal/m365/collection/drive/url_cache.go +++ b/src/internal/m365/collection/drive/url_cache.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/str" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" 
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -49,7 +50,8 @@ type urlCache struct { enumerator EnumerateDriveItemsDeltaer - errs *fault.Bus + counter *count.Bus + errs *fault.Bus } // newURLache creates a new URL cache for the specified drive ID @@ -57,6 +59,7 @@ func newURLCache( driveID, prevDelta string, refreshInterval time.Duration, enumerator EnumerateDriveItemsDeltaer, + counter *count.Bus, errs *fault.Bus, ) (*urlCache, error) { err := validateCacheParams(driveID, refreshInterval, enumerator) @@ -71,6 +74,7 @@ func newURLCache( enumerator: enumerator, prevDelta: prevDelta, refreshInterval: refreshInterval, + counter: counter, errs: errs, }, nil @@ -148,6 +152,8 @@ func (uc *urlCache) refreshCache( return nil } + uc.counter.Inc(count.URLCacheRefresh) + // Hold cache lock in write mode for the entire duration of the refresh. // This is to prevent other threads from reading the cache while it is // being updated page by page @@ -201,6 +207,7 @@ func (uc *urlCache) readCache( props, ok := uc.idToProps[itemID] if !ok { + uc.counter.Inc(count.URLCacheMiss) return itemProps{}, clues.NewWC(ctx, "item not found in cache") } diff --git a/src/internal/m365/collection/drive/url_cache_test.go b/src/internal/m365/collection/drive/url_cache_test.go index 42f565cdb..90c8d129a 100644 --- a/src/internal/m365/collection/drive/url_cache_test.go +++ b/src/internal/m365/collection/drive/url_cache_test.go @@ -148,6 +148,7 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { du.URL, 1*time.Hour, suite.ac.Drives(), + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -578,6 +579,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { "", 1*time.Hour, &medi, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -622,6 +624,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() { "", refreshInterval, &mock.EnumerateItemsDeltaByDrive{}, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -694,6 +697,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() { "", test.refreshInt, test.itemPager, + count.New(), test.errors) test.expectErr(t, err, clues.ToCore(err)) diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go index f1dcc4a72..550d6227a 100644 --- a/src/internal/m365/collection/exchange/backup.go +++ b/src/internal/m365/collection/exchange/backup.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -31,6 +32,7 @@ func CreateCollections( scope selectors.ExchangeScope, dps metadata.DeltaPaths, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, error) { ctx = clues.Add(ctx, "category", scope.Category().PathType()) @@ -78,11 +80,14 @@ func CreateCollections( scope, dps, bpc.Options, + counter, errs) if err != nil { return nil, clues.Wrap(err, "filling collections") } + counter.Add(count.Collections, int64(len(collections))) + for _, coll := range collections { allCollections = append(allCollections, coll) } @@ -108,6 +113,7 @@ func populateCollections( scope selectors.ExchangeScope, dps metadata.DeltaPaths, ctrlOpts control.Options, + counter *count.Bus, errs 
*fault.Bus, ) (map[string]data.BackupCollection, error) { var ( @@ -123,6 +129,7 @@ func populateCollections( ) logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps)) + counter.Add(count.PrevDeltas, int64(len(dps))) el := errs.Local() @@ -133,6 +140,7 @@ func populateCollections( var ( err error + cl = counter.Local() itemConfig = api.CallConfig{ CanMakeDeltaQueries: !ctrlOpts.ToggleFeatures.DisableDelta, UseImmutableIDs: ctrlOpts.ToggleFeatures.ExchangeImmutableIDs, @@ -152,9 +160,12 @@ func populateCollections( }) ) + ictx = clues.AddLabelCounter(ictx, cl.PlainAdder()) + // Only create a collection if the path matches the scope. currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category) if !ok { + cl.Inc(count.SkippedContainers) continue } @@ -167,6 +178,7 @@ func populateCollections( if len(prevPathStr) > 0 { if prevPath, err = pathFromPrevString(prevPathStr); err != nil { + err = clues.Stack(err).Label(count.BadPrevPath) logger.CtxErr(ictx, err).Error("parsing prev path") // if the previous path is unusable, then the delta must be, too. prevDelta = "" @@ -200,6 +212,7 @@ func populateCollections( deltaURLs[cID] = addAndRem.DU.URL } else if !addAndRem.DU.Reset { logger.Ctx(ictx).Info("missing delta url") + cl.Inc(count.MissingDelta) } edc := NewCollection( @@ -208,17 +221,19 @@ func populateCollections( prevPath, locPath, ctrlOpts, - addAndRem.DU.Reset), + addAndRem.DU.Reset, + cl), qp.ProtectedResource.ID(), bh.itemHandler(), addAndRem.Added, addAndRem.Removed, // TODO: produce a feature flag that allows selective // enabling of valid modTimes. This currently produces - // rare-case failures with incorrect details merging. + // rare failures with incorrect details merging. // Root cause is not yet known. false, - statusUpdater) + statusUpdater, + cl) collections[cID] = edc @@ -242,7 +257,10 @@ func populateCollections( ) if collections[id] != nil { - el.AddRecoverable(ctx, clues.WrapWC(ictx, err, "conflict: tombstone exists for a live collection")) + err := clues.WrapWC(ictx, err, "conflict: tombstone exists for a live collection"). + Label(count.CollectionTombstoneConflict) + el.AddRecoverable(ctx, err) + continue } @@ -254,18 +272,18 @@ func populateCollections( prevPath, err := pathFromPrevString(p) if err != nil { + err = clues.StackWC(ctx, err).Label(count.BadPrevPath) // technically shouldn't ever happen. But just in case... 
logger.CtxErr(ictx, err).Error("parsing tombstone prev path") + continue } - collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts) + collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts, counter) } - logger.Ctx(ctx).Infow( - "adding metadata collection entries", - "num_paths_entries", len(currPaths), - "num_deltas_entries", len(deltaURLs)) + counter.Add(count.NewDeltas, int64(len(deltaURLs))) + counter.Add(count.NewPrevPaths, int64(len(currPaths))) pathPrefix, err := path.BuildMetadata( qp.TenantID, @@ -283,15 +301,14 @@ func populateCollections( graph.NewMetadataEntry(metadata.PreviousPathFileName, currPaths), graph.NewMetadataEntry(metadata.DeltaURLsFileName, deltaURLs), }, - statusUpdater) + statusUpdater, + count.New()) if err != nil { return nil, clues.Wrap(err, "making metadata collection") } collections["metadata"] = col - logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections)) - return collections, el.Failure() } diff --git a/src/internal/m365/collection/exchange/backup_test.go b/src/internal/m365/collection/exchange/backup_test.go index c032cb063..571817a61 100644 --- a/src/internal/m365/collection/exchange/backup_test.go +++ b/src/internal/m365/collection/exchange/backup_test.go @@ -332,7 +332,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { coll, err := graph.MakeMetadataCollection( pathPrefix, entries, - func(cos *support.ControllerOperationStatus) {}) + func(cos *support.ControllerOperationStatus) {}, + count.New()) require.NoError(t, err, clues.ToCore(err)) cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{ @@ -513,6 +514,7 @@ func (suite *BackupIntgSuite) TestMailFetch() { test.scope, metadata.DeltaPaths{}, func(status *support.ControllerOperationStatus) {}, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -593,6 +595,7 @@ func (suite *BackupIntgSuite) TestDelta() { test.scope, metadata.DeltaPaths{}, func(status *support.ControllerOperationStatus) {}, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) assert.Less(t, 1, len(collections), "retrieved metadata and data collections") @@ -625,6 +628,7 @@ func (suite *BackupIntgSuite) TestDelta() { test.scope, dps, func(status *support.ControllerOperationStatus) {}, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) }) @@ -664,6 +668,7 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() { sel.Scopes()[0], metadata.DeltaPaths{}, newStatusUpdater(t, &wg), + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -750,6 +755,7 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() { test.scope, metadata.DeltaPaths{}, newStatusUpdater(t, &wg), + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -859,6 +865,7 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() { test.scope, metadata.DeltaPaths{}, newStatusUpdater(t, &wg), + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) require.Len(t, collections, 2) @@ -1135,6 +1142,7 @@ func (suite *CollectionPopulationSuite) TestPopulateCollections() { test.scope, dps, ctrlOpts, + count.New(), fault.New(test.failFast == control.FailFast)) test.expectErr(t, err, clues.ToCore(err)) @@ -1477,6 +1485,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D sc.scope, test.inputMetadata(t, qp.Category), control.Options{FailureHandling: control.FailFast}, + 
count.New(), fault.New(true)) require.NoError(t, err, "getting collections", clues.ToCore(err)) @@ -1643,6 +1652,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_r allScope, dps, control.Options{FailureHandling: control.FailFast}, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -2064,6 +2074,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_i allScope, test.dps, ctrlOpts, + count.New(), fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/m365/collection/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go index e704fc043..306e0b8fa 100644 --- a/src/internal/m365/collection/exchange/collection.go +++ b/src/internal/m365/collection/exchange/collection.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -110,6 +111,7 @@ func NewCollection( origRemoved []string, validModTimes bool, statusUpdater support.StatusUpdater, + counter *count.Bus, ) data.BackupCollection { added := maps.Clone(origAdded) removed := make(map[string]struct{}, len(origRemoved)) @@ -127,6 +129,9 @@ func NewCollection( removed[r] = struct{}{} } + counter.Add(count.ItemsAdded, int64(len(added))) + counter.Add(count.ItemsRemoved, int64(len(removed))) + if !validModTimes { return &prefetchCollection{ BaseCollection: bc, @@ -145,6 +150,7 @@ func NewCollection( removed: removed, getter: items, statusUpdater: statusUpdater, + counter: counter, } } @@ -188,13 +194,17 @@ func (col *prefetchCollection) streamItems( colProgress chan<- struct{} user = col.user - log = logger.Ctx(ctx).With( - "service", path.ExchangeService.String(), - "category", col.Category().String()) ) + ctx = clues.Add( + ctx, + "category", col.Category().String()) + defer func() { close(stream) + logger.Ctx(ctx).Infow( + "finished stream backup collection items", + "stats", col.Counter.Values()) updateStatus( ctx, col.statusUpdater, @@ -228,6 +238,10 @@ func (col *prefetchCollection) streamItems( stream <- data.NewDeletedItem(id) + if col.Counter.Inc(count.StreamItemsRemoved)%1000 == 0 { + logger.Ctx(ctx).Infow("item removal stream progress", "stats", col.Counter.Values()) + } + atomic.AddInt64(&success, 1) if colProgress != nil { @@ -268,9 +282,11 @@ func (col *prefetchCollection) streamItems( // nothing else we can do, and not reporting it will make the status // investigation upset. if graph.IsErrDeletedInFlight(err) { + col.Counter.Inc(count.StreamItemsDeletedInFlight) atomic.AddInt64(&success, 1) - log.With("err", err).Infow("item not found", clues.InErr(err).Slice()...) + logger.CtxErr(ctx, err).Info("item not found") } else { + col.Counter.Inc(count.StreamItemsErrored) el.AddRecoverable(ctx, clues.Wrap(err, "fetching item").Label(fault.LabelForceNoBackupCreation)) } @@ -282,6 +298,7 @@ func (col *prefetchCollection) streamItems( id, details.ItemInfo{Exchange: info}) if err != nil { + col.Counter.Inc(count.StreamItemsErrored) el.AddRecoverable( ctx, clues.StackWC(ctx, err). 
@@ -292,6 +309,12 @@ func (col *prefetchCollection) streamItems( stream <- item + col.Counter.Add(count.StreamBytesAdded, info.Size) + + if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { + logger.Ctx(ctx).Infow("item addition stream progress", "stats", col.Counter.Values()) + } + atomic.AddInt64(&success, 1) atomic.AddInt64(&totalBytes, info.Size) @@ -329,6 +352,8 @@ type lazyFetchCollection struct { getter itemGetterSerializer statusUpdater support.StatusUpdater + + counter *count.Bus } // Items utility function to asynchronously execute process to fill data channel with @@ -413,6 +438,7 @@ func (col *lazyFetchCollection) streamItems( }, id, modTime, + col.counter, errs) atomic.AddInt64(&success, 1) diff --git a/src/internal/m365/collection/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go index 4f8d51921..baec20c3d 100644 --- a/src/internal/m365/collection/exchange/collection_test.go +++ b/src/internal/m365/collection/exchange/collection_test.go @@ -24,6 +24,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api/graph" @@ -146,13 +147,15 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() { test.prev, test.loc, control.DefaultOptions(), - false), + false, + count.New()), "u", mock.DefaultItemGetSerialize(), nil, nil, colType.validModTimes, - nil) + nil, + count.New()) assert.Equal(t, test.expect, c.State(), "collection state") assert.Equal(t, test.curr, c.FullPath(), "full path") assert.Equal(t, test.prev, c.PreviousPath(), "prev path") @@ -289,13 +292,15 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_Items() { nil, locPath.ToBuilder(), control.DefaultOptions(), - false), + false, + count.New()), "", &mock.ItemGetSerialize{}, test.added, maps.Keys(test.removed), false, - statusUpdater) + statusUpdater, + count.New()) for item := range col.Items(ctx, errs) { itemCount++ @@ -427,13 +432,15 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { nil, locPath.ToBuilder(), control.DefaultOptions(), - false), + false, + count.New()), "", mlg, test.added, maps.Keys(test.removed), true, - statusUpdater) + statusUpdater, + count.New()) for item := range col.Items(ctx, errs) { itemCount++ @@ -499,6 +506,7 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() { nil, "itemID", time.Now(), + count.New(), fault.New(true)) _, err := li.Info() @@ -564,6 +572,7 @@ func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() { }, "itemID", now, + count.New(), fault.New(true)) assert.False(t, li.Deleted(), "item shouldn't be marked deleted") @@ -604,6 +613,7 @@ func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlig }, "itemID", now, + count.New(), fault.New(true)) assert.False(t, li.Deleted(), "item shouldn't be marked deleted") @@ -657,6 +667,7 @@ func (suite *CollectionUnitSuite) TestLazyItem() { }, "itemID", now, + count.New(), fault.New(true)) assert.False(t, li.Deleted(), "item shouldn't be marked deleted") diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index a19f0e8b5..0c66a2750 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -15,6 +15,7 @@ import ( 
"github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -38,6 +39,7 @@ func CreateCollections( tenantID string, scope selectors.GroupsScope, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, bool, error) { ctx = clues.Add(ctx, "category", scope.Category().PathType()) @@ -64,6 +66,8 @@ func CreateCollections( return nil, false, clues.Stack(err) } + counter.Add(count.Channels, int64(len(channels))) + collections, err := populateCollections( ctx, qp, @@ -73,6 +77,7 @@ func CreateCollections( scope, cdps[scope.Category().PathType()], bpc.Options, + counter, errs) if err != nil { return nil, false, clues.Wrap(err, "filling collections") @@ -94,6 +99,7 @@ func populateCollections( scope selectors.GroupsScope, dps metadata.DeltaPaths, ctrlOpts control.Options, + counter *count.Bus, errs *fault.Bus, ) (map[string]data.BackupCollection, error) { var ( @@ -117,6 +123,7 @@ func populateCollections( } var ( + cl = counter.Local() cID = ptr.Val(c.GetId()) cName = ptr.Val(c.GetDisplayName()) err error @@ -134,15 +141,19 @@ func populateCollections( }) ) + ictx = clues.AddLabelCounter(ictx, cl.PlainAdder()) + delete(tombstones, cID) // Only create a collection if the path matches the scope. if !bh.includeContainer(ictx, qp, c, scope) { + cl.Inc(count.SkippedContainers) continue } if len(prevPathStr) > 0 { if prevPath, err = pathFromPrevString(prevPathStr); err != nil { + err = clues.StackWC(ctx, err).Label(count.BadPrevPath) logger.CtxErr(ictx, err).Error("parsing prev path") // if the previous path is unusable, then the delta must be, too. prevDelta = "" @@ -166,6 +177,9 @@ func populateCollections( added := str.SliceToMap(maps.Keys(addAndRem.Added)) removed := str.SliceToMap(addAndRem.Removed) + cl.Add(count.ItemsAdded, int64(len(added))) + cl.Add(count.ItemsRemoved, int64(len(removed))) + if len(addAndRem.DU.URL) > 0 { deltaURLs[cID] = addAndRem.DU.URL } else if !addAndRem.DU.Reset { @@ -174,7 +188,9 @@ func populateCollections( currPath, err := bh.canonicalPath(path.Builder{}.Append(cID), qp.TenantID) if err != nil { - el.AddRecoverable(ctx, clues.Stack(err)) + err = clues.StackWC(ctx, err).Label(count.BadCollPath) + el.AddRecoverable(ctx, err) + continue } @@ -191,7 +207,8 @@ func populateCollections( prevPath, path.Builder{}.Append(cName), ctrlOpts, - addAndRem.DU.Reset), + addAndRem.DU.Reset, + cl), bh, qp.ProtectedResource.ID(), added, @@ -219,7 +236,9 @@ func populateCollections( ) if collections[id] != nil { - el.AddRecoverable(ictx, clues.WrapWC(ictx, err, "conflict: tombstone exists for a live collection")) + err := clues.NewWC(ictx, "conflict: tombstone exists for a live collection").Label(count.CollectionTombstoneConflict) + el.AddRecoverable(ctx, err) + continue } @@ -231,12 +250,14 @@ func populateCollections( prevPath, err := pathFromPrevString(p) if err != nil { + err := clues.StackWC(ctx, err).Label(count.BadPrevPath) // technically shouldn't ever happen. But just in case... 
logger.CtxErr(ictx, err).Error("parsing tombstone prev path") + continue } - collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts) + collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts, counter.Local()) } logger.Ctx(ctx).Infow( @@ -251,7 +272,8 @@ func populateCollections( qp.Category, false) if err != nil { - return nil, clues.Wrap(err, "making metadata path prefix") + return nil, clues.WrapWC(ctx, err, "making metadata path prefix"). + Label(count.BadPathPrefix) } col, err := graph.MakeMetadataCollection( @@ -260,14 +282,13 @@ func populateCollections( graph.NewMetadataEntry(metadata.PreviousPathFileName, currPaths), graph.NewMetadataEntry(metadata.DeltaURLsFileName, deltaURLs), }, - statusUpdater) + statusUpdater, + counter.Local()) if err != nil { - return nil, clues.Wrap(err, "making metadata collection") + return nil, clues.WrapWC(ctx, err, "making metadata collection") } collections["metadata"] = col - logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections)) - return collections, el.Failure() } diff --git a/src/internal/m365/collection/groups/backup_test.go b/src/internal/m365/collection/groups/backup_test.go index 399c0a069..f27d58008 100644 --- a/src/internal/m365/collection/groups/backup_test.go +++ b/src/internal/m365/collection/groups/backup_test.go @@ -246,6 +246,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { selectors.NewGroupsBackup(nil).Channels(selectors.Any())[0], nil, ctrlOpts, + count.New(), fault.New(true)) test.expectErr(t, err, clues.ToCore(err)) assert.Len(t, collections, test.expectColls, "number of collections") @@ -405,6 +406,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { allScope, test.deltaPaths, ctrlOpts, + count.New(), fault.New(true)) test.expectErr(t, err, clues.ToCore(err)) assert.Len(t, collections, test.expectColls, "number of collections") @@ -527,6 +529,7 @@ func (suite *BackupIntgSuite) TestCreateCollections() { suite.tenantID, test.scope, func(status *support.ControllerOperationStatus) {}, + count.New(), fault.New(true)) require.NoError(t, err, clues.ToCore(err)) require.NotEmpty(t, collections, "must have at least one collection") diff --git a/src/internal/m365/collection/groups/collection.go b/src/internal/m365/collection/groups/collection.go index c57149f6a..8ded1b8de 100644 --- a/src/internal/m365/collection/groups/collection.go +++ b/src/internal/m365/collection/groups/collection.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" ) @@ -90,6 +91,9 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { ctx = clues.Add(ctx, "category", col.Category().String()) defer func() { + logger.Ctx(ctx).Infow( + "finished stream backup collection items", + "stats", col.Counter.Values()) col.finishPopulation(ctx, streamedItems, totalBytes, errs.Failure()) }() @@ -117,7 +121,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { col.stream <- data.NewDeletedItem(id) atomic.AddInt64(&streamedItems, 1) - atomic.AddInt64(&totalBytes, 0) + col.Counter.Inc(count.StreamItemsRemoved) if colProgress != nil { colProgress <- struct{}{} @@ -150,26 +154,23 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { parentFolderID, id) if err != nil { - 
el.AddRecoverable( - ctx, - clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)) + err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation) + el.AddRecoverable(ctx, err) return } if err := writer.WriteObjectValue("", item); err != nil { - el.AddRecoverable( - ctx, - clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)) + err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation) + el.AddRecoverable(ctx, err) return } itemData, err := writer.GetSerializedContent() if err != nil { - el.AddRecoverable( - ctx, - clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)) + err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation) + el.AddRecoverable(ctx, err) return } @@ -181,9 +182,8 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { id, details.ItemInfo{Groups: info}) if err != nil { - el.AddRecoverable( - ctx, - clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)) + err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation) + el.AddRecoverable(ctx, err) return } @@ -193,6 +193,12 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) { atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&totalBytes, info.Size) + if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { + logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values()) + } + + col.Counter.Add(count.StreamBytesAdded, info.Size) + if colProgress != nil { colProgress <- struct{}{} } diff --git a/src/internal/m365/collection/groups/collection_test.go b/src/internal/m365/collection/groups/collection_test.go index 1f0c17d25..19c1df8ad 100644 --- a/src/internal/m365/collection/groups/collection_test.go +++ b/src/internal/m365/collection/groups/collection_test.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -121,7 +122,8 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() { test.prev, test.loc, control.DefaultOptions(), - false), + false, + count.New()), nil, "g", nil, nil, @@ -202,7 +204,8 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() { nil, locPath.ToBuilder(), control.DefaultOptions(), - false), + false, + count.New()), added: test.added, removed: test.removed, getter: mock.GetChannelMessage{}, diff --git a/src/internal/m365/collection/site/backup.go b/src/internal/m365/collection/site/backup.go index 17ad650a0..c759462cd 100644 --- a/src/internal/m365/collection/site/backup.go +++ b/src/internal/m365/collection/site/backup.go @@ -33,6 +33,7 @@ func CollectLibraries( tenantID string, ssmb *prefixmatcher.StringSetMatchBuilder, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, bool, error) { logger.Ctx(ctx).Debug("creating SharePoint Library collections") @@ -44,7 +45,8 @@ func CollectLibraries( tenantID, bpc.ProtectedResource, su, - bpc.Options) + bpc.Options, + counter) ) msg := fmt.Sprintf( diff --git a/src/internal/m365/debug.go b/src/internal/m365/debug.go index cecbcc09c..038706e79 100644 --- a/src/internal/m365/debug.go +++ b/src/internal/m365/debug.go @@ -9,6 +9,7 @@ import ( 
"github.com/alcionai/corso/src/internal/m365/collection/drive" "github.com/alcionai/corso/src/internal/m365/collection/exchange" "github.com/alcionai/corso/src/internal/m365/collection/groups" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/store" ) @@ -28,9 +29,9 @@ func (ctrl *Controller) DeserializeMetadataFiles( case path.ExchangeService, path.ExchangeMetadataService: return exchange.DeserializeMetadataFiles(ctx, colls) case path.OneDriveService, path.OneDriveMetadataService: - return drive.DeserializeMetadataFiles(ctx, colls) + return drive.DeserializeMetadataFiles(ctx, colls, count.New()) case path.SharePointService, path.SharePointMetadataService: - return drive.DeserializeMetadataFiles(ctx, colls) + return drive.DeserializeMetadataFiles(ctx, colls, count.New()) case path.GroupsService, path.GroupsMetadataService: return groups.DeserializeMetadataFiles(ctx, colls) default: diff --git a/src/internal/m365/service/exchange/backup.go b/src/internal/m365/service/exchange/backup.go index c4b0880e2..879160375 100644 --- a/src/internal/m365/service/exchange/backup.go +++ b/src/internal/m365/service/exchange/backup.go @@ -10,6 +10,8 @@ import ( "github.com/alcionai/corso/src/internal/m365/collection/exchange" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/operations/inject" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -23,8 +25,9 @@ func ProduceBackupCollections( ctx context.Context, bpc inject.BackupProducerConfig, ac api.Client, - tenantID string, + creds account.M365Config, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) { eb, err := bpc.Selector.ToExchangeBackup() @@ -35,6 +38,7 @@ func ProduceBackupCollections( var ( collections = []data.BackupCollection{} el = errs.Local() + tenantID = creds.AzureTenantID categories = map[path.CategoryType]struct{}{} handlers = exchange.BackupHandlers(ac) ) @@ -46,6 +50,7 @@ func ProduceBackupCollections( if !canMakeDeltaQueries { logger.Ctx(ctx).Info("delta requests not available") + counter.Inc(count.NoDeltaQueries) bpc.Options.ToggleFeatures.DisableDelta = true } @@ -75,6 +80,7 @@ func ProduceBackupCollections( scope, cdps[scope.Category().PathType()], su, + counter, errs) if err != nil { el.AddRecoverable(ctx, err) @@ -95,6 +101,7 @@ func ProduceBackupCollections( path.ExchangeService, categories, su, + counter, errs) if err != nil { return nil, nil, false, err @@ -103,6 +110,8 @@ func ProduceBackupCollections( collections = append(collections, baseCols...) 
} + logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values()) + return collections, nil, canUsePreviousBackup, el.Failure() } diff --git a/src/internal/m365/service/groups/backup.go b/src/internal/m365/service/groups/backup.go index 11af8e5e9..a962d39d8 100644 --- a/src/internal/m365/service/groups/backup.go +++ b/src/internal/m365/service/groups/backup.go @@ -23,6 +23,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -36,6 +37,7 @@ func ProduceBackupCollections( ac api.Client, creds account.M365Config, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, error) { b, err := bpc.Selector.ToGroupsBackup() @@ -71,15 +73,20 @@ func ProduceBackupCollections( break } + cl := counter.Local() + ictx := clues.AddLabelCounter(ctx, cl.PlainAdder()) + var dbcs []data.BackupCollection switch scope.Category().PathType() { case path.LibrariesCategory: - sites, err := ac.Groups().GetAllSites(ctx, bpc.ProtectedResource.ID(), errs) + sites, err := ac.Groups().GetAllSites(ictx, bpc.ProtectedResource.ID(), errs) if err != nil { return nil, nil, err } + cl.Add(count.Sites, int64(len(sites))) + siteMetadataCollection := map[string][]data.RestoreCollection{} // Once we have metadata collections for chat as well, we will have to filter those out @@ -89,43 +96,51 @@ func ProduceBackupCollections( } for _, s := range sites { - pr := idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl())) - sbpc := inject.BackupProducerConfig{ - LastBackupVersion: bpc.LastBackupVersion, - Options: bpc.Options, - ProtectedResource: pr, - Selector: bpc.Selector, - MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())], - } + var ( + scl = cl.Local() + pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl())) + sbpc = inject.BackupProducerConfig{ + LastBackupVersion: bpc.LastBackupVersion, + Options: bpc.Options, + ProtectedResource: pr, + Selector: bpc.Selector, + MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())], + } + bh = drive.NewGroupBackupHandler( + bpc.ProtectedResource.ID(), + ptr.Val(s.GetId()), + ac.Drives(), + scope) + ) - bh := drive.NewGroupBackupHandler( - bpc.ProtectedResource.ID(), - ptr.Val(s.GetId()), - ac.Drives(), - scope) + ictx = clues.Add( + ictx, + "site_id", ptr.Val(s.GetId()), + "site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl()))) sp, err := bh.SitePathPrefix(creds.AzureTenantID) if err != nil { - return nil, nil, clues.Wrap(err, "getting site path") + return nil, nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix) } sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String() cs, canUsePreviousBackup, err := site.CollectLibraries( - ctx, + ictx, sbpc, bh, creds.AzureTenantID, ssmb, su, + scl, errs) if err != nil { - el.AddRecoverable(ctx, err) + el.AddRecoverable(ictx, err) continue } if !canUsePreviousBackup { - dbcs = append(dbcs, data.NewTombstoneCollection(sp, control.Options{})) + dbcs = append(dbcs, data.NewTombstoneCollection(sp, control.Options{}, scl)) } dbcs = append(dbcs, cs...) 
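
The nesting above — `counter` for the operation, `cl := counter.Local()` per scope, `scl := cl.Local()` per site — is the same parent/child rollup the patch threads through every producer: each child bus keeps its own tallies while propagating every Add up to the root. A minimal sketch of that behavior, using only the count API introduced in this change (the helper and the loop values are illustrative):

	package example

	import "github.com/alcionai/corso/src/pkg/count"

	// rollup shows how child buses created with Local keep their own
	// tallies while propagating each Add to the parent.
	func rollup() int64 {
		parent := count.New()

		// each resource gets its own child bus, as the backup producers
		// above do with counter.Local() and cl.Local().
		for i := 0; i < 3; i++ {
			cl := parent.Local()
			cl.Add(count.Sites, 2)

			_ = cl.Get(count.Sites)   // 2: the child's local tally
			_ = cl.Total(count.Sites) // the aggregated tally, read from the root
		}

		return parent.Get(count.Sites) // 6: all child adds accumulated
	}
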
@@ -142,7 +157,7 @@ func ProduceBackupCollections( // TODO(meain): Use number of messages and not channels CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(cs)) }, } - progressBar := observe.MessageWithCompletion(ctx, pcfg, scope.Category().PathType().HumanString()) + progressBar := observe.MessageWithCompletion(ictx, pcfg, scope.Category().PathType().HumanString()) if !isTeam { continue @@ -151,25 +166,26 @@ func ProduceBackupCollections( bh := groups.NewChannelBackupHandler(bpc.ProtectedResource.ID(), ac.Channels()) cs, canUsePreviousBackup, err = groups.CreateCollections( - ctx, + ictx, bpc, bh, creds.AzureTenantID, scope, su, + cl, errs) if err != nil { - el.AddRecoverable(ctx, err) + el.AddRecoverable(ictx, err) continue } if !canUsePreviousBackup { tp, err := bh.PathPrefix(creds.AzureTenantID) if err != nil { - return nil, nil, clues.Wrap(err, "getting message path") + return nil, nil, clues.WrapWC(ictx, err, "getting message path").Label(count.BadPathPrefix) } - dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{})) + dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}, cl)) } dbcs = append(dbcs, cs...) @@ -191,6 +207,7 @@ func ProduceBackupCollections( path.GroupsService, categories, su, + counter, errs) if err != nil { return nil, nil, err @@ -204,13 +221,18 @@ func ProduceBackupCollections( creds.AzureTenantID, bpc.ProtectedResource.ID(), sitesPreviousPaths, - su) + su, + counter) if err != nil { return nil, nil, err } collections = append(collections, md) + counter.Add(count.Collections, int64(len(collections))) + + logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values()) + return collections, ssmb.ToReader(), el.Failure() } @@ -218,6 +240,7 @@ func getSitesMetadataCollection( tenantID, groupID string, sites map[string]string, su support.StatusUpdater, + counter *count.Bus, ) (data.BackupCollection, error) { p, err := path.BuildMetadata( tenantID, @@ -239,7 +262,8 @@ func getSitesMetadataCollection( []graph.MetadataCollectionEntry{ graph.NewMetadataEntry(metadata.PreviousPathFileName, sites), }, - su) + su, + counter.Local()) return md, err } diff --git a/src/internal/m365/service/onedrive/backup.go b/src/internal/m365/service/onedrive/backup.go index 8015d7c58..e373a66b2 100644 --- a/src/internal/m365/service/onedrive/backup.go +++ b/src/internal/m365/service/onedrive/backup.go @@ -13,6 +13,8 @@ import ( "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/version" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -24,8 +26,9 @@ func ProduceBackupCollections( ctx context.Context, bpc inject.BackupProducerConfig, ac api.Client, - tenant string, + creds account.M365Config, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) { odb, err := bpc.Selector.ToOneDriveBackup() @@ -35,6 +38,7 @@ func ProduceBackupCollections( var ( el = errs.Local() + tenantID = creds.AzureTenantID categories = map[path.CategoryType]struct{}{} collections = []data.BackupCollection{} ssmb = prefixmatcher.NewStringSetBuilder() @@ -52,10 +56,11 @@ func ProduceBackupCollections( nc := drive.NewCollections( drive.NewUserDriveBackupHandler(ac.Drives(), bpc.ProtectedResource.ID(), 
scope), - tenant, + tenantID, bpc.ProtectedResource, su, - bpc.Options) + bpc.Options, + counter) pcfg := observe.ProgressCfg{ Indent: 1, @@ -75,7 +80,7 @@ func ProduceBackupCollections( collections = append(collections, odcs...) } - mcs, err := migrationCollections(bpc, tenant, su) + mcs, err := migrationCollections(bpc, tenantID, su, counter) if err != nil { return nil, nil, false, err } @@ -86,11 +91,12 @@ func ProduceBackupCollections( baseCols, err := graph.BaseCollections( ctx, collections, - tenant, + tenantID, bpc.ProtectedResource.ID(), path.OneDriveService, categories, su, + counter, errs) if err != nil { return nil, nil, false, err @@ -99,6 +105,8 @@ func ProduceBackupCollections( collections = append(collections, baseCols...) } + logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values()) + return collections, ssmb.ToReader(), canUsePreviousBackup, el.Failure() } @@ -107,6 +115,7 @@ func migrationCollections( bpc inject.BackupProducerConfig, tenant string, su support.StatusUpdater, + counter *count.Bus, ) ([]data.BackupCollection, error) { // assume a version < 0 implies no prior backup, thus nothing to migrate. if version.IsNoBackup(bpc.LastBackupVersion) { @@ -117,6 +126,8 @@ func migrationCollections( return nil, nil } + counter.Inc(count.RequiresUserPnToIDMigration) + // unlike exchange, which enumerates all folders on every // backup, onedrive needs to force the owner PN -> ID migration mc, err := path.BuildPrefix( @@ -137,7 +148,7 @@ func migrationCollections( return nil, clues.Wrap(err, "creating user name migration path") } - mgn, err := graph.NewPrefixCollection(mpc, mc, su) + mgn, err := graph.NewPrefixCollection(mpc, mc, su, counter) if err != nil { return nil, clues.Wrap(err, "creating migration collection") } diff --git a/src/internal/m365/service/onedrive/backup_test.go b/src/internal/m365/service/onedrive/backup_test.go index 64970a7ab..2a5c82216 100644 --- a/src/internal/m365/service/onedrive/backup_test.go +++ b/src/internal/m365/service/onedrive/backup_test.go @@ -13,6 +13,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -92,7 +93,7 @@ func (suite *BackupUnitSuite) TestMigrationCollections() { ProtectedResource: u, } - mc, err := migrationCollections(bpc, "t", nil) + mc, err := migrationCollections(bpc, "t", nil, count.New()) require.NoError(t, err, clues.ToCore(err)) if test.expectLen == 0 { diff --git a/src/internal/m365/service/sharepoint/backup.go b/src/internal/m365/service/sharepoint/backup.go index 770970874..9961578a9 100644 --- a/src/internal/m365/service/sharepoint/backup.go +++ b/src/internal/m365/service/sharepoint/backup.go @@ -84,6 +84,7 @@ func ProduceBackupCollections( creds.AzureTenantID, ssmb, su, + counter, errs) if err != nil { el.AddRecoverable(ctx, err) @@ -124,6 +125,7 @@ func ProduceBackupCollections( path.SharePointService, categories, su, + counter, errs) if err != nil { return nil, nil, false, err diff --git a/src/internal/m365/service/sharepoint/backup_test.go b/src/internal/m365/service/sharepoint/backup_test.go index 198e6247b..19ef8794c 100644 --- a/src/internal/m365/service/sharepoint/backup_test.go +++ b/src/internal/m365/service/sharepoint/backup_test.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/m365/service/onedrive/mock" 
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -119,7 +120,8 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() { tenantID, idname.NewProvider(siteID, siteID), nil, - control.DefaultOptions()) + control.DefaultOptions(), + count.New()) c.CollectionMap = collMap @@ -131,6 +133,7 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() { excluded, topLevelPackages, "notempty", + count.New(), fault.New(true)) test.expect(t, err, clues.ToCore(err)) diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 0864d6088..b1aca42e5 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -197,6 +197,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { } }() + ctx = clues.AddLabelCounter(ctx, op.Counter.PlainAdder()) + ctx, end := diagnostics.Span(ctx, "operations:backup:run") defer end() @@ -543,7 +545,7 @@ func produceBackupDataCollections( Selector: sel, } - return bp.ProduceBackupCollections(ctx, bpc, counter, errs) + return bp.ProduceBackupCollections(ctx, bpc, counter.Local(), errs) } // --------------------------------------------------------------------------- diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index d55ddd132..dd653733a 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -1915,7 +1915,8 @@ func (suite *AssistBackupIntegrationSuite) TestBackupTypesForFailureModes() { mc, err := graph.MakeMetadataCollection( pathPrefix, makeMetadataCollectionEntries("url/1", driveID, folderID, tmp), - func(*support.ControllerOperationStatus) {}) + func(*support.ControllerOperationStatus) {}, + count.New()) require.NoError(t, err, clues.ToCore(err)) cs = append(cs, mc) @@ -2233,7 +2234,8 @@ func (suite *AssistBackupIntegrationSuite) TestExtensionsIncrementals() { mc, err := graph.MakeMetadataCollection( pathPrefix, makeMetadataCollectionEntries("url/1", driveID, folderID, tmp), - func(*support.ControllerOperationStatus) {}) + func(*support.ControllerOperationStatus) {}, + count.New()) require.NoError(t, err, clues.ToCore(err)) cs = append(cs, mc) diff --git a/src/internal/operations/maintenance_test.go b/src/internal/operations/maintenance_test.go index 81269d6dd..2c7c71906 100644 --- a/src/internal/operations/maintenance_test.go +++ b/src/internal/operations/maintenance_test.go @@ -167,7 +167,8 @@ func (suite *MaintenanceOpNightlySuite) TestRepoMaintenance_GarbageCollection() mc, err := graph.MakeMetadataCollection( prefixPath, makeMetadataCollectionEntries("url/1", driveID, folderID, tmp), - func(*support.ControllerOperationStatus) {}) + func(*support.ControllerOperationStatus) {}, + count.New()) require.NoError(t, err, clues.ToCore(err)) cs = append(cs, mc) diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 9f0eb3132..514c592c0 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -128,6 +128,8 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De // Setup // ----- + ctx = clues.AddLabelCounter(ctx, op.Counter.PlainAdder()) + ctx, end := diagnostics.Span(ctx, "operations:restore:run") defer end() diff --git a/src/pkg/count/count.go b/src/pkg/count/count.go index 
9bd89fcff..06c3544c5 100644
--- a/src/pkg/count/count.go
+++ b/src/pkg/count/count.go
@@ -27,24 +27,24 @@ func (b *Bus) Local() *Bus {
 	return bus
 }
 
-func (b *Bus) getCounter(k key) *xsync.Counter {
+func (b *Bus) getCounter(k Key) *xsync.Counter {
 	xc, _ := b.stats.LoadOrStore(string(k), xsync.NewCounter())
 	return xc
 }
 
 // Inc increases the count by 1.
-func (b *Bus) Inc(k key) {
+func (b *Bus) Inc(k Key) int64 {
 	if b == nil {
-		return
+		return -1
 	}
 
-	b.Add(k, 1)
+	return b.Add(k, 1)
 }
 
-// Inc increases the count by n.
-func (b *Bus) Add(k key, n int64) {
+// Add increases the count by n.
+func (b *Bus) Add(k Key, n int64) int64 {
 	if b == nil {
-		return
+		return -1
 	}
 
 	b.getCounter(k).Add(n)
@@ -52,18 +52,12 @@ func (b *Bus) Add(k key, n int64) {
 	if b.parent != nil {
 		b.parent.Add(k, n)
 	}
-}
 
-// AdderFor returns a func that adds any value of i
-// to the bus using the given key.
-func (b *Bus) AdderFor(k key) func(i int64) {
-	return func(i int64) {
-		b.Add(k, i)
-	}
+	return b.Get(k)
 }
 
 // Get returns the local count.
-func (b *Bus) Get(k key) int64 {
+func (b *Bus) Get(k Key) int64 {
 	if b == nil {
 		return -1
 	}
@@ -72,7 +66,7 @@
 }
 
 // Total returns the global count.
-func (b *Bus) Total(k key) int64 {
+func (b *Bus) Total(k Key) int64 {
 	if b == nil {
 		return -1
 	}
@@ -114,3 +108,33 @@ func (b *Bus) TotalValues() map[string]int64 {
 
 	return b.Values()
 }
+
+// ---------------------------------------------------------------------------
+// compatibility with callbacks and external packages
+// ---------------------------------------------------------------------------
+
+// AdderFor returns a func that adds any value of i
+// to the bus using the given key.
+func (b *Bus) AdderFor(k Key) func(i int64) {
+	return func(i int64) {
+		b.Add(k, i)
+	}
+}
+
+type plainAdder struct {
+	bus *Bus
+}
+
+func (pa plainAdder) Add(k string, n int64) {
+	if pa.bus == nil {
+		return
+	}
+
+	pa.bus.Add(Key(k), n)
+}
+
+// PlainAdder adapts the bus for external packages that accept a count.Bus
+// but don't recognize the `Key` type and prefer plain string keys.
+func (b *Bus) PlainAdder() *plainAdder { + return &plainAdder{b} +} diff --git a/src/pkg/count/count_test.go b/src/pkg/count/count_test.go index 4f4d948ae..b1283b88a 100644 --- a/src/pkg/count/count_test.go +++ b/src/pkg/count/count_test.go @@ -17,7 +17,7 @@ func TestCountUnitSuite(t *testing.T) { suite.Run(t, &CountUnitSuite{Suite: tester.NewUnitSuite(t)}) } -const testKey = key("just-for-testing") +const testKey = Key("just-for-testing") func (suite *CountUnitSuite) TestBus_Inc() { newParent := func() *Bus { @@ -71,6 +71,46 @@ func (suite *CountUnitSuite) TestBus_Inc() { } } +func (suite *CountUnitSuite) TestBus_Inc_result() { + newParent := func() *Bus { + parent := New() + parent.Inc(testKey) + + return parent + } + + table := []struct { + name string + bus *Bus + expect int64 + expectTotal int64 + }{ + { + name: "nil", + bus: nil, + expect: -1, + expectTotal: -1, + }, + { + name: "one", + bus: newParent().Local(), + expect: 1, + expectTotal: 2, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + result := test.bus.Inc(testKey) + assert.Equal(t, test.expect, result) + + resultTotal := test.bus.Total(testKey) + assert.Equal(t, test.expectTotal, resultTotal) + }) + } +} + func (suite *CountUnitSuite) TestBus_Add() { newParent := func() *Bus { parent := New() @@ -123,6 +163,47 @@ func (suite *CountUnitSuite) TestBus_Add() { } } +func (suite *CountUnitSuite) TestBus_Add_result() { + newParent := func() *Bus { + parent := New() + parent.Add(testKey, 2) + + return parent + } + + table := []struct { + name string + skip bool + bus *Bus + expect int64 + expectTotal int64 + }{ + { + name: "nil", + bus: nil, + expect: -1, + expectTotal: -1, + }, + { + name: "some", + bus: newParent().Local(), + expect: 4, + expectTotal: 6, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + result := test.bus.Add(testKey, 4) + assert.Equal(t, test.expect, result) + + resultTotal := test.bus.Total(testKey) + assert.Equal(t, test.expectTotal, resultTotal) + }) + } +} + func (suite *CountUnitSuite) TestBus_Values() { table := []struct { name string diff --git a/src/pkg/count/keys.go b/src/pkg/count/keys.go index bdfd03ca7..a3a02e999 100644 --- a/src/pkg/count/keys.go +++ b/src/pkg/count/keys.go @@ -1,42 +1,99 @@ package count -type key string +type Key string const ( // count of bucket-tokens consumed by api calls. - APICallTokensConsumed key = "api-call-tokens-consumed" + APICallTokensConsumed Key = "api-call-tokens-consumed" // count of api calls that resulted in failure due to throttling. 
-	ThrottledAPICalls key = "throttled-api-calls"
+	ThrottledAPICalls Key = "throttled-api-calls"
 )
 
 // Tracked during backup
 const (
 	// amounts reported by kopia
-	PersistedCachedFiles key = "persisted-cached-files"
-	PersistedDirectories key = "persisted-directories"
-	PersistedFiles key = "persisted-files"
-	PersistedHashedBytes key = "persisted-hashed-bytes"
-	PersistedNonCachedFiles key = "persisted-non-cached-files"
-	PersistedNonMetaFiles key = "persisted-non-meta-files"
-	PersistedNonMetaUploadedBytes key = "persisted-non-meta-uploaded-bytes"
-	PersistedUploadedBytes key = "persisted-uploaded-bytes"
-	PersistenceErrors key = "persistence-errors"
-	PersistenceExpectedErrors key = "persistence-expected-errors"
-	PersistenceIgnoredErrors key = "persistence-ignored-errors"
+
+	PersistedCachedFiles          Key = "persisted-cached-files"
+	PersistedDirectories          Key = "persisted-directories"
+	PersistedFiles                Key = "persisted-files"
+	PersistedHashedBytes          Key = "persisted-hashed-bytes"
+	PersistedNonCachedFiles       Key = "persisted-non-cached-files"
+	PersistedNonMetaFiles         Key = "persisted-non-meta-files"
+	PersistedNonMetaUploadedBytes Key = "persisted-non-meta-uploaded-bytes"
+	PersistedUploadedBytes        Key = "persisted-uploaded-bytes"
+	PersistenceErrors             Key = "persistence-errors"
+	PersistenceExpectedErrors     Key = "persistence-expected-errors"
+	PersistenceIgnoredErrors      Key = "persistence-ignored-errors"
+
 	// amounts reported by data providers
-	ProviderItemsRead key = "provider-items-read"
+
+	Channels                      Key = "channels"
+	CollectionMoved               Key = "collection-state-moved"
+	CollectionNew                 Key = "collection-state-new"
+	CollectionNotMoved            Key = "collection-state-not-moved"
+	CollectionTombstoned          Key = "collection-state-tombstoned"
+	Collections                   Key = "collections"
+	DeleteFolderMarker            Key = "delete-folder-marker"
+	DeleteItemMarker              Key = "delete-item-marker"
+	Drives                        Key = "drives"
+	DriveTombstones               Key = "drive-tombstones"
+	Files                         Key = "files"
+	Folders                       Key = "folders"
+	ItemDownloadURLRefetch        Key = "item-download-url-refetch"
+	ItemsAdded                    Key = "items-added"
+	ItemsRemoved                  Key = "items-removed"
+	LazyDeletedInFlight           Key = "lazy-deleted-in-flight"
+	Malware                       Key = "malware"
+	MetadataBytes                 Key = "metadata-bytes"
+	MetadataItems                 Key = "metadata-items"
+	MissingDelta                  Key = "missing-delta-token"
+	NewDeltas                     Key = "new-delta-tokens"
+	NewPrevPaths                  Key = "new-previous-paths"
+	NoDeltaQueries                Key = "cannot-make-delta-queries"
+	Packages                      Key = "packages"
+	PagerResets                   Key = "pager-resets"
+	PagesEnumerated               Key = "pages-enumerated"
+	PrevDeltas                    Key = "previous-deltas"
+	PrevPaths                     Key = "previous-paths"
+	PreviousPathMetadataCollision Key = "previous-path-metadata-collision"
+	Sites                         Key = "sites"
+	SkippedContainers             Key = "skipped-containers"
+	StreamBytesAdded              Key = "stream-bytes-added"
+	StreamDirsAdded               Key = "stream-dirs-added"
+	StreamDirsFound               Key = "stream-dirs-found"
+	StreamItemsAdded              Key = "stream-items-added"
+	StreamItemsDeletedInFlight    Key = "stream-items-deleted-in-flight"
+	StreamItemsErrored            Key = "stream-items-errored"
+	StreamItemsFound              Key = "stream-items-found"
+	StreamItemsRemoved            Key = "stream-items-removed"
+	URLCacheMiss                  Key = "url-cache-miss"
+	URLCacheRefresh               Key = "url-cache-refresh"
+
+	// miscellaneous
+	RequiresUserPnToIDMigration Key = "requires-user-pn-to-id-migration"
+)
+
+// Counted using clues error labels
+const (
+	BadCollPath                 = "bad_collection_path"
+	BadPathPrefix               = "bad_path_prefix_creation"
+	BadPrevPath                 = "unparsable_prev_path"
+	CollectionTombstoneConflict =
"collection_tombstone_conflicts_with_live_collection" + ItemBeforeParent = "item_before_parent" + MissingParent = "missing_parent" + UnknownItemType = "unknown_item_type" ) // Tracked during restore const ( // count of times that items had collisions during restore, // and that collision was solved by replacing the item. - CollisionReplace key = "collision-replace" + CollisionReplace Key = "collision-replace" // count of times that items had collisions during restore, // and that collision was solved by skipping the item. - CollisionSkip key = "collision-skip" + CollisionSkip Key = "collision-skip" // NewItemCreated should be used for non-skip, non-replace, // non-meta item creation counting. IE: use it specifically // for counting new items (no collision) or copied items. - NewItemCreated key = "new-item-created" + NewItemCreated Key = "new-item-created" ) diff --git a/src/pkg/services/m365/api/graph/collections.go b/src/pkg/services/m365/api/graph/collections.go index 3e6f2683e..cd4353e0a 100644 --- a/src/pkg/services/m365/api/graph/collections.go +++ b/src/pkg/services/m365/api/graph/collections.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -60,6 +61,7 @@ func BaseCollections( service path.ServiceType, categories map[path.CategoryType]struct{}, su support.StatusUpdater, + counter *count.Bus, errs *fault.Bus, ) ([]data.BackupCollection, error) { var ( @@ -82,7 +84,7 @@ func BaseCollections( full, err := path.BuildPrefix(tenant, rOwner, service, cat) if err != nil { // Shouldn't happen. - err = clues.WrapWC(ictx, err, "making path") + err = clues.WrapWC(ictx, err, "making path").Label(count.BadPathPrefix) el.AddRecoverable(ictx, err) lastErr = err @@ -95,7 +97,7 @@ func BaseCollections( prev: full, full: full, su: su, - state: data.StateOf(full, full), + state: data.StateOf(full, full, counter), }) } } @@ -111,6 +113,7 @@ func BaseCollections( func NewPrefixCollection( prev, full path.Path, su support.StatusUpdater, + counter *count.Bus, ) (*prefixCollection, error) { if prev != nil { if len(prev.Item()) > 0 { @@ -136,7 +139,7 @@ func NewPrefixCollection( prev: prev, full: full, su: su, - state: data.StateOf(prev, full), + state: data.StateOf(prev, full, counter), } if pc.state == data.DeletedState { diff --git a/src/pkg/services/m365/api/graph/collections_test.go b/src/pkg/services/m365/api/graph/collections_test.go index 1b075f30c..8bb1461ac 100644 --- a/src/pkg/services/m365/api/graph/collections_test.go +++ b/src/pkg/services/m365/api/graph/collections_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" ) @@ -93,7 +94,7 @@ func (suite *CollectionsUnitSuite) TestNewPrefixCollection() { } for _, test := range table { suite.Run(test.name, func() { - _, err := NewPrefixCollection(test.prev, test.full, nil) + _, err := NewPrefixCollection(test.prev, test.full, nil, count.New()) test.expectErr(suite.T(), err, clues.ToCore(err)) }) } diff --git a/src/pkg/services/m365/api/graph/metadata_collection.go b/src/pkg/services/m365/api/graph/metadata_collection.go index 9d9534c1e..8a14e67d6 100644 --- a/src/pkg/services/m365/api/graph/metadata_collection.go +++ b/src/pkg/services/m365/api/graph/metadata_collection.go @@ -11,6 +11,7 @@ import 
( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -27,6 +28,7 @@ type MetadataCollection struct { fullPath path.Path items []metadataItem statusUpdater support.StatusUpdater + counter *count.Bus } // MetadataCollectionEntry describes a file that should get added to a metadata @@ -78,6 +80,7 @@ func MakeMetadataCollection( pathPrefix path.Path, metadata []MetadataCollectionEntry, statusUpdater support.StatusUpdater, + counter *count.Bus, ) (data.BackupCollection, error) { if len(metadata) == 0 { return nil, nil @@ -94,7 +97,7 @@ func MakeMetadataCollection( items = append(items, item) } - coll := NewMetadataCollection(pathPrefix, items, statusUpdater) + coll := NewMetadataCollection(pathPrefix, items, statusUpdater, counter) return coll, nil } @@ -103,11 +106,13 @@ func NewMetadataCollection( p path.Path, items []metadataItem, statusUpdater support.StatusUpdater, + counter *count.Bus, ) *MetadataCollection { return &MetadataCollection{ fullPath: p, items: items, statusUpdater: statusUpdater, + counter: counter, } } @@ -155,11 +160,13 @@ func (md MetadataCollection) Items( }, md.fullPath.Folder(false)) + md.counter.Add(count.MetadataItems, int64(len(md.items))) md.statusUpdater(status) }() defer close(res) for _, item := range md.items { + md.counter.Add(count.MetadataBytes, item.size) totalBytes += item.size res <- item } diff --git a/src/pkg/services/m365/api/graph/metadata_collection_test.go b/src/pkg/services/m365/api/graph/metadata_collection_test.go index 1e4a087ae..46ae9bef5 100644 --- a/src/pkg/services/m365/api/graph/metadata_collection_test.go +++ b/src/pkg/services/m365/api/graph/metadata_collection_test.go @@ -17,6 +17,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -41,7 +42,7 @@ func (suite *MetadataCollectionUnitSuite) TestFullPath() { "foo") require.NoError(t, err, clues.ToCore(err)) - c := NewMetadataCollection(p, nil, nil) + c := NewMetadataCollection(p, nil, nil, count.New()) assert.Equal(t, p.String(), c.FullPath().String()) } @@ -99,7 +100,8 @@ func (suite *MetadataCollectionUnitSuite) TestItems() { func(c *support.ControllerOperationStatus) { assert.Equal(t, len(itemNames), c.Metrics.Objects) assert.Equal(t, len(itemNames), c.Metrics.Successes) - }) + }, + count.New()) gotData := [][]byte{} gotNames := []string{} @@ -198,7 +200,8 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() { col, err := MakeMetadataCollection( pathPrefix, []MetadataCollectionEntry{test.metadata}, - func(*support.ControllerOperationStatus) {}) + func(*support.ControllerOperationStatus) {}, + count.New()) test.errCheck(t, err, clues.ToCore(err)) if err != nil {
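
Because Inc and Add now return the updated local tally, a caller can count an item and throttle its own progress logging in one step — the idiom behind the every-1000-items log added to the groups collection earlier in this patch. A hedged sketch of that pattern (streamWithProgress and the sizes slice are hypothetical; the count and logger calls are the ones this patch uses):

	package example

	import (
		"context"

		"github.com/alcionai/corso/src/pkg/count"
		"github.com/alcionai/corso/src/pkg/logger"
	)

	// streamWithProgress tallies streamed items and bytes, emitting a
	// progress log line once every 1000 items.
	func streamWithProgress(ctx context.Context, counter *count.Bus, sizes []int64) {
		for _, size := range sizes {
			// Inc returns the post-increment local count, so a single call
			// both records the item and decides whether to log progress.
			if counter.Inc(count.StreamItemsAdded)%1000 == 0 {
				logger.Ctx(ctx).Infow("item stream progress", "stats", counter.Values())
			}

			counter.Add(count.StreamBytesAdded, size)
		}
	}
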