use counter in exchange backup enum (#4661)

Propagate the count bus more widely through the backup pipeline to record runtime stats.
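
Context for reviewers: the count bus is the thread-safe stats sink that the diffs below thread through nearly every constructor. The sketch here is illustrative only, inferred from how the bus is used in this PR (`New`, `Inc`, `Add`, `Local`, `Values`); it is not the real `pkg/count` implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// Key identifies a single stat bucket, e.g. "collection_new".
type Key string

// Bus is a minimal counter bus. Local() children roll their counts up to
// the parent, so per-drive or per-collection buses still feed run totals.
type Bus struct {
	mu     sync.Mutex
	parent *Bus
	stats  map[Key]int64
}

func New() *Bus { return &Bus{stats: map[Key]int64{}} }

func (b *Bus) Local() *Bus { return &Bus{parent: b, stats: map[Key]int64{}} }

// Add bumps key by n in this bus and every ancestor, returning the new
// local total.
func (b *Bus) Add(k Key, n int64) int64 {
	if b.parent != nil {
		b.parent.Add(k, n)
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	b.stats[k] += n

	return b.stats[k]
}

// Inc returns the new local total, which lets callers throttle logging
// with patterns like Inc(k)%1000 == 0, as several diffs below do.
func (b *Bus) Inc(k Key) int64 { return b.Add(k, 1) }

// Values snapshots the local counts for structured logging.
func (b *Bus) Values() map[Key]int64 {
	b.mu.Lock()
	defer b.mu.Unlock()

	out := make(map[Key]int64, len(b.stats))
	for k, v := range b.stats {
		out[k] = v
	}

	return out
}

func main() {
	root := New()
	perDrive := root.Local()
	perDrive.Inc("stream_items_added")

	fmt.Println(root.Values()) // map[stream_items_added:1]
}
```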

---

#### Does this PR need a docs update or release note?

- [x] No

#### Type of change

- [x] 🤖 Supportability/Tests

#### Test Plan

- [x] Unit test
- [x] 💚 E2E
Keepers 2023-11-16 16:22:00 -07:00 committed by GitHub
parent 7a21424ca7
commit 9ecefd2569
41 changed files with 676 additions and 187 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
)
@ -66,7 +67,7 @@ func (suite *CollectionSuite) TestStateOf() {
}
for _, test := range table {
suite.Run(test.name, func() {
state := StateOf(test.prev, test.curr)
state := StateOf(test.prev, test.curr, count.New())
assert.Equal(suite.T(), test.expect, state)
})
}
@ -142,7 +143,8 @@ func (suite *CollectionSuite) TestNewBaseCollection() {
test.previous,
loc,
control.Options{},
test.doNotMerge)
test.doNotMerge,
count.New())
assert.Equal(t, test.expectCurrent, b.FullPath(), "full path")
assert.Equal(t, test.expectPrev, b.PreviousPath(), "previous path")
@ -160,7 +162,7 @@ func (suite *CollectionSuite) TestNewTombstoneCollection() {
fooP, err := path.Build("t", "u", path.ExchangeService, path.EmailCategory, false, "foo")
require.NoError(t, err, clues.ToCore(err))
c := NewTombstoneCollection(fooP, control.Options{})
c := NewTombstoneCollection(fooP, control.Options{}, count.New())
assert.Nil(t, c.FullPath(), "full path")
assert.Equal(t, fooP, c.PreviousPath(), "previous path")
assert.Nil(t, c.LocationPath(), "location path")

View File

@ -6,6 +6,7 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -38,19 +39,27 @@ func (c NoFetchRestoreCollection) FetchItemByName(context.Context, string) (Item
// StateOf lets us figure out the state of the collection from the
// previous and current path
func StateOf(prev, curr path.Path) CollectionState {
func StateOf(
prev, curr path.Path,
counter *count.Bus,
) CollectionState {
if curr == nil || len(curr.String()) == 0 {
counter.Inc(count.CollectionTombstoned)
return DeletedState
}
if prev == nil || len(prev.String()) == 0 {
counter.Inc(count.CollectionNew)
return NewState
}
if curr.String() != prev.String() {
counter.Inc(count.CollectionMoved)
return MovedState
}
counter.Inc(count.CollectionNotMoved)
return NotMovedState
}
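
Taken together, the four branches map path transitions to collection states and now tally each outcome. A hedged sketch of exercising the new signature, reusing the `path.Build` helper from the test file above (assertion style follows the suite; this is not a complete test file):

```go
func checkStateOf(t *testing.T) {
	prev, _ := path.Build("t", "u", path.ExchangeService, path.EmailCategory, false, "inbox")
	curr, _ := path.Build("t", "u", path.ExchangeService, path.EmailCategory, false, "archive")

	counter := count.New()

	assert.Equal(t, NewState, StateOf(nil, curr, counter))       // bumps count.CollectionNew
	assert.Equal(t, MovedState, StateOf(prev, curr, counter))    // bumps count.CollectionMoved
	assert.Equal(t, NotMovedState, StateOf(curr, curr, counter)) // bumps count.CollectionNotMoved
	assert.Equal(t, DeletedState, StateOf(prev, nil, counter))   // bumps count.CollectionTombstoned
}
```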
@ -63,6 +72,7 @@ func NewBaseCollection(
location *path.Builder,
ctrlOpts control.Options,
doNotMergeItems bool,
counter *count.Bus,
) BaseCollection {
return BaseCollection{
opts: ctrlOpts,
@ -70,7 +80,8 @@ func NewBaseCollection(
fullPath: curr,
locationPath: location,
prevPath: prev,
state: StateOf(prev, curr),
state: StateOf(prev, curr, counter),
Counter: counter.Local(),
}
}
@ -98,6 +109,8 @@ type BaseCollection struct {
// doNotMergeItems should only be true if the old delta token expired.
doNotMergeItems bool
Counter *count.Bus
}
// FullPath returns the BaseCollection's fullPath []string
@ -145,9 +158,16 @@ func (col BaseCollection) Opts() control.Options {
func NewTombstoneCollection(
prev path.Path,
opts control.Options,
counter *count.Bus,
) *tombstoneCollection {
return &tombstoneCollection{
BaseCollection: NewBaseCollection(nil, prev, nil, opts, false),
BaseCollection: NewBaseCollection(
nil,
prev,
nil,
opts,
false,
counter),
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
)
@ -136,6 +137,7 @@ func NewLazyItem(
itemGetter ItemDataGetter,
itemID string,
modTime time.Time,
counter *count.Bus,
errs *fault.Bus,
) *lazyItem {
return &lazyItem{
@ -143,6 +145,7 @@ func NewLazyItem(
id: itemID,
itemGetter: itemGetter,
modTime: modTime,
counter: counter,
errs: errs,
}
}
@ -157,6 +160,7 @@ type lazyItem struct {
ctx context.Context
mu sync.Mutex
id string
counter *count.Bus
errs *fault.Bus
itemGetter ItemDataGetter
@ -203,6 +207,7 @@ func (i *lazyItem) ToReader() io.ReadCloser {
// etc.) and the item isn't enumerated in that set.
if delInFlight {
logger.Ctx(i.ctx).Info("item not found")
i.counter.Inc(count.LazyDeletedInFlight)
i.delInFlight = true
format.DelInFlight = true
@ -232,6 +237,7 @@ func NewLazyItemWithInfo(
itemGetter ItemDataGetter,
itemID string,
modTime time.Time,
counter *count.Bus,
errs *fault.Bus,
) *lazyItemWithInfo {
return &lazyItemWithInfo{
@ -240,6 +246,7 @@ func NewLazyItemWithInfo(
itemGetter,
itemID,
modTime,
counter,
errs),
}
}
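
The counter now rides along with each lazy item, so the deleted-in-flight path is tallied at the moment the deferred fetch actually runs. A hedged usage sketch, borrowing `mockItemDataGetter` from the test suite below (fragment, not a complete program):

```go
counter := count.New()

li := data.NewLazyItemWithInfo(
	ctx,
	&mockItemDataGetter{delInFlight: true}, // test double reporting an in-flight deletion
	"item-id",
	time.Now(),
	counter,
	fault.New(true))

// Nothing is fetched yet. The first ToReader() call runs the getter; if the
// item vanished between enumeration and fetch, the bus records one
// count.LazyDeletedInFlight and the item is flagged deleted in flight.
_, _ = io.ReadAll(li.ToReader())
```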

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
)
@ -74,6 +75,7 @@ func (suite *ItemUnitSuite) TestUnindexedLazyItem() {
nil,
"foo",
time.Time{},
count.New(),
fault.New(true))
var item data.Item = lazy
@ -296,6 +298,7 @@ func (suite *ItemUnitSuite) TestLazyItem() {
test.mid,
id,
now,
count.New(),
errs)
assert.Equal(t, id, item.ID(), "ID")
@ -354,7 +357,7 @@ func (suite *ItemUnitSuite) TestLazyItem_DeletedInFlight() {
mid := &mockItemDataGetter{delInFlight: true}
defer mid.check(t, true)
item := data.NewLazyItemWithInfo(ctx, mid, id, now, errs)
item := data.NewLazyItemWithInfo(ctx, mid, id, now, count.New(), errs)
assert.Equal(t, id, item.ID(), "ID")
assert.False(t, item.Deleted(), "deleted")
@ -400,7 +403,7 @@ func (suite *ItemUnitSuite) TestLazyItem_InfoBeforeReadErrors() {
mid := &mockItemDataGetter{}
defer mid.check(t, false)
item := data.NewLazyItemWithInfo(ctx, mid, id, now, errs)
item := data.NewLazyItemWithInfo(ctx, mid, id, now, count.New(), errs)
assert.Equal(t, id, item.ID(), "ID")
assert.False(t, item.Deleted(), "deleted")

View File

@ -58,37 +58,39 @@ func (ctrl *Controller) ProduceBackupCollections(
var (
colls []data.BackupCollection
ssmb *prefixmatcher.StringSetMatcher
excludeItems *prefixmatcher.StringSetMatcher
canUsePreviousBackup bool
)
switch service {
case path.ExchangeService:
colls, ssmb, canUsePreviousBackup, err = exchange.ProduceBackupCollections(
colls, excludeItems, canUsePreviousBackup, err = exchange.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
ctrl.credentials.AzureTenantID,
ctrl.credentials,
ctrl.UpdateStatus,
counter,
errs)
if err != nil {
return nil, nil, false, err
}
case path.OneDriveService:
colls, ssmb, canUsePreviousBackup, err = onedrive.ProduceBackupCollections(
colls, excludeItems, canUsePreviousBackup, err = onedrive.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
ctrl.credentials.AzureTenantID,
ctrl.credentials,
ctrl.UpdateStatus,
counter,
errs)
if err != nil {
return nil, nil, false, err
}
case path.SharePointService:
colls, ssmb, canUsePreviousBackup, err = sharepoint.ProduceBackupCollections(
colls, excludeItems, canUsePreviousBackup, err = sharepoint.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
@ -101,12 +103,13 @@ func (ctrl *Controller) ProduceBackupCollections(
}
case path.GroupsService:
colls, ssmb, err = groups.ProduceBackupCollections(
colls, excludeItems, err = groups.ProduceBackupCollections(
ctx,
bpc,
ctrl.AC,
ctrl.credentials,
ctrl.UpdateStatus,
counter,
errs)
if err != nil {
return nil, nil, false, err
@ -132,7 +135,7 @@ func (ctrl *Controller) ProduceBackupCollections(
}
}
return colls, ssmb, canUsePreviousBackup, nil
return colls, excludeItems, canUsePreviousBackup, nil
}
func (ctrl *Controller) IsServiceEnabled(

View File

@ -143,8 +143,9 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
ctx,
bpc,
suite.ac,
suite.tenantID,
suite.ac.Credentials,
ctrl.UpdateStatus,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.True(t, canUsePreviousBackup, "can use previous backup")

View File

@ -1,4 +1,3 @@
// Package drive provides support for retrieving M365 Drive objects
package drive
import (
@ -22,6 +21,7 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/extensions"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
@ -88,6 +88,8 @@ type Collection struct {
doNotMergeItems bool
urlCache getItemPropertyer
counter *count.Bus
}
func pathToLocation(p path.Path) (*path.Builder, error) {
@ -115,6 +117,7 @@ func NewCollection(
isPackageOrChildOfPackage bool,
doNotMergeItems bool,
urlCache getItemPropertyer,
counter *count.Bus,
) (*Collection, error) {
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
// to be changed as we won't be able to extract path information from the
@ -140,7 +143,8 @@ func NewCollection(
ctrlOpts,
isPackageOrChildOfPackage,
doNotMergeItems,
urlCache)
urlCache,
counter)
c.locPath = locPath
c.prevLocPath = prevLocPath
@ -159,6 +163,7 @@ func newColl(
isPackageOrChildOfPackage bool,
doNotMergeItems bool,
urlCache getItemPropertyer,
counter *count.Bus,
) *Collection {
dataCh := make(chan data.Item, graph.Parallelism(path.OneDriveMetadataService).CollectionBufferSize())
@ -172,10 +177,11 @@ func newColl(
data: dataCh,
statusUpdater: statusUpdater,
ctrl: ctrlOpts,
state: data.StateOf(prevPath, currPath),
state: data.StateOf(prevPath, currPath, counter),
isPackageOrChildOfPackage: isPackageOrChildOfPackage,
doNotMergeItems: doNotMergeItems,
urlCache: urlCache,
counter: counter,
}
return c
@ -228,7 +234,7 @@ func (oc Collection) PreviousPath() path.Path {
func (oc *Collection) SetFullPath(curPath path.Path) {
oc.folderPath = curPath
oc.state = data.StateOf(oc.prevPath, curPath)
oc.state = data.StateOf(oc.prevPath, curPath, oc.counter)
}
func (oc Collection) LocationPath() *path.Builder {
@ -263,7 +269,13 @@ func (oc *Collection) getDriveItemContent(
itemName = ptr.Val(item.GetName())
)
itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID)
itemData, err := downloadContent(
ctx,
oc.handler,
oc.urlCache,
item,
oc.driveID,
oc.counter)
if err != nil {
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
@ -334,6 +346,7 @@ func downloadContent(
uc getItemPropertyer,
item models.DriveItemable,
driveID string,
counter *count.Bus,
) (io.ReadCloser, error) {
itemID := ptr.Val(item.GetId())
ctx = clues.Add(ctx, "item_id", itemID)
@ -359,6 +372,7 @@ func downloadContent(
// to preserve existing behavior. Fallback to refetching the item using the
// API.
logger.CtxErr(ctx, err).Info("url cache miss: refetching from API")
counter.Inc(count.ItemDownloadURLRefetch)
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
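
Condensed, the download path above tries the cached URL first and only counts a refetch when it must fall back to the API. A sketch of the control flow (the cache-read helper name is hypothetical; `counter.Inc`, `iaag.GetItem`, and the count key follow the diff):

```go
// Try the URL currently held in the cache.
if rc, err := downloadViaCachedURL(ctx, uc, item); err == nil { // hypothetical helper
	return rc, nil
}

// Cache miss or stale URL: tally the refetch, then ask the API directly.
counter.Inc(count.ItemDownloadURLRefetch)

di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
	return nil, clues.Wrap(err, "refetching item for download URL")
}
```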
@ -428,7 +442,9 @@ func (oc *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
// `details.OneDriveInfo`
parentPath, err := path.GetDriveFolderPath(oc.folderPath)
if err != nil {
logger.CtxErr(ctx, err).Info("getting drive folder path")
oc.reportAsCompleted(ctx, 0, 0, 0)
return
}
@ -559,10 +575,15 @@ func (oc *Collection) streamDriveItem(
if isFile {
atomic.AddInt64(&stats.itemsFound, 1)
if oc.counter.Inc(count.StreamItemsFound)%1000 == 0 {
logger.Ctx(ctx).Infow("item stream progress", "stats", oc.counter.Values())
}
metaFileName = itemID
metaSuffix = metadata.MetaFileSuffix
} else {
atomic.AddInt64(&stats.dirsFound, 1)
oc.counter.Inc(count.StreamDirsFound)
// metaFileName not set for directories so we get just ".dirmeta"
metaSuffix = metadata.DirMetaFileSuffix
@ -588,6 +609,15 @@ func (oc *Collection) streamDriveItem(
ctx = clues.Add(ctx, "item_info", itemInfo)
// Drive content download requests are also rate limited by graph api.
// Ensure that this request goes through the drive limiter & not the default
// limiter.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{
Service: path.OneDriveService,
})
if isFile {
dataSuffix := metadata.DataFileSuffix
@ -607,6 +637,7 @@ func (oc *Collection) streamDriveItem(
},
itemID+dataSuffix,
itemInfo.Modified(),
oc.counter,
errs)
}
@ -640,11 +671,14 @@ func (oc *Collection) streamDriveItem(
// Item read successfully, add to collection
if isFile {
oc.counter.Inc(count.StreamItemsAdded)
atomic.AddInt64(&stats.itemsRead, 1)
} else {
oc.counter.Inc(count.StreamDirsAdded)
atomic.AddInt64(&stats.dirsRead, 1)
}
oc.counter.Add(count.StreamBytesAdded, itemSize)
atomic.AddInt64(&stats.byteCount, itemSize)
}
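
Two patterns recur in the hunks above: the context is rebound to the drive-specific rate limiter before content downloads, and per-item counters double as a log throttle so high-volume streams emit one progress line per thousand items rather than one per item. A hedged sketch of both (the loop is illustrative; the calls match the diff):

```go
// Route subsequent download requests through the drive limiter rather
// than the default Graph limiter.
ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{
	Service: path.OneDriveService,
})

for _, itm := range enumerated { // illustrative slice of discovered items
	if counter.Inc(count.StreamItemsFound)%1000 == 0 {
		// Inc returns the running total, so this logs every 1000th item.
		logger.Ctx(ctx).Infow("item stream progress", "stats", counter.Values())
	}

	process(itm) // hypothetical per-item work
}
```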

View File

@ -29,6 +29,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/extensions"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@ -215,7 +216,8 @@ func (suite *CollectionUnitSuite) TestCollection() {
control.Options{ToggleFeatures: control.Toggles{}},
false,
true,
nil)
nil,
count.New())
require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, coll)
assert.Equal(t, folderPath, coll.FullPath())
@ -337,7 +339,8 @@ func (suite *CollectionUnitSuite) TestCollectionReadError() {
control.Options{ToggleFeatures: control.Toggles{}},
false,
true,
nil)
nil,
count.New())
require.NoError(t, err, clues.ToCore(err))
stubItem := odTD.NewStubDriveItem(
@ -415,7 +418,8 @@ func (suite *CollectionUnitSuite) TestCollectionReadUnauthorizedErrorRetry() {
control.Options{ToggleFeatures: control.Toggles{}},
false,
true,
nil)
nil,
count.New())
require.NoError(t, err, clues.ToCore(err))
coll.Add(stubItem)
@ -471,7 +475,8 @@ func (suite *CollectionUnitSuite) TestCollectionPermissionBackupLatestModTime()
control.Options{ToggleFeatures: control.Toggles{}},
false,
true,
nil)
nil,
count.New())
require.NoError(t, err, clues.ToCore(err))
mtime := time.Now().AddDate(0, -1, 0)
@ -814,7 +819,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
mbh.GetResps = resps
mbh.GetErrs = test.getErr
r, err := downloadContent(ctx, mbh, test.muc, item, driveID)
r, err := downloadContent(ctx, mbh, test.muc, item, driveID, count.New())
test.expect(t, r)
test.expectErr(t, err, clues.ToCore(err))
})
@ -1002,7 +1007,8 @@ func (suite *CollectionUnitSuite) TestItemExtensions() {
opts,
false,
true,
nil)
nil,
count.New())
require.NoError(t, err, clues.ToCore(err))
stubItem := odTD.NewStubDriveItem(

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/logger"
@ -51,6 +52,8 @@ type Collections struct {
NumItems int
NumFiles int
NumContainers int
counter *count.Bus
}
func NewCollections(
@ -59,6 +62,7 @@ func NewCollections(
protectedResource idname.Provider,
statusUpdater support.StatusUpdater,
ctrlOpts control.Options,
counter *count.Bus,
) *Collections {
return &Collections{
handler: bh,
@ -67,6 +71,7 @@ func NewCollections(
CollectionMap: map[string]map[string]*Collection{},
statusUpdater: statusUpdater,
ctrl: ctrlOpts,
counter: counter,
}
}
@ -79,6 +84,7 @@ func (c *Collections) resetStats() {
func deserializeAndValidateMetadata(
ctx context.Context,
cols []data.RestoreCollection,
counter *count.Bus,
fb *fault.Bus,
) (map[string]string, map[string]map[string]string, bool, error) {
deltas, prevs, canUse, err := DeserializeMetadata(ctx, cols)
@ -117,7 +123,7 @@ func deserializeAndValidateMetadata(
}
}
alertIfPrevPathsHaveCollisions(ctx, prevs, fb)
alertIfPrevPathsHaveCollisions(ctx, prevs, counter, fb)
return deltas, prevs, canUse, nil
}
@ -125,6 +131,7 @@ func deserializeAndValidateMetadata(
func alertIfPrevPathsHaveCollisions(
ctx context.Context,
prevs map[string]map[string]string,
counter *count.Bus,
fb *fault.Bus,
) {
for driveID, folders := range prevs {
@ -150,6 +157,8 @@ func alertIfPrevPathsHaveCollisions(
"collision_drive_id": driveID,
"collision_prev_path": prev,
}))
counter.Inc(count.PreviousPathMetadataCollision)
}
prevPathCollisions[prev] = fid
@ -274,7 +283,11 @@ func (c *Collections) Get(
ssmb *prefixmatcher.StringSetMatchBuilder,
errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
deltasByDriveID, prevPathsByDriveID, canUsePrevBackup, err := deserializeAndValidateMetadata(ctx, prevMetadata, errs)
deltasByDriveID, prevPathsByDriveID, canUsePrevBackup, err := deserializeAndValidateMetadata(
ctx,
prevMetadata,
c.counter,
errs)
if err != nil {
return nil, false, err
}
@ -295,6 +308,9 @@ func (c *Collections) Get(
return nil, false, err
}
c.counter.Add(count.Drives, int64(len(drives)))
c.counter.Add(count.PrevDeltas, int64(len(deltasByDriveID)))
var (
driveIDToDeltaLink = map[string]string{}
driveIDToPrevPaths = map[string]map[string]string{}
@ -303,6 +319,7 @@ func (c *Collections) Get(
for _, d := range drives {
var (
cl = c.counter.Local()
driveID = ptr.Val(d.GetId())
driveName = ptr.Val(d.GetName())
ictx = clues.Add(
@ -322,6 +339,8 @@ func (c *Collections) Get(
packagePaths = map[string]struct{}{}
)
ictx = clues.AddLabelCounter(ictx, cl.PlainAdder())
delete(driveTombstones, driveID)
if _, ok := driveIDToPrevPaths[driveID]; !ok {
@ -332,6 +351,7 @@ func (c *Collections) Get(
c.CollectionMap[driveID] = map[string]*Collection{}
}
cl.Add(count.PrevPaths, int64(len(oldPrevPaths)))
logger.Ctx(ictx).Infow(
"previous metadata for drive",
"count_old_prev_paths", len(oldPrevPaths))
@ -344,6 +364,7 @@ func (c *Collections) Get(
excludedItemIDs,
packagePaths,
prevDeltaLink,
cl.Local(),
errs)
if err != nil {
return nil, false, clues.Stack(err)
@ -373,6 +394,8 @@ func (c *Collections) Get(
numDriveItems := c.NumItems - numPrevItems
numPrevItems = c.NumItems
cl.Add(count.NewPrevPaths, int64(len(newPrevPaths)))
// Attach a URL cache to the drive if the number of discovered items is
// below the threshold. Attaching cache to larger drives can cause
// performance issues since cache delta queries start taking up majority of
@ -387,6 +410,7 @@ func (c *Collections) Get(
prevDeltaLink,
urlCacheRefreshInterval,
c.handler,
cl,
errs)
if err != nil {
return nil, false, clues.Stack(err)
@ -446,7 +470,8 @@ func (c *Collections) Get(
c.ctrl,
false,
true,
nil)
nil,
cl.Local())
if err != nil {
return nil, false, clues.WrapWC(ictx, err, "making collection")
}
@ -464,11 +489,13 @@ func (c *Collections) Get(
}
}
c.counter.Add(count.DriveTombstones, int64(len(driveTombstones)))
// generate tombstones for drives that were removed.
for driveID := range driveTombstones {
prevDrivePath, err := c.handler.PathPrefix(c.tenantID, driveID)
if err != nil {
return nil, false, clues.WrapWC(ctx, err, "making drive tombstone for previous path")
return nil, false, clues.WrapWC(ctx, err, "making drive tombstone for previous path").Label(count.BadPathPrefix)
}
coll, err := NewCollection(
@ -481,7 +508,8 @@ func (c *Collections) Get(
c.ctrl,
false,
true,
nil)
nil,
c.counter.Local())
if err != nil {
return nil, false, clues.WrapWC(ctx, err, "making drive tombstone")
}
@ -489,7 +517,7 @@ func (c *Collections) Get(
collections = append(collections, coll)
}
alertIfPrevPathsHaveCollisions(ctx, driveIDToPrevPaths, errs)
alertIfPrevPathsHaveCollisions(ctx, driveIDToPrevPaths, c.counter, errs)
// add metadata collections
pathPrefix, err := c.handler.MetadataPathPrefix(c.tenantID)
@ -508,7 +536,8 @@ func (c *Collections) Get(
graph.NewMetadataEntry(bupMD.PreviousPathFileName, driveIDToPrevPaths),
graph.NewMetadataEntry(bupMD.DeltaURLsFileName, driveIDToDeltaLink),
},
c.statusUpdater)
c.statusUpdater,
count.New())
if err != nil {
// Technically it's safe to continue here because the logic for starting an
@ -563,13 +592,17 @@ func updateCollectionPaths(
}
func (c *Collections) handleDelete(
ctx context.Context,
itemID, driveID string,
oldPrevPaths, currPrevPaths, newPrevPaths map[string]string,
isFolder bool,
excluded map[string]struct{},
invalidPrevDelta bool,
counter *count.Bus,
) error {
if !isFolder {
counter.Inc(count.DeleteItemMarker)
// Try to remove the item from the Collection if an entry exists for this
// item. This handles cases where an item was created and deleted during the
// same delta query.
@ -597,6 +630,8 @@ func (c *Collections) handleDelete(
return nil
}
counter.Inc(count.DeleteFolderMarker)
var prevPath path.Path
prevPathStr, ok := oldPrevPaths[itemID]
@ -605,11 +640,12 @@ func (c *Collections) handleDelete(
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
if err != nil {
return clues.Wrap(err, "invalid previous path").
return clues.WrapWC(ctx, err, "invalid previous path").
With(
"drive_id", driveID,
"item_id", itemID,
"path_string", prevPathStr)
"path_string", prevPathStr).
Label(count.BadPrevPath)
}
}
@ -644,7 +680,8 @@ func (c *Collections) handleDelete(
false,
// DoNotMerge is not checked for deleted items.
false,
nil)
nil,
counter.Local())
if err != nil {
return clues.Wrap(err, "making collection").With(
"drive_id", driveID,
@ -716,6 +753,7 @@ func (c *Collections) PopulateDriveCollections(
excludedItemIDs map[string]struct{},
topLevelPackages map[string]struct{},
prevDeltaLink string,
counter *count.Bus,
errs *fault.Bus,
) (pagers.DeltaUpdate, map[string]string, error) {
var (
@ -755,7 +793,11 @@ func (c *Collections) PopulateDriveCollections(
break
}
counter.Inc(count.PagesEnumerated)
if reset {
counter.Inc(count.PagerResets)
ctx = clues.Add(ctx, "delta_reset_occurred", true)
newPrevPaths = map[string]string{}
currPrevPaths = map[string]string{}
@ -783,6 +825,7 @@ func (c *Collections) PopulateDriveCollections(
excludedItemIDs,
topLevelPackages,
invalidPrevDelta,
counter,
el)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err))
@ -795,6 +838,8 @@ func (c *Collections) PopulateDriveCollections(
return du, nil, clues.Stack(err)
}
logger.Ctx(ctx).Infow("populated collection", "stats", counter.Values())
return du, newPrevPaths, el.Failure()
}
@ -807,6 +852,7 @@ func (c *Collections) processItem(
excludedItemIDs map[string]struct{},
topLevelPackages map[string]struct{},
invalidPrevDelta bool,
counter *count.Bus,
skipper fault.AddSkipper,
) error {
var (
@ -831,6 +877,7 @@ func (c *Collections) processItem(
skipper.AddSkip(ctx, skip)
logger.Ctx(ctx).Infow("malware detected", "item_details", addtl)
counter.Inc(count.Malware)
return nil
}
@ -838,6 +885,7 @@ func (c *Collections) processItem(
// Deleted file or folder.
if item.GetDeleted() != nil {
err := c.handleDelete(
ctx,
itemID,
driveID,
oldPrevPaths,
@ -845,20 +893,22 @@ func (c *Collections) processItem(
newPrevPaths,
isFolder,
excludedItemIDs,
invalidPrevDelta)
invalidPrevDelta,
counter)
return clues.StackWC(ctx, err).OrNil()
}
collectionPath, err := c.getCollectionPath(driveID, item)
if err != nil {
return clues.StackWC(ctx, err).
Label(fault.LabelForceNoBackupCreation)
return clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation, count.BadCollPath)
}
// Skip items that don't match the folder selectors we were given.
if shouldSkip(ctx, collectionPath, c.handler, driveName) {
counter.Inc(count.SkippedContainers)
logger.Ctx(ctx).Debugw("path not selected", "skipped_path", collectionPath.String())
return nil
}
@ -872,7 +922,8 @@ func (c *Collections) processItem(
prevPath, err = path.FromDataLayerPath(prevPathStr, false)
if err != nil {
return clues.WrapWC(ctx, err, "invalid previous path").
With("prev_path_string", path.LoggableDir(prevPathStr))
With("prev_path_string", path.LoggableDir(prevPathStr)).
Label(count.BadPrevPath)
}
} else if item.GetRoot() != nil {
// Root doesn't move or get renamed.
@ -899,9 +950,12 @@ func (c *Collections) processItem(
isPackage := item.GetPackageEscaped() != nil
if isPackage {
counter.Inc(count.Packages)
// mark this path as a package type for all other collections.
// any subfolder should get marked as a childOfPackage below.
topLevelPackages[collectionPath.String()] = struct{}{}
} else {
counter.Inc(count.Folders)
}
childOfPackage := filters.
@ -944,7 +998,8 @@ func (c *Collections) processItem(
c.ctrl,
isPackage || childOfPackage,
invalidPrevDelta || collPathAlreadyExists,
nil)
nil,
counter.Local())
if err != nil {
return clues.StackWC(ctx, err)
}
@ -966,9 +1021,11 @@ func (c *Collections) processItem(
}
case item.GetFile() != nil:
counter.Inc(count.Files)
// Deletions are handled above so this is just moves/renames.
if len(ptr.Val(item.GetParentReference().GetId())) == 0 {
return clues.NewWC(ctx, "file without parent ID")
return clues.NewWC(ctx, "file without parent ID").Label(count.MissingParent)
}
// Get the collection for this item.
@ -977,7 +1034,7 @@ func (c *Collections) processItem(
collection, ok := c.CollectionMap[driveID][parentID]
if !ok {
return clues.NewWC(ctx, "item seen before parent folder")
return clues.NewWC(ctx, "item seen before parent folder").Label(count.ItemBeforeParent)
}
// This will only kick in if the file was moved multiple times
@ -1021,7 +1078,7 @@ func (c *Collections) processItem(
default:
return clues.NewWC(ctx, "item is neither folder nor file").
Label(fault.LabelForceNoBackupCreation)
Label(fault.LabelForceNoBackupCreation, count.UnknownItemType)
}
return nil
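
The hunks above establish a three-level counter hierarchy: one bus for the whole backup (`c.counter`), a `Local()` child per drive (`cl`), and another `Local()` per collection, with the drive bus also registered on the clues context so error labels such as `count.BadPrevPath` are tallied as they are attached. A hedged sketch of the wiring (fragment; constructor arguments elided):

```go
cl := c.counter.Local() // per-drive bus; rolls up into the run-wide bus

// Errors labeled against ictx also bump the matching counter key.
ictx := clues.AddLabelCounter(ctx, cl.PlainAdder())

prevPath, err := path.FromDataLayerPath(prevPathStr, false)
if err != nil {
	// The label is both a log annotation and a counter key.
	return clues.WrapWC(ictx, err, "invalid previous path").Label(count.BadPrevPath)
}

coll, err := NewCollection( /* handler, paths, options elided */ cl.Local())
```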

View File

@ -25,6 +25,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -1158,7 +1159,8 @@ func (suite *CollectionsUnitSuite) TestPopulateDriveCollections() {
tenant,
idname.NewProvider(user, user),
nil,
control.Options{ToggleFeatures: control.Toggles{}})
control.Options{ToggleFeatures: control.Toggles{}},
count.New())
c.CollectionMap[driveID] = map[string]*Collection{}
@ -1170,6 +1172,7 @@ func (suite *CollectionsUnitSuite) TestPopulateDriveCollections() {
excludes,
test.topLevelPackages,
"prevdelta",
count.New(),
errs)
test.expect(t, err, clues.ToCore(err))
assert.ElementsMatch(
@ -1618,7 +1621,8 @@ func (suite *CollectionsUnitSuite) TestDeserializeMetadata() {
mc, err := graph.MakeMetadataCollection(
pathPrefix,
c(),
func(*support.ControllerOperationStatus) {})
func(*support.ControllerOperationStatus) {},
count.New())
require.NoError(t, err, clues.ToCore(err))
cols = append(cols, dataMock.NewUnversionedRestoreCollection(
@ -1628,7 +1632,7 @@ func (suite *CollectionsUnitSuite) TestDeserializeMetadata() {
fb := fault.New(true)
deltas, paths, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, cols, fb)
deltas, paths, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, cols, count.New(), fb)
test.errCheck(t, err)
assert.Equal(t, test.canUsePreviousBackup, canUsePreviousBackup, "can use previous backup")
@ -1656,7 +1660,7 @@ func (suite *CollectionsUnitSuite) TestDeserializeMetadata_ReadFailure() {
fc := failingColl{}
_, _, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, []data.RestoreCollection{fc}, fault.New(true))
_, _, canUsePreviousBackup, err := deserializeAndValidateMetadata(ctx, []data.RestoreCollection{fc}, count.New(), fault.New(true))
require.NoError(t, err)
require.False(t, canUsePreviousBackup)
}
@ -3427,7 +3431,8 @@ func (suite *CollectionsUnitSuite) TestGet() {
tenant,
idname.NewProvider(user, user),
func(*support.ControllerOperationStatus) {},
control.Options{ToggleFeatures: control.Toggles{}})
control.Options{ToggleFeatures: control.Toggles{}},
count.New())
prevDelta := "prev-delta"
@ -3447,7 +3452,8 @@ func (suite *CollectionsUnitSuite) TestGet() {
bupMD.PreviousPathFileName,
test.previousPaths),
},
func(*support.ControllerOperationStatus) {})
func(*support.ControllerOperationStatus) {},
count.New())
assert.NoError(t, err, "creating metadata collection", clues.ToCore(err))
prevMetadata := []data.RestoreCollection{
@ -3484,6 +3490,7 @@ func (suite *CollectionsUnitSuite) TestGet() {
t,
data.NoFetchRestoreCollection{Collection: baseCol}),
},
count.New(),
errs)
if !assert.NoError(t, err, "deserializing metadata", clues.ToCore(err)) {
continue
@ -3616,7 +3623,8 @@ func (suite *CollectionsUnitSuite) TestAddURLCacheToDriveCollections() {
tenant,
idname.NewProvider(user, user),
func(*support.ControllerOperationStatus) {},
control.Options{ToggleFeatures: control.Toggles{}})
control.Options{ToggleFeatures: control.Toggles{}},
count.New())
errs := fault.New(true)
delList := prefixmatcher.NewStringSetBuilder()

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
bupMD "github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/store"
)
@ -14,8 +15,9 @@ import (
func DeserializeMetadataFiles(
ctx context.Context,
colls []data.RestoreCollection,
counter *count.Bus,
) ([]store.MetadataFile, error) {
deltas, prevs, _, err := deserializeAndValidateMetadata(ctx, colls, fault.New(true))
deltas, prevs, _, err := deserializeAndValidateMetadata(ctx, colls, counter, fault.New(true))
files := []store.MetadataFile{
{

View File

@ -284,7 +284,8 @@ func (suite *OneDriveIntgSuite) TestOneDriveNewCollections() {
service.updateStatus,
control.Options{
ToggleFeatures: control.Toggles{},
})
},
count.New())
ssmb := prefixmatcher.NewStringSetBuilder()

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -49,7 +50,8 @@ type urlCache struct {
enumerator EnumerateDriveItemsDeltaer
errs *fault.Bus
counter *count.Bus
errs *fault.Bus
}
// newURLCache creates a new URL cache for the specified drive ID
@ -57,6 +59,7 @@ func newURLCache(
driveID, prevDelta string,
refreshInterval time.Duration,
enumerator EnumerateDriveItemsDeltaer,
counter *count.Bus,
errs *fault.Bus,
) (*urlCache, error) {
err := validateCacheParams(driveID, refreshInterval, enumerator)
@ -71,6 +74,7 @@ func newURLCache(
enumerator: enumerator,
prevDelta: prevDelta,
refreshInterval: refreshInterval,
counter: counter,
errs: errs,
},
nil
@ -148,6 +152,8 @@ func (uc *urlCache) refreshCache(
return nil
}
uc.counter.Inc(count.URLCacheRefresh)
// Hold cache lock in write mode for the entire duration of the refresh.
// This is to prevent other threads from reading the cache while it is
// being updated page by page
@ -201,6 +207,7 @@ func (uc *urlCache) readCache(
props, ok := uc.idToProps[itemID]
if !ok {
uc.counter.Inc(count.URLCacheMiss)
return itemProps{}, clues.NewWC(ctx, "item not found in cache")
}

View File

@ -148,6 +148,7 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
du.URL,
1*time.Hour,
suite.ac.Drives(),
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -578,6 +579,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
"",
1*time.Hour,
&medi,
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -622,6 +624,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
"",
refreshInterval,
&mock.EnumerateItemsDeltaByDrive{},
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -694,6 +697,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
"",
test.refreshInt,
test.itemPager,
count.New(),
test.errors)
test.expectErr(t, err, clues.ToCore(err))

View File

@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -31,6 +32,7 @@ func CreateCollections(
scope selectors.ExchangeScope,
dps metadata.DeltaPaths,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
ctx = clues.Add(ctx, "category", scope.Category().PathType())
@ -78,11 +80,14 @@ func CreateCollections(
scope,
dps,
bpc.Options,
counter,
errs)
if err != nil {
return nil, clues.Wrap(err, "filling collections")
}
counter.Add(count.Collections, int64(len(collections)))
for _, coll := range collections {
allCollections = append(allCollections, coll)
}
@ -108,6 +113,7 @@ func populateCollections(
scope selectors.ExchangeScope,
dps metadata.DeltaPaths,
ctrlOpts control.Options,
counter *count.Bus,
errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
var (
@ -123,6 +129,7 @@ func populateCollections(
)
logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))
counter.Add(count.PrevDeltas, int64(len(dps)))
el := errs.Local()
@ -133,6 +140,7 @@ func populateCollections(
var (
err error
cl = counter.Local()
itemConfig = api.CallConfig{
CanMakeDeltaQueries: !ctrlOpts.ToggleFeatures.DisableDelta,
UseImmutableIDs: ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
@ -152,9 +160,12 @@ func populateCollections(
})
)
ictx = clues.AddLabelCounter(ictx, cl.PlainAdder())
// Only create a collection if the path matches the scope.
currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category)
if !ok {
cl.Inc(count.SkippedContainers)
continue
}
@ -167,6 +178,7 @@ func populateCollections(
if len(prevPathStr) > 0 {
if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
err = clues.Stack(err).Label(count.BadPrevPath)
logger.CtxErr(ictx, err).Error("parsing prev path")
// if the previous path is unusable, then the delta must be, too.
prevDelta = ""
@ -200,6 +212,7 @@ func populateCollections(
deltaURLs[cID] = addAndRem.DU.URL
} else if !addAndRem.DU.Reset {
logger.Ctx(ictx).Info("missing delta url")
cl.Inc(count.MissingDelta)
}
edc := NewCollection(
@ -208,17 +221,19 @@ func populateCollections(
prevPath,
locPath,
ctrlOpts,
addAndRem.DU.Reset),
addAndRem.DU.Reset,
cl),
qp.ProtectedResource.ID(),
bh.itemHandler(),
addAndRem.Added,
addAndRem.Removed,
// TODO: produce a feature flag that allows selective
// enabling of valid modTimes. This currently produces
// rare-case failures with incorrect details merging.
// rare failures with incorrect details merging.
// Root cause is not yet known.
false,
statusUpdater)
statusUpdater,
cl)
collections[cID] = edc
@ -242,7 +257,10 @@ func populateCollections(
)
if collections[id] != nil {
el.AddRecoverable(ctx, clues.WrapWC(ictx, err, "conflict: tombstone exists for a live collection"))
err := clues.WrapWC(ictx, err, "conflict: tombstone exists for a live collection").
Label(count.CollectionTombstoneConflict)
el.AddRecoverable(ctx, err)
continue
}
@ -254,18 +272,18 @@ func populateCollections(
prevPath, err := pathFromPrevString(p)
if err != nil {
err = clues.StackWC(ctx, err).Label(count.BadPrevPath)
// technically shouldn't ever happen. But just in case...
logger.CtxErr(ictx, err).Error("parsing tombstone prev path")
continue
}
collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts)
collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts, counter)
}
logger.Ctx(ctx).Infow(
"adding metadata collection entries",
"num_paths_entries", len(currPaths),
"num_deltas_entries", len(deltaURLs))
counter.Add(count.NewDeltas, int64(len(deltaURLs)))
counter.Add(count.NewPrevPaths, int64(len(currPaths)))
pathPrefix, err := path.BuildMetadata(
qp.TenantID,
@ -283,15 +301,14 @@ func populateCollections(
graph.NewMetadataEntry(metadata.PreviousPathFileName, currPaths),
graph.NewMetadataEntry(metadata.DeltaURLsFileName, deltaURLs),
},
statusUpdater)
statusUpdater,
count.New())
if err != nil {
return nil, clues.Wrap(err, "making metadata collection")
}
collections["metadata"] = col
logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections))
return collections, el.Failure()
}
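
As in the drive code, the run ends by persisting a metadata collection holding the per-container previous paths and delta tokens, which is what lets the next incremental backup resume from this one. A hedged sketch of the call (the entry-slice type name is assumed; the entry constructors and arguments follow the diff):

```go
col, err := graph.MakeMetadataCollection(
	pathPrefix,
	[]graph.MetadataCollectionEntry{ // slice type name assumed
		graph.NewMetadataEntry(metadata.PreviousPathFileName, currPaths),
		graph.NewMetadataEntry(metadata.DeltaURLsFileName, deltaURLs),
	},
	statusUpdater,
	count.New()) // the metadata collection gets its own root bus
```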

View File

@ -332,7 +332,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
coll, err := graph.MakeMetadataCollection(
pathPrefix,
entries,
func(cos *support.ControllerOperationStatus) {})
func(cos *support.ControllerOperationStatus) {},
count.New())
require.NoError(t, err, clues.ToCore(err))
cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
@ -513,6 +514,7 @@ func (suite *BackupIntgSuite) TestMailFetch() {
test.scope,
metadata.DeltaPaths{},
func(status *support.ControllerOperationStatus) {},
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -593,6 +595,7 @@ func (suite *BackupIntgSuite) TestDelta() {
test.scope,
metadata.DeltaPaths{},
func(status *support.ControllerOperationStatus) {},
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.Less(t, 1, len(collections), "retrieved metadata and data collections")
@ -625,6 +628,7 @@ func (suite *BackupIntgSuite) TestDelta() {
test.scope,
dps,
func(status *support.ControllerOperationStatus) {},
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
})
@ -664,6 +668,7 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() {
sel.Scopes()[0],
metadata.DeltaPaths{},
newStatusUpdater(t, &wg),
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -750,6 +755,7 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
test.scope,
metadata.DeltaPaths{},
newStatusUpdater(t, &wg),
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -859,6 +865,7 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() {
test.scope,
metadata.DeltaPaths{},
newStatusUpdater(t, &wg),
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
require.Len(t, collections, 2)
@ -1135,6 +1142,7 @@ func (suite *CollectionPopulationSuite) TestPopulateCollections() {
test.scope,
dps,
ctrlOpts,
count.New(),
fault.New(test.failFast == control.FailFast))
test.expectErr(t, err, clues.ToCore(err))
@ -1477,6 +1485,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_D
sc.scope,
test.inputMetadata(t, qp.Category),
control.Options{FailureHandling: control.FailFast},
count.New(),
fault.New(true))
require.NoError(t, err, "getting collections", clues.ToCore(err))
@ -1643,6 +1652,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_r
allScope,
dps,
control.Options{FailureHandling: control.FailFast},
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@ -2064,6 +2074,7 @@ func (suite *CollectionPopulationSuite) TestFilterContainersAndFillCollections_i
allScope,
test.dps,
ctrlOpts,
count.New(),
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -110,6 +111,7 @@ func NewCollection(
origRemoved []string,
validModTimes bool,
statusUpdater support.StatusUpdater,
counter *count.Bus,
) data.BackupCollection {
added := maps.Clone(origAdded)
removed := make(map[string]struct{}, len(origRemoved))
@ -127,6 +129,9 @@ func NewCollection(
removed[r] = struct{}{}
}
counter.Add(count.ItemsAdded, int64(len(added)))
counter.Add(count.ItemsRemoved, int64(len(removed)))
if !validModTimes {
return &prefetchCollection{
BaseCollection: bc,
@ -145,6 +150,7 @@ func NewCollection(
removed: removed,
getter: items,
statusUpdater: statusUpdater,
counter: counter,
}
}
@ -188,13 +194,17 @@ func (col *prefetchCollection) streamItems(
colProgress chan<- struct{}
user = col.user
log = logger.Ctx(ctx).With(
"service", path.ExchangeService.String(),
"category", col.Category().String())
)
ctx = clues.Add(
ctx,
"category", col.Category().String())
defer func() {
close(stream)
logger.Ctx(ctx).Infow(
"finished stream backup collection items",
"stats", col.Counter.Values())
updateStatus(
ctx,
col.statusUpdater,
@ -228,6 +238,10 @@ func (col *prefetchCollection) streamItems(
stream <- data.NewDeletedItem(id)
if col.Counter.Inc(count.StreamItemsRemoved)%1000 == 0 {
logger.Ctx(ctx).Infow("item removal stream progress", "stats", col.Counter.Values())
}
atomic.AddInt64(&success, 1)
if colProgress != nil {
@ -268,9 +282,11 @@ func (col *prefetchCollection) streamItems(
// nothing else we can do, and not reporting it will make the status
// investigation upset.
if graph.IsErrDeletedInFlight(err) {
col.Counter.Inc(count.StreamItemsDeletedInFlight)
atomic.AddInt64(&success, 1)
log.With("err", err).Infow("item not found", clues.InErr(err).Slice()...)
logger.CtxErr(ctx, err).Info("item not found")
} else {
col.Counter.Inc(count.StreamItemsErrored)
el.AddRecoverable(ctx, clues.Wrap(err, "fetching item").Label(fault.LabelForceNoBackupCreation))
}
@ -282,6 +298,7 @@ func (col *prefetchCollection) streamItems(
id,
details.ItemInfo{Exchange: info})
if err != nil {
col.Counter.Inc(count.StreamItemsErrored)
el.AddRecoverable(
ctx,
clues.StackWC(ctx, err).
@ -292,6 +309,12 @@ func (col *prefetchCollection) streamItems(
stream <- item
col.Counter.Add(count.StreamBytesAdded, info.Size)
if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
logger.Ctx(ctx).Infow("item addition stream progress", "stats", col.Counter.Values())
}
atomic.AddInt64(&success, 1)
atomic.AddInt64(&totalBytes, info.Size)
@ -329,6 +352,8 @@ type lazyFetchCollection struct {
getter itemGetterSerializer
statusUpdater support.StatusUpdater
counter *count.Bus
}
// Items utility function to asynchronously execute process to fill data channel with
@ -413,6 +438,7 @@ func (col *lazyFetchCollection) streamItems(
},
id,
modTime,
col.counter,
errs)
atomic.AddInt64(&success, 1)

View File

@ -24,6 +24,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
@ -146,13 +147,15 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
test.prev,
test.loc,
control.DefaultOptions(),
false),
false,
count.New()),
"u",
mock.DefaultItemGetSerialize(),
nil,
nil,
colType.validModTimes,
nil)
nil,
count.New())
assert.Equal(t, test.expect, c.State(), "collection state")
assert.Equal(t, test.curr, c.FullPath(), "full path")
assert.Equal(t, test.prev, c.PreviousPath(), "prev path")
@ -289,13 +292,15 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_Items() {
nil,
locPath.ToBuilder(),
control.DefaultOptions(),
false),
false,
count.New()),
"",
&mock.ItemGetSerialize{},
test.added,
maps.Keys(test.removed),
false,
statusUpdater)
statusUpdater,
count.New())
for item := range col.Items(ctx, errs) {
itemCount++
@ -427,13 +432,15 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
nil,
locPath.ToBuilder(),
control.DefaultOptions(),
false),
false,
count.New()),
"",
mlg,
test.added,
maps.Keys(test.removed),
true,
statusUpdater)
statusUpdater,
count.New())
for item := range col.Items(ctx, errs) {
itemCount++
@ -499,6 +506,7 @@ func (suite *CollectionUnitSuite) TestLazyItem_NoRead_GetInfo_Errors() {
nil,
"itemID",
time.Now(),
count.New(),
fault.New(true))
_, err := li.Info()
@ -564,6 +572,7 @@ func (suite *CollectionUnitSuite) TestLazyItem_GetDataErrors() {
},
"itemID",
now,
count.New(),
fault.New(true))
assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
@ -604,6 +613,7 @@ func (suite *CollectionUnitSuite) TestLazyItem_ReturnsEmptyReaderOnDeletedInFlig
},
"itemID",
now,
count.New(),
fault.New(true))
assert.False(t, li.Deleted(), "item shouldn't be marked deleted")
@ -657,6 +667,7 @@ func (suite *CollectionUnitSuite) TestLazyItem() {
},
"itemID",
now,
count.New(),
fault.New(true))
assert.False(t, li.Deleted(), "item shouldn't be marked deleted")

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -38,6 +39,7 @@ func CreateCollections(
tenantID string,
scope selectors.GroupsScope,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
ctx = clues.Add(ctx, "category", scope.Category().PathType())
@ -64,6 +66,8 @@ func CreateCollections(
return nil, false, clues.Stack(err)
}
counter.Add(count.Channels, int64(len(channels)))
collections, err := populateCollections(
ctx,
qp,
@ -73,6 +77,7 @@ func CreateCollections(
scope,
cdps[scope.Category().PathType()],
bpc.Options,
counter,
errs)
if err != nil {
return nil, false, clues.Wrap(err, "filling collections")
@ -94,6 +99,7 @@ func populateCollections(
scope selectors.GroupsScope,
dps metadata.DeltaPaths,
ctrlOpts control.Options,
counter *count.Bus,
errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
var (
@ -117,6 +123,7 @@ func populateCollections(
}
var (
cl = counter.Local()
cID = ptr.Val(c.GetId())
cName = ptr.Val(c.GetDisplayName())
err error
@ -134,15 +141,19 @@ func populateCollections(
})
)
ictx = clues.AddLabelCounter(ictx, cl.PlainAdder())
delete(tombstones, cID)
// Only create a collection if the path matches the scope.
if !bh.includeContainer(ictx, qp, c, scope) {
cl.Inc(count.SkippedContainers)
continue
}
if len(prevPathStr) > 0 {
if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
err = clues.StackWC(ctx, err).Label(count.BadPrevPath)
logger.CtxErr(ictx, err).Error("parsing prev path")
// if the previous path is unusable, then the delta must be, too.
prevDelta = ""
@ -166,6 +177,9 @@ func populateCollections(
added := str.SliceToMap(maps.Keys(addAndRem.Added))
removed := str.SliceToMap(addAndRem.Removed)
cl.Add(count.ItemsAdded, int64(len(added)))
cl.Add(count.ItemsRemoved, int64(len(removed)))
if len(addAndRem.DU.URL) > 0 {
deltaURLs[cID] = addAndRem.DU.URL
} else if !addAndRem.DU.Reset {
@ -174,7 +188,9 @@ func populateCollections(
currPath, err := bh.canonicalPath(path.Builder{}.Append(cID), qp.TenantID)
if err != nil {
el.AddRecoverable(ctx, clues.Stack(err))
err = clues.StackWC(ctx, err).Label(count.BadCollPath)
el.AddRecoverable(ctx, err)
continue
}
@ -191,7 +207,8 @@ func populateCollections(
prevPath,
path.Builder{}.Append(cName),
ctrlOpts,
addAndRem.DU.Reset),
addAndRem.DU.Reset,
cl),
bh,
qp.ProtectedResource.ID(),
added,
@ -219,7 +236,9 @@ func populateCollections(
)
if collections[id] != nil {
el.AddRecoverable(ictx, clues.WrapWC(ictx, err, "conflict: tombstone exists for a live collection"))
err := clues.NewWC(ictx, "conflict: tombstone exists for a live collection").Label(count.CollectionTombstoneConflict)
el.AddRecoverable(ctx, err)
continue
}
@ -231,12 +250,14 @@ func populateCollections(
prevPath, err := pathFromPrevString(p)
if err != nil {
err := clues.StackWC(ctx, err).Label(count.BadPrevPath)
// technically shouldn't ever happen. But just in case...
logger.CtxErr(ictx, err).Error("parsing tombstone prev path")
continue
}
collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts)
collections[id] = data.NewTombstoneCollection(prevPath, ctrlOpts, counter.Local())
}
logger.Ctx(ctx).Infow(
@ -251,7 +272,8 @@ func populateCollections(
qp.Category,
false)
if err != nil {
return nil, clues.Wrap(err, "making metadata path prefix")
return nil, clues.WrapWC(ctx, err, "making metadata path prefix").
Label(count.BadPathPrefix)
}
col, err := graph.MakeMetadataCollection(
@ -260,14 +282,13 @@ func populateCollections(
graph.NewMetadataEntry(metadata.PreviousPathFileName, currPaths),
graph.NewMetadataEntry(metadata.DeltaURLsFileName, deltaURLs),
},
statusUpdater)
statusUpdater,
counter.Local())
if err != nil {
return nil, clues.Wrap(err, "making metadata collection")
return nil, clues.WrapWC(ctx, err, "making metadata collection")
}
collections["metadata"] = col
logger.Ctx(ctx).Infow("produced collections", "count_collections", len(collections))
return collections, el.Failure()
}

View File

@ -246,6 +246,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
selectors.NewGroupsBackup(nil).Channels(selectors.Any())[0],
nil,
ctrlOpts,
count.New(),
fault.New(true))
test.expectErr(t, err, clues.ToCore(err))
assert.Len(t, collections, test.expectColls, "number of collections")
@ -405,6 +406,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() {
allScope,
test.deltaPaths,
ctrlOpts,
count.New(),
fault.New(true))
test.expectErr(t, err, clues.ToCore(err))
assert.Len(t, collections, test.expectColls, "number of collections")
@ -527,6 +529,7 @@ func (suite *BackupIntgSuite) TestCreateCollections() {
suite.tenantID,
test.scope,
func(status *support.ControllerOperationStatus) {},
count.New(),
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, collections, "must have at least one collection")

View File

@ -14,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
)
@ -90,6 +91,9 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
ctx = clues.Add(ctx, "category", col.Category().String())
defer func() {
logger.Ctx(ctx).Infow(
"finished stream backup collection items",
"stats", col.Counter.Values())
col.finishPopulation(ctx, streamedItems, totalBytes, errs.Failure())
}()
@ -117,7 +121,7 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
col.stream <- data.NewDeletedItem(id)
atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, 0)
col.Counter.Inc(count.StreamItemsRemoved)
if colProgress != nil {
colProgress <- struct{}{}
@ -150,26 +154,23 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
parentFolderID,
id)
if err != nil {
el.AddRecoverable(
ctx,
clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation))
err = clues.Wrap(err, "getting channel message data").Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
if err := writer.WriteObjectValue("", item); err != nil {
el.AddRecoverable(
ctx,
clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation))
err = clues.Wrap(err, "writing channel message to serializer").Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
itemData, err := writer.GetSerializedContent()
if err != nil {
el.AddRecoverable(
ctx,
clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation))
err = clues.Wrap(err, "serializing channel message").Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
@ -181,9 +182,8 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
id,
details.ItemInfo{Groups: info})
if err != nil {
el.AddRecoverable(
ctx,
clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation))
err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
return
}
@ -193,6 +193,12 @@ func (col *Collection) streamItems(ctx context.Context, errs *fault.Bus) {
atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, info.Size)
if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
}
col.Counter.Add(count.StreamBytesAdded, info.Size)
if colProgress != nil {
colProgress <- struct{}{}
}

View File

@ -18,6 +18,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -121,7 +122,8 @@ func (suite *CollectionUnitSuite) TestNewCollection_state() {
test.prev,
test.loc,
control.DefaultOptions(),
false),
false,
count.New()),
nil,
"g",
nil, nil,
@ -202,7 +204,8 @@ func (suite *CollectionUnitSuite) TestCollection_streamItems() {
nil,
locPath.ToBuilder(),
control.DefaultOptions(),
false),
false,
count.New()),
added: test.added,
removed: test.removed,
getter: mock.GetChannelMessage{},

View File

@ -33,6 +33,7 @@ func CollectLibraries(
tenantID string,
ssmb *prefixmatcher.StringSetMatchBuilder,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, bool, error) {
logger.Ctx(ctx).Debug("creating SharePoint Library collections")
@ -44,7 +45,8 @@ func CollectLibraries(
tenantID,
bpc.ProtectedResource,
su,
bpc.Options)
bpc.Options,
counter)
)
msg := fmt.Sprintf(

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/collection/drive"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/collection/groups"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/store"
)
@ -28,9 +29,9 @@ func (ctrl *Controller) DeserializeMetadataFiles(
case path.ExchangeService, path.ExchangeMetadataService:
return exchange.DeserializeMetadataFiles(ctx, colls)
case path.OneDriveService, path.OneDriveMetadataService:
return drive.DeserializeMetadataFiles(ctx, colls)
return drive.DeserializeMetadataFiles(ctx, colls, count.New())
case path.SharePointService, path.SharePointMetadataService:
return drive.DeserializeMetadataFiles(ctx, colls)
return drive.DeserializeMetadataFiles(ctx, colls, count.New())
case path.GroupsService, path.GroupsMetadataService:
return groups.DeserializeMetadataFiles(ctx, colls)
default:

View File

@ -10,6 +10,8 @@ import (
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -23,8 +25,9 @@ func ProduceBackupCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
ac api.Client,
tenantID string,
creds account.M365Config,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
eb, err := bpc.Selector.ToExchangeBackup()
@ -35,6 +38,7 @@ func ProduceBackupCollections(
var (
collections = []data.BackupCollection{}
el = errs.Local()
tenantID = creds.AzureTenantID
categories = map[path.CategoryType]struct{}{}
handlers = exchange.BackupHandlers(ac)
)
@ -46,6 +50,7 @@ func ProduceBackupCollections(
if !canMakeDeltaQueries {
logger.Ctx(ctx).Info("delta requests not available")
counter.Inc(count.NoDeltaQueries)
bpc.Options.ToggleFeatures.DisableDelta = true
}
@ -75,6 +80,7 @@ func ProduceBackupCollections(
scope,
cdps[scope.Category().PathType()],
su,
counter,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
@ -95,6 +101,7 @@ func ProduceBackupCollections(
path.ExchangeService,
categories,
su,
counter,
errs)
if err != nil {
return nil, nil, false, err
@ -103,6 +110,8 @@ func ProduceBackupCollections(
collections = append(collections, baseCols...)
}
logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values())
return collections, nil, canUsePreviousBackup, el.Failure()
}

View File

@ -23,6 +23,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -36,6 +37,7 @@ func ProduceBackupCollections(
ac api.Client,
creds account.M365Config,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, error) {
b, err := bpc.Selector.ToGroupsBackup()
@ -71,15 +73,20 @@ func ProduceBackupCollections(
break
}
cl := counter.Local()
ictx := clues.AddLabelCounter(ctx, cl.PlainAdder())
var dbcs []data.BackupCollection
switch scope.Category().PathType() {
case path.LibrariesCategory:
sites, err := ac.Groups().GetAllSites(ctx, bpc.ProtectedResource.ID(), errs)
sites, err := ac.Groups().GetAllSites(ictx, bpc.ProtectedResource.ID(), errs)
if err != nil {
return nil, nil, err
}
cl.Add(count.Sites, int64(len(sites)))
siteMetadataCollection := map[string][]data.RestoreCollection{}
// Once we have metadata collections for chat as well, we will have to filter those out
@ -89,43 +96,51 @@ func ProduceBackupCollections(
}
for _, s := range sites {
pr := idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl()))
sbpc := inject.BackupProducerConfig{
LastBackupVersion: bpc.LastBackupVersion,
Options: bpc.Options,
ProtectedResource: pr,
Selector: bpc.Selector,
MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())],
}
var (
scl = cl.Local()
pr = idname.NewProvider(ptr.Val(s.GetId()), ptr.Val(s.GetWebUrl()))
sbpc = inject.BackupProducerConfig{
LastBackupVersion: bpc.LastBackupVersion,
Options: bpc.Options,
ProtectedResource: pr,
Selector: bpc.Selector,
MetadataCollections: siteMetadataCollection[ptr.Val(s.GetId())],
}
bh = drive.NewGroupBackupHandler(
bpc.ProtectedResource.ID(),
ptr.Val(s.GetId()),
ac.Drives(),
scope)
)
bh := drive.NewGroupBackupHandler(
bpc.ProtectedResource.ID(),
ptr.Val(s.GetId()),
ac.Drives(),
scope)
ictx = clues.Add(
ictx,
"site_id", ptr.Val(s.GetId()),
"site_weburl", graph.LoggableURL(ptr.Val(s.GetWebUrl())))
sp, err := bh.SitePathPrefix(creds.AzureTenantID)
if err != nil {
return nil, nil, clues.Wrap(err, "getting site path")
return nil, nil, clues.WrapWC(ictx, err, "getting site path").Label(count.BadPathPrefix)
}
sitesPreviousPaths[ptr.Val(s.GetId())] = sp.String()
cs, canUsePreviousBackup, err := site.CollectLibraries(
ctx,
ictx,
sbpc,
bh,
creds.AzureTenantID,
ssmb,
su,
scl,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
el.AddRecoverable(ictx, err)
continue
}
if !canUsePreviousBackup {
dbcs = append(dbcs, data.NewTombstoneCollection(sp, control.Options{}))
dbcs = append(dbcs, data.NewTombstoneCollection(sp, control.Options{}, scl))
}
dbcs = append(dbcs, cs...)
@ -142,7 +157,7 @@ func ProduceBackupCollections(
// TODO(meain): Use number of messages and not channels
CompletionMessage: func() string { return fmt.Sprintf("(found %d channels)", len(cs)) },
}
progressBar := observe.MessageWithCompletion(ctx, pcfg, scope.Category().PathType().HumanString())
progressBar := observe.MessageWithCompletion(ictx, pcfg, scope.Category().PathType().HumanString())
if !isTeam {
continue
@ -151,25 +166,26 @@ func ProduceBackupCollections(
bh := groups.NewChannelBackupHandler(bpc.ProtectedResource.ID(), ac.Channels())
cs, canUsePreviousBackup, err = groups.CreateCollections(
ctx,
ictx,
bpc,
bh,
creds.AzureTenantID,
scope,
su,
cl,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
el.AddRecoverable(ictx, err)
continue
}
if !canUsePreviousBackup {
tp, err := bh.PathPrefix(creds.AzureTenantID)
if err != nil {
return nil, nil, clues.Wrap(err, "getting message path")
return nil, nil, clues.WrapWC(ictx, err, "getting message path").Label(count.BadPathPrefix)
}
dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}))
dbcs = append(dbcs, data.NewTombstoneCollection(tp, control.Options{}, cl))
}
dbcs = append(dbcs, cs...)
@ -191,6 +207,7 @@ func ProduceBackupCollections(
path.GroupsService,
categories,
su,
counter,
errs)
if err != nil {
return nil, nil, err
@ -204,13 +221,18 @@ func ProduceBackupCollections(
creds.AzureTenantID,
bpc.ProtectedResource.ID(),
sitesPreviousPaths,
su)
su,
counter)
if err != nil {
return nil, nil, err
}
collections = append(collections, md)
counter.Add(count.Collections, int64(len(collections)))
logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values())
return collections, ssmb.ToReader(), el.Failure()
}
@ -218,6 +240,7 @@ func getSitesMetadataCollection(
tenantID, groupID string,
sites map[string]string,
su support.StatusUpdater,
counter *count.Bus,
) (data.BackupCollection, error) {
p, err := path.BuildMetadata(
tenantID,
@ -239,7 +262,8 @@ func getSitesMetadataCollection(
[]graph.MetadataCollectionEntry{
graph.NewMetadataEntry(metadata.PreviousPathFileName, sites),
},
su)
su,
counter.Local())
return md, err
}
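
The groups producer above layers buses three deep: the operation-level counter, a cl per scope, and an scl per site, with every write propagating upward. A rough sketch of those semantics, assuming the Local() behavior shown later in this diff:

counter := count.New() // operation-level bus
cl := counter.Local()  // per-scope child
scl := cl.Local()      // per-site grandchild

scl.Add(count.Sites, 1)

// the increment is visible at every level of the hierarchy:
_ = scl.Get(count.Sites)     // 1 (site-local)
_ = cl.Get(count.Sites)      // 1 (propagated to the scope bus)
_ = counter.Get(count.Sites) // 1 (propagated to the operation bus)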

View File

@ -13,6 +13,8 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -24,8 +26,9 @@ func ProduceBackupCollections(
ctx context.Context,
bpc inject.BackupProducerConfig,
ac api.Client,
tenant string,
creds account.M365Config,
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, *prefixmatcher.StringSetMatcher, bool, error) {
odb, err := bpc.Selector.ToOneDriveBackup()
@ -35,6 +38,7 @@ func ProduceBackupCollections(
var (
el = errs.Local()
tenantID = creds.AzureTenantID
categories = map[path.CategoryType]struct{}{}
collections = []data.BackupCollection{}
ssmb = prefixmatcher.NewStringSetBuilder()
@ -52,10 +56,11 @@ func ProduceBackupCollections(
nc := drive.NewCollections(
drive.NewUserDriveBackupHandler(ac.Drives(), bpc.ProtectedResource.ID(), scope),
tenant,
tenantID,
bpc.ProtectedResource,
su,
bpc.Options)
bpc.Options,
counter)
pcfg := observe.ProgressCfg{
Indent: 1,
@ -75,7 +80,7 @@ func ProduceBackupCollections(
collections = append(collections, odcs...)
}
mcs, err := migrationCollections(bpc, tenant, su)
mcs, err := migrationCollections(bpc, tenantID, su, counter)
if err != nil {
return nil, nil, false, err
}
@ -86,11 +91,12 @@ func ProduceBackupCollections(
baseCols, err := graph.BaseCollections(
ctx,
collections,
tenant,
tenantID,
bpc.ProtectedResource.ID(),
path.OneDriveService,
categories,
su,
counter,
errs)
if err != nil {
return nil, nil, false, err
@ -99,6 +105,8 @@ func ProduceBackupCollections(
collections = append(collections, baseCols...)
}
logger.Ctx(ctx).Infow("produced collections", "stats", counter.Values())
return collections, ssmb.ToReader(), canUsePreviousBackup, el.Failure()
}
@ -107,6 +115,7 @@ func migrationCollections(
bpc inject.BackupProducerConfig,
tenant string,
su support.StatusUpdater,
counter *count.Bus,
) ([]data.BackupCollection, error) {
// assume a version < 0 implies no prior backup, thus nothing to migrate.
if version.IsNoBackup(bpc.LastBackupVersion) {
@ -117,6 +126,8 @@ func migrationCollections(
return nil, nil
}
counter.Inc(count.RequiresUserPnToIDMigration)
// unlike exchange, which enumerates all folders on every
// backup, onedrive needs to force the owner PN -> ID migration
mc, err := path.BuildPrefix(
@ -137,7 +148,7 @@ func migrationCollections(
return nil, clues.Wrap(err, "creating user name migration path")
}
mgn, err := graph.NewPrefixCollection(mpc, mc, su)
mgn, err := graph.NewPrefixCollection(mpc, mc, su, counter)
if err != nil {
return nil, clues.Wrap(err, "creating migration collection")
}
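
Since RequiresUserPnToIDMigration is only incremented when the migration branch is actually taken, the bus records whether a run produced migration collections. A hedged usage sketch; the log line is illustrative, not part of this change:

mcs, err := migrationCollections(bpc, tenantID, su, counter)
if err == nil && counter.Get(count.RequiresUserPnToIDMigration) > 0 {
	logger.Ctx(ctx).Info("produced owner PN -> ID migration collections")
}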

View File

@ -13,6 +13,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
)
@ -92,7 +93,7 @@ func (suite *BackupUnitSuite) TestMigrationCollections() {
ProtectedResource: u,
}
mc, err := migrationCollections(bpc, "t", nil)
mc, err := migrationCollections(bpc, "t", nil, count.New())
require.NoError(t, err, clues.ToCore(err))
if test.expectLen == 0 {

View File

@ -84,6 +84,7 @@ func ProduceBackupCollections(
creds.AzureTenantID,
ssmb,
su,
counter,
errs)
if err != nil {
el.AddRecoverable(ctx, err)
@ -124,6 +125,7 @@ func ProduceBackupCollections(
path.SharePointService,
categories,
su,
counter,
errs)
if err != nil {
return nil, nil, false, err

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/m365/service/onedrive/mock"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -119,7 +120,8 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
tenantID,
idname.NewProvider(siteID, siteID),
nil,
control.DefaultOptions())
control.DefaultOptions(),
count.New())
c.CollectionMap = collMap
@ -131,6 +133,7 @@ func (suite *LibrariesBackupUnitSuite) TestUpdateCollections() {
excluded,
topLevelPackages,
"notempty",
count.New(),
fault.New(true))
test.expect(t, err, clues.ToCore(err))

View File

@ -197,6 +197,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
}
}()
ctx = clues.AddLabelCounter(ctx, op.Counter.PlainAdder())
ctx, end := diagnostics.Span(ctx, "operations:backup:run")
defer end()
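
This is the line that ties the error-label constants to the stats: the operation's bus is handed to clues as a plain string-keyed adder, so, assuming clues increments the context's label counter whenever an error is labeled, a .Label(...) call downstream doubles as a count. A sketch of the flow, using calls that appear elsewhere in this diff:

counter := count.New()
ctx = clues.AddLabelCounter(ctx, counter.PlainAdder())

// downstream: labeling the wrapped error also bumps the bus
err = clues.WrapWC(ctx, err, "getting site path").Label(count.BadPathPrefix)

// later: the failure shows up in the run's recorded stats
_ = counter.Get(count.BadPathPrefix)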
@ -543,7 +545,7 @@ func produceBackupDataCollections(
Selector: sel,
}
return bp.ProduceBackupCollections(ctx, bpc, counter, errs)
return bp.ProduceBackupCollections(ctx, bpc, counter.Local(), errs)
}
// ---------------------------------------------------------------------------

View File

@ -1915,7 +1915,8 @@ func (suite *AssistBackupIntegrationSuite) TestBackupTypesForFailureModes() {
mc, err := graph.MakeMetadataCollection(
pathPrefix,
makeMetadataCollectionEntries("url/1", driveID, folderID, tmp),
func(*support.ControllerOperationStatus) {})
func(*support.ControllerOperationStatus) {},
count.New())
require.NoError(t, err, clues.ToCore(err))
cs = append(cs, mc)
@ -2233,7 +2234,8 @@ func (suite *AssistBackupIntegrationSuite) TestExtensionsIncrementals() {
mc, err := graph.MakeMetadataCollection(
pathPrefix,
makeMetadataCollectionEntries("url/1", driveID, folderID, tmp),
func(*support.ControllerOperationStatus) {})
func(*support.ControllerOperationStatus) {},
count.New())
require.NoError(t, err, clues.ToCore(err))
cs = append(cs, mc)

View File

@ -167,7 +167,8 @@ func (suite *MaintenanceOpNightlySuite) TestRepoMaintenance_GarbageCollection()
mc, err := graph.MakeMetadataCollection(
prefixPath,
makeMetadataCollectionEntries("url/1", driveID, folderID, tmp),
func(*support.ControllerOperationStatus) {})
func(*support.ControllerOperationStatus) {},
count.New())
require.NoError(t, err, clues.ToCore(err))
cs = append(cs, mc)

View File

@ -128,6 +128,8 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
// Setup
// -----
ctx = clues.AddLabelCounter(ctx, op.Counter.PlainAdder())
ctx, end := diagnostics.Span(ctx, "operations:restore:run")
defer end()

View File

@ -27,24 +27,24 @@ func (b *Bus) Local() *Bus {
return bus
}
func (b *Bus) getCounter(k key) *xsync.Counter {
func (b *Bus) getCounter(k Key) *xsync.Counter {
xc, _ := b.stats.LoadOrStore(string(k), xsync.NewCounter())
return xc
}
// Inc increases the count by 1.
func (b *Bus) Inc(k key) {
func (b *Bus) Inc(k Key) int64 {
if b == nil {
return
return -1
}
b.Add(k, 1)
return b.Add(k, 1)
}
// Inc increases the count by n.
func (b *Bus) Add(k key, n int64) {
// Add increases the count by n.
func (b *Bus) Add(k Key, n int64) int64 {
if b == nil {
return
return -1
}
b.getCounter(k).Add(n)
@ -52,18 +52,12 @@ func (b *Bus) Add(k key, n int64) {
if b.parent != nil {
b.parent.Add(k, n)
}
}
// AdderFor returns a func that adds any value of i
// to the bus using the given key.
func (b *Bus) AdderFor(k key) func(i int64) {
return func(i int64) {
b.Add(k, i)
}
return b.Get(k)
}
// Get returns the local count.
func (b *Bus) Get(k key) int64 {
func (b *Bus) Get(k Key) int64 {
if b == nil {
return -1
}
@ -72,7 +66,7 @@ func (b *Bus) Get(k key) int64 {
}
// Total returns the global count.
func (b *Bus) Total(k key) int64 {
func (b *Bus) Total(k Key) int64 {
if b == nil {
return -1
}
@ -114,3 +108,33 @@ func (b *Bus) TotalValues() map[string]int64 {
return b.Values()
}
// ---------------------------------------------------------------------------
// compliance with callbacks and external packages
// ---------------------------------------------------------------------------
// AdderFor returns a func that adds i to the bus
// using the given key.
func (b *Bus) AdderFor(k Key) func(i int64) {
return func(i int64) {
b.Add(k, i)
}
}
type plainAdder struct {
bus *Bus
}
func (pa plainAdder) Add(k string, n int64) {
if pa.bus == nil {
return
}
pa.bus.Add(Key(k), n)
}
// PlainAdder supports external packages that accept a count.Bus
// but don't recognize the `Key` type and prefer a plain string key.
func (b *Bus) PlainAdder() *plainAdder {
return &plainAdder{b}
}
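
Pulling the Inc and Add changes together: both now return the updated local count, nil buses are safe no-ops that return -1, and child buses propagate every write to their parent. A short sketch of those semantics, consistent with the result tests below:

k := count.Key("example-stat")

parent := count.New()
parent.Inc(k) // parent-local count: 1

child := parent.Local()
_ = child.Inc(k)   // returns 1: the child's own running count
_ = child.Get(k)   // 1 (local to the child)
_ = child.Total(k) // 2 (includes the parent's prior increment)

var nilBus *count.Bus
_ = nilBus.Inc(k) // -1: writes to a nil bus are no-ops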

View File

@ -17,7 +17,7 @@ func TestCountUnitSuite(t *testing.T) {
suite.Run(t, &CountUnitSuite{Suite: tester.NewUnitSuite(t)})
}
const testKey = key("just-for-testing")
const testKey = Key("just-for-testing")
func (suite *CountUnitSuite) TestBus_Inc() {
newParent := func() *Bus {
@ -71,6 +71,46 @@ func (suite *CountUnitSuite) TestBus_Inc() {
}
}
func (suite *CountUnitSuite) TestBus_Inc_result() {
newParent := func() *Bus {
parent := New()
parent.Inc(testKey)
return parent
}
table := []struct {
name string
bus *Bus
expect int64
expectTotal int64
}{
{
name: "nil",
bus: nil,
expect: -1,
expectTotal: -1,
},
{
name: "one",
bus: newParent().Local(),
expect: 1,
expectTotal: 2,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
result := test.bus.Inc(testKey)
assert.Equal(t, test.expect, result)
resultTotal := test.bus.Total(testKey)
assert.Equal(t, test.expectTotal, resultTotal)
})
}
}
func (suite *CountUnitSuite) TestBus_Add() {
newParent := func() *Bus {
parent := New()
@ -123,6 +163,47 @@ func (suite *CountUnitSuite) TestBus_Add() {
}
}
func (suite *CountUnitSuite) TestBus_Add_result() {
newParent := func() *Bus {
parent := New()
parent.Add(testKey, 2)
return parent
}
table := []struct {
name string
skip bool
bus *Bus
expect int64
expectTotal int64
}{
{
name: "nil",
bus: nil,
expect: -1,
expectTotal: -1,
},
{
name: "some",
bus: newParent().Local(),
expect: 4,
expectTotal: 6,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
result := test.bus.Add(testKey, 4)
assert.Equal(t, test.expect, result)
resultTotal := test.bus.Total(testKey)
assert.Equal(t, test.expectTotal, resultTotal)
})
}
}
func (suite *CountUnitSuite) TestBus_Values() {
table := []struct {
name string

View File

@ -1,42 +1,99 @@
package count
type key string
type Key string
const (
// count of bucket-tokens consumed by api calls.
APICallTokensConsumed key = "api-call-tokens-consumed"
APICallTokensConsumed Key = "api-call-tokens-consumed"
// count of api calls that resulted in failure due to throttling.
ThrottledAPICalls key = "throttled-api-calls"
ThrottledAPICalls Key = "throttled-api-calls"
)
// Tracked during backup
const (
// amounts reported by kopia
PersistedCachedFiles key = "persisted-cached-files"
PersistedDirectories key = "persisted-directories"
PersistedFiles key = "persisted-files"
PersistedHashedBytes key = "persisted-hashed-bytes"
PersistedNonCachedFiles key = "persisted-non-cached-files"
PersistedNonMetaFiles key = "persisted-non-meta-files"
PersistedNonMetaUploadedBytes key = "persisted-non-meta-uploaded-bytes"
PersistedUploadedBytes key = "persisted-uploaded-bytes"
PersistenceErrors key = "persistence-errors"
PersistenceExpectedErrors key = "persistence-expected-errors"
PersistenceIgnoredErrors key = "persistence-ignored-errors"
PersistedCachedFiles Key = "persisted-cached-files"
PersistedDirectories Key = "persisted-directories"
PersistedFiles Key = "persisted-files"
PersistedHashedBytes Key = "persisted-hashed-bytes"
PersistedNonCachedFiles Key = "persisted-non-cached-files"
PersistedNonMetaFiles Key = "persisted-non-meta-files"
PersistedNonMetaUploadedBytes Key = "persisted-non-meta-uploaded-bytes"
PersistedUploadedBytes Key = "persisted-uploaded-bytes"
PersistenceErrors Key = "persistence-errors"
PersistenceExpectedErrors Key = "persistence-expected-errors"
PersistenceIgnoredErrors Key = "persistence-ignored-errors"
// amounts reported by data providers
ProviderItemsRead key = "provider-items-read"
Channels Key = "channels"
CollectionMoved Key = "collection-moved"
CollectionNew Key = "collection-state-new"
CollectionNotMoved Key = "collection-state-not-moved"
CollectionTombstoned Key = "collection-state-tombstoned"
Collections Key = "collections"
DeleteFolderMarker Key = "delete-folder-marker"
DeleteItemMarker Key = "delete-item-marker"
Drives Key = "drives"
DriveTombstones Key = "drive-tombstones"
Files Key = "files"
Folders Key = "folders"
ItemDownloadURLRefetch Key = "item-download-url-refetch"
ItemsAdded Key = "items-added"
ItemsRemoved Key = "items-removed"
LazyDeletedInFlight Key = "lazy-deleted-in-flight"
Malware Key = "malware"
MetadataItems Key = "metadata-items"
MetadataBytes Key = "metadata-bytes"
MissingDelta Key = "missing-delta-token"
NewDeltas Key = "new-delta-tokens"
NewPrevPaths Key = "new-previous-paths"
NoDeltaQueries Key = "cannot-make-delta-queries"
Packages Key = "packages"
PagerResets Key = "pager-resets"
PagesEnumerated Key = "pages-enumerated"
PrevDeltas Key = "previous-deltas"
PrevPaths Key = "previous-paths"
PreviousPathMetadataCollision Key = "previous-path-metadata-collision"
Sites Key = "sites"
SkippedContainers Key = "skipped-containers"
StreamBytesAdded Key = "stream-bytes-added"
StreamDirsAdded Key = "stream-dirs-added"
StreamDirsFound Key = "stream-dirs-found"
StreamItemsAdded Key = "stream-items-added"
StreamItemsDeletedInFlight Key = "stream-items-deleted-in-flight"
StreamItemsErrored Key = "stream-items-errored"
StreamItemsFound Key = "stream-items-found"
StreamItemsRemoved Key = "stream-items-removed"
URLCacheMiss Key = "url-cache-miss"
URLCacheRefresh Key = "url-cache-refresh"
// miscellaneous
RequiresUserPnToIDMigration Key = "requires-user-pn-to-id-migration"
)
// Counted using clues error labels
const (
BadCollPath = "bad_collection_path"
BadPathPrefix = "bad_path_prefix_creation"
BadPrevPath = "unparsable_prev_path"
CollectionTombstoneConflict = "collection_tombstone_conflicts_with_live_collection"
ItemBeforeParent = "item_before_parent"
MissingParent = "missing_parent"
UnknownItemType = "unknown_item_type"
)
// Tracked during restore
const (
// count of times that items collided during restore,
// where the collision was resolved by replacing the item.
CollisionReplace key = "collision-replace"
CollisionReplace Key = "collision-replace"
// count of times that items collided during restore,
// where the collision was resolved by skipping the item.
CollisionSkip key = "collision-skip"
CollisionSkip Key = "collision-skip"
// NewItemCreated should be used for non-skip, non-replace,
// non-meta item creation counting; i.e., use it specifically
// for counting new items (no collision) or copied items.
NewItemCreated key = "new-item-created"
NewItemCreated Key = "new-item-created"
)
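
Note the deliberate split in this file: runtime stats use the typed Key, while the clues-label block stays as untyped string constants, so the same names work both as error labels and, being untyped, as bus keys. A hedged illustration:

// typed key: incremented directly on the bus
counter.Inc(count.CollectionNew)

// label const: attached to an error; assuming the ctx was wired
// with clues.AddLabelCounter (as the backup and restore ops do),
// the label is also tallied on the bus under the same name.
err = clues.WrapWC(ctx, err, "parsing previous path").Label(count.BadPrevPath)
_ = counter.Get(count.BadPrevPath)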

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -60,6 +61,7 @@ func BaseCollections(
service path.ServiceType,
categories map[path.CategoryType]struct{},
su support.StatusUpdater,
counter *count.Bus,
errs *fault.Bus,
) ([]data.BackupCollection, error) {
var (
@ -82,7 +84,7 @@ func BaseCollections(
full, err := path.BuildPrefix(tenant, rOwner, service, cat)
if err != nil {
// Shouldn't happen.
err = clues.WrapWC(ictx, err, "making path")
err = clues.WrapWC(ictx, err, "making path").Label(count.BadPathPrefix)
el.AddRecoverable(ictx, err)
lastErr = err
@ -95,7 +97,7 @@ func BaseCollections(
prev: full,
full: full,
su: su,
state: data.StateOf(full, full),
state: data.StateOf(full, full, counter),
})
}
}
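
With StateOf now taking the bus, every base collection constructed here bumps exactly one collection-state key. A small illustration, assuming a valid full path:

counter := count.New()

// prev == curr: the collection stayed put.
_ = data.StateOf(full, full, counter)
_ = counter.Get(count.CollectionNotMoved) // 1

// nil prev: a brand-new collection.
_ = data.StateOf(nil, full, counter)
_ = counter.Get(count.CollectionNew) // 1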
@ -111,6 +113,7 @@ func BaseCollections(
func NewPrefixCollection(
prev, full path.Path,
su support.StatusUpdater,
counter *count.Bus,
) (*prefixCollection, error) {
if prev != nil {
if len(prev.Item()) > 0 {
@ -136,7 +139,7 @@ func NewPrefixCollection(
prev: prev,
full: full,
su: su,
state: data.StateOf(prev, full),
state: data.StateOf(prev, full, counter),
}
if pc.state == data.DeletedState {

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
)
@ -93,7 +94,7 @@ func (suite *CollectionsUnitSuite) TestNewPrefixCollection() {
}
for _, test := range table {
suite.Run(test.name, func() {
_, err := NewPrefixCollection(test.prev, test.full, nil)
_, err := NewPrefixCollection(test.prev, test.full, nil, count.New())
test.expectErr(suite.T(), err, clues.ToCore(err))
})
}

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -27,6 +28,7 @@ type MetadataCollection struct {
fullPath path.Path
items []metadataItem
statusUpdater support.StatusUpdater
counter *count.Bus
}
// MetadataCollectionEntry describes a file that should get added to a metadata
@ -78,6 +80,7 @@ func MakeMetadataCollection(
pathPrefix path.Path,
metadata []MetadataCollectionEntry,
statusUpdater support.StatusUpdater,
counter *count.Bus,
) (data.BackupCollection, error) {
if len(metadata) == 0 {
return nil, nil
@ -94,7 +97,7 @@ func MakeMetadataCollection(
items = append(items, item)
}
coll := NewMetadataCollection(pathPrefix, items, statusUpdater)
coll := NewMetadataCollection(pathPrefix, items, statusUpdater, counter)
return coll, nil
}
@ -103,11 +106,13 @@ func NewMetadataCollection(
p path.Path,
items []metadataItem,
statusUpdater support.StatusUpdater,
counter *count.Bus,
) *MetadataCollection {
return &MetadataCollection{
fullPath: p,
items: items,
statusUpdater: statusUpdater,
counter: counter,
}
}
@ -155,11 +160,13 @@ func (md MetadataCollection) Items(
},
md.fullPath.Folder(false))
md.counter.Add(count.MetadataItems, int64(len(md.items)))
md.statusUpdater(status)
}()
defer close(res)
for _, item := range md.items {
md.counter.Add(count.MetadataBytes, item.size)
totalBytes += item.size
res <- item
}
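
A usage sketch for the new parameter, with pathPrefix and entries assumed prepared by the caller; once the collection's Items channel is drained, the bus carries the item and byte tallies alongside the status update:

counter := count.New()

coll, err := graph.MakeMetadataCollection(
	pathPrefix,
	entries, // []graph.MetadataCollectionEntry, assumed prepared
	func(*support.ControllerOperationStatus) {},
	counter)
if err != nil {
	// handle the error
}

for range coll.Items(ctx, fault.New(true)) {
	// drain; byte counts accumulate as items stream out
}

_ = counter.Get(count.MetadataBytes) // total serialized bytes
_ = counter.Get(count.MetadataItems) // recorded with the status update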

View File

@ -17,6 +17,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -41,7 +42,7 @@ func (suite *MetadataCollectionUnitSuite) TestFullPath() {
"foo")
require.NoError(t, err, clues.ToCore(err))
c := NewMetadataCollection(p, nil, nil)
c := NewMetadataCollection(p, nil, nil, count.New())
assert.Equal(t, p.String(), c.FullPath().String())
}
@ -99,7 +100,8 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
func(c *support.ControllerOperationStatus) {
assert.Equal(t, len(itemNames), c.Metrics.Objects)
assert.Equal(t, len(itemNames), c.Metrics.Successes)
})
},
count.New())
gotData := [][]byte{}
gotNames := []string{}
@ -198,7 +200,8 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() {
col, err := MakeMetadataCollection(
pathPrefix,
[]MetadataCollectionEntry{test.metadata},
func(*support.ControllerOperationStatus) {})
func(*support.ControllerOperationStatus) {},
count.New())
test.errCheck(t, err, clues.ToCore(err))
if err != nil {