use count bus in kopia backups (#4482)

uses the count bus in the kopia backup package.
This currently duplicates counts that we're getting
from the kopia stats.  A later pr will remove the old
stats entirely in favor of the counter.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-10-23 17:29:56 -06:00 committed by GitHub
parent 3aadc31201
commit 5cc68e27dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 220 additions and 69 deletions

View File

@ -16,7 +16,9 @@ import (
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/data"
dataMock "github.com/alcionai/corso/src/internal/data/mock"
istats "github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -395,12 +397,16 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
defer flush()
root := getLayout(t, test.inputSerializationVersion)
c := &i64counter{}
counter := count.New()
c := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
col := &kopiaDataCollection{
path: pth,
dir: root,
counter: c,
counter: &c,
expectedVersion: readers.DefaultSerializationVersion,
}

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -23,6 +24,7 @@ type (
tags map[string]string,
buildTreeWithBase bool,
errs *fault.Bus,
counter *count.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error)
}

View File

@ -15,7 +15,9 @@ import (
"github.com/alcionai/corso/src/internal/common/readers"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
istats "github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -271,7 +273,10 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() {
ctx, flush := tester.NewContext(t)
defer flush()
c := &i64counter{}
counter := count.New()
c := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
dc := mergeCollection{fullPath: pth}
@ -279,7 +284,7 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() {
col := &kopiaDataCollection{
path: pth,
dir: layout(t),
counter: c,
counter: &c,
expectedVersion: readers.DefaultSerializationVersion,
}

View File

@ -59,6 +59,7 @@ type corsoProgress struct {
mu sync.RWMutex
totalBytes int64
errs *fault.Bus
counter *count.Bus
// expectedIgnoredErrors is a count of error cases caught in the Error wrapper
// which are well known and actually ignorable. At the end of a run, if the
// manifest ignored error count is equal to this count, then everything is good.
@ -186,6 +187,7 @@ func (cp *corsoProgress) FinishedHashingFile(fname string, bs int64) {
"finished hashing file",
"path", clues.Hide(path.Elements(sl[2:])))
cp.counter.Add(count.PersistedHashedBytes, bs)
atomic.AddInt64(&cp.totalBytes, bs)
}
@ -214,7 +216,9 @@ func (cp *corsoProgress) Error(relpath string, err error, isIgnored bool) {
// delta query and a fetch. This is our next point of error
// handling, where we can identify and skip over the case.
if clues.HasLabel(err, graph.LabelsSkippable) {
cp.counter.Inc(count.PersistenceExpectedErrors)
cp.incExpectedErrs()
return
}

View File

@ -24,6 +24,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -569,6 +570,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() {
deets: bd,
pending: map[string]*itemDetails{},
errs: fault.New(true),
counter: count.New(),
}
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
@ -579,6 +581,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() {
assert.Empty(t, cp.pending)
assert.Equal(t, test.expectedBytes, cp.totalBytes)
assert.Equal(t, test.expectedBytes, cp.counter.Get(count.PersistedHashedBytes))
})
}
}
@ -2669,7 +2672,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
getBaseSnapshot := func() (fs.Entry, map[string]*int) {
counters := map[string]*int{}
folder, count := newMockStaticDirectory(
folder, dirCount := newMockStaticDirectory(
encodeElements(folderID3)[0],
[]fs.Entry{
virtualfs.StreamingFileWithModTimeFromReader(
@ -2681,9 +2684,9 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
time.Time{},
io.NopCloser(bytes.NewReader(fileData6))),
})
counters[folderID3] = count
counters[folderID3] = dirCount
folder, count = newMockStaticDirectory(
folder, dirCount = newMockStaticDirectory(
encodeElements(folderID2)[0],
[]fs.Entry{
virtualfs.StreamingFileWithModTimeFromReader(
@ -2696,14 +2699,14 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
io.NopCloser(bytes.NewReader(fileData4))),
folder,
})
counters[folderID2] = count
counters[folderID2] = dirCount
folder4, count := newMockStaticDirectory(
folder4, dirCount := newMockStaticDirectory(
encodeElements(folderID4)[0],
[]fs.Entry{})
counters[folderID4] = count
counters[folderID4] = dirCount
folder, count = newMockStaticDirectory(
folder, dirCount = newMockStaticDirectory(
encodeElements(folderID1)[0],
[]fs.Entry{
virtualfs.StreamingFileWithModTimeFromReader(
@ -2717,9 +2720,9 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
folder,
folder4,
})
counters[folderID1] = count
counters[folderID1] = dirCount
folder5, count := newMockStaticDirectory(
folder5, dirCount := newMockStaticDirectory(
encodeElements(folderID5)[0],
[]fs.Entry{
virtualfs.StreamingFileWithModTimeFromReader(
@ -2731,7 +2734,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP
time.Time{},
io.NopCloser(bytes.NewReader(fileData8))),
})
counters[folderID5] = count
counters[folderID5] = dirCount
return baseWithChildren(
prefixFolders,

View File

@ -26,6 +26,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -76,6 +77,13 @@ func manifestToStats(
progress *corsoProgress,
uploadCount *stats.ByteCounter,
) BackupStats {
progress.counter.Add(count.PersistedFiles, int64(man.Stats.TotalFileCount))
progress.counter.Add(count.PersistedCachedFiles, int64(man.Stats.CachedFiles))
progress.counter.Add(count.PersistedNonCachedFiles, int64(man.Stats.NonCachedFiles))
progress.counter.Add(count.PersistedDirectories, int64(man.Stats.TotalDirectoryCount))
progress.counter.Add(count.PersistenceErrors, int64(man.Stats.ErrorCount))
progress.counter.Add(count.PersistenceIgnoredErrors, int64(man.Stats.IgnoredErrorCount))
return BackupStats{
SnapshotID: string(man.ID),
@ -145,6 +153,7 @@ func (w Wrapper) ConsumeBackupCollections(
additionalTags map[string]string,
buildTreeWithBase bool,
errs *fault.Bus,
counter *count.Bus,
) (*BackupStats, *details.Builder, DetailsMergeInfoer, error) {
if w.c == nil {
return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx)
@ -163,6 +172,7 @@ func (w Wrapper) ConsumeBackupCollections(
deets: &details.Builder{},
toMerge: newMergeDetails(),
errs: errs,
counter: counter,
}
// When running an incremental backup, we need to pass the prior
@ -227,7 +237,11 @@ func (w Wrapper) makeSnapshotWithRoot(
) (*BackupStats, error) {
var (
man *snapshot.Manifest
bc = &stats.ByteCounter{}
bc = &stats.ByteCounter{
// duplicate the count in the progress count.Bus. Later we can
// replace the ByteCounter with the progress counter entirely.
Counter: progress.counter.AdderFor(count.PersistedUploadedBytes),
}
)
snapIDs := make([]manifest.ID, 0, len(prevSnapEntries)) // just for logging

View File

@ -15,6 +15,7 @@ import (
exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -99,6 +100,7 @@ func BenchmarkHierarchyMerge(b *testing.B) {
base ManifestEntry,
) ManifestEntry {
bbs := test.baseBackups(base)
counter := count.New()
stats, _, _, err := w.ConsumeBackupCollections(
ctx,
@ -108,11 +110,14 @@ func BenchmarkHierarchyMerge(b *testing.B) {
nil,
nil,
true,
fault.New(true))
fault.New(true),
counter)
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, 0, stats.IgnoredErrorCount)
assert.Equal(t, 0, stats.ErrorCount)
assert.Zero(t, stats.IgnoredErrorCount)
assert.Zero(t, stats.ErrorCount)
assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors))
assert.Zero(t, counter.Get(count.PersistenceErrors))
assert.False(t, stats.Incomplete)
snap, err := snapshot.LoadSnapshot(

View File

@ -27,10 +27,12 @@ import (
dataMock "github.com/alcionai/corso/src/internal/data/mock"
"github.com/alcionai/corso/src/internal/m365/collection/drive/metadata"
exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
istats "github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/backup/identity"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -73,11 +75,11 @@ func testForFiles(
) {
t.Helper()
count := 0
fCount := 0
for _, c := range collections {
for s := range c.Items(ctx, fault.New(true)) {
count++
fCount++
fullPath, err := c.FullPath().AppendItem(s.ID())
require.NoError(t, err, clues.ToCore(err))
@ -96,7 +98,7 @@ func testForFiles(
}
}
assert.Equal(t, len(expected), count)
assert.Equal(t, len(expected), fCount)
}
func checkSnapshotTags(
@ -883,6 +885,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
defer flush()
bbs := test.baseBackups(base)
counter := count.New()
stats, deets, deetsMerger, err := suite.w.ConsumeBackupCollections(
ctx,
@ -892,27 +895,45 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
nil,
tags,
true,
fault.New(true))
fault.New(true),
counter)
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files")
assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedFiles), "total files")
assert.Equal(t, test.expectedUploadedFiles, stats.UncachedFileCount, "uncached files")
assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedNonCachedFiles), "uncached files")
assert.Equal(t, test.expectedCachedFiles, stats.CachedFileCount, "cached files")
assert.Equal(t, int64(test.expectedCachedFiles), counter.Get(count.PersistedCachedFiles), "cached files")
assert.Equal(t, 4+len(test.collections), stats.TotalDirectoryCount, "directory count")
assert.Equal(t, 0, stats.IgnoredErrorCount)
assert.Equal(t, 0, stats.ErrorCount)
assert.Equal(t, int64(4+len(test.collections)), counter.Get(count.PersistedDirectories), "directory count")
assert.Zero(t, stats.IgnoredErrorCount, "ignored errors")
assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors), "ignored errors")
assert.Zero(t, stats.ErrorCount, "errors")
assert.Zero(t, counter.Get(count.PersistenceErrors), "errors")
assert.False(t, stats.Incomplete)
test.hashedBytesCheck(t, stats.TotalHashedBytes, "hashed bytes")
test.hashedBytesCheck(t, counter.Get(count.PersistedHashedBytes), "hashed bytes")
assert.LessOrEqual(
t,
test.uploadedBytes[0],
stats.TotalUploadedBytes,
"low end of uploaded bytes")
assert.LessOrEqual(
t,
test.uploadedBytes[0],
counter.Get(count.PersistedUploadedBytes),
"low end of uploaded bytes")
assert.GreaterOrEqual(
t,
test.uploadedBytes[1],
stats.TotalUploadedBytes,
"high end of uploaded bytes")
assert.GreaterOrEqual(
t,
test.uploadedBytes[1],
counter.Get(count.PersistedUploadedBytes),
"high end of uploaded bytes")
if test.expectMerge {
assert.Empty(t, deets.Details().Entries, "details entries")
@ -1183,6 +1204,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() {
suite.Run(test.name, func() {
t := suite.T()
collections := test.cols()
counter := count.New()
stats, deets, prevShortRefs, err := suite.w.ConsumeBackupCollections(
suite.ctx,
@ -1192,15 +1214,22 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() {
nil,
tags,
true,
fault.New(true))
fault.New(true),
counter)
assert.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files")
assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedFiles), "total files")
assert.Equal(t, test.expectedUploadedFiles, stats.UncachedFileCount, "uncached files")
assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedNonCachedFiles), "uncached files")
assert.Equal(t, test.expectedCachedFiles, stats.CachedFileCount, "cached files")
assert.Equal(t, 5, stats.TotalDirectoryCount)
assert.Equal(t, 0, stats.IgnoredErrorCount)
assert.Equal(t, 0, stats.ErrorCount)
assert.Equal(t, int64(test.expectedCachedFiles), counter.Get(count.PersistedCachedFiles), "cached files")
assert.Equal(t, 5, stats.TotalDirectoryCount, "uploaded directories")
assert.Equal(t, int64(5), counter.Get(count.PersistedDirectories), "uploaded directories")
assert.Zero(t, stats.IgnoredErrorCount, "ignored errors")
assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors), "ignored errors")
assert.Zero(t, stats.ErrorCount, "errors")
assert.Zero(t, counter.Get(count.PersistenceErrors), "errors")
assert.False(t, stats.Incomplete)
// 47 file and 1 folder entries.
@ -1280,7 +1309,8 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
nil,
nil,
true,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
err = k.Compression(ctx, "gzip")
@ -1365,6 +1395,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
}
errs := fault.New(true)
counter := count.New()
stats, deets, _, err := suite.w.ConsumeBackupCollections(
suite.ctx,
@ -1374,12 +1405,17 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
nil,
nil,
true,
errs)
errs,
counter)
require.Error(t, err, clues.ToCore(err))
assert.Equal(t, 0, stats.ErrorCount, "error count")
assert.Zero(t, stats.ErrorCount, "error count")
assert.Zero(t, counter.Get(count.PersistenceErrors), "error count")
assert.Equal(t, 5, stats.TotalFileCount, "total files")
assert.Equal(t, int64(5), counter.Get(count.PersistedFiles), "total files")
assert.Equal(t, 6, stats.TotalDirectoryCount, "total directories")
assert.Equal(t, 0, stats.IgnoredErrorCount, "ignored errors")
assert.Equal(t, int64(6), counter.Get(count.PersistedDirectories), "total directories")
assert.Zero(t, stats.IgnoredErrorCount, "ignored errors")
assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors), "ignored errors")
assert.Equal(t, 1, len(errs.Errors().Recovered), "recovered errors")
assert.False(t, stats.Incomplete, "incomplete")
// 5 file and 2 folder entries.
@ -1388,7 +1424,9 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
failedPath, err := suite.storePath2.AppendItem(testFileName4)
require.NoError(t, err, clues.ToCore(err))
ic := i64counter{}
ic := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
dcs, err := suite.w.ProduceRestoreCollections(
suite.ctx,
@ -1411,7 +1449,12 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
// Files that had an error shouldn't make a dir entry in kopia. If they do we
// may run into kopia-assisted incrementals issues because only mod time and
// not file size is checked for StreamingFiles.
assert.ErrorIs(t, errs.Failure(), data.ErrNotFound, "errored file is restorable", clues.ToCore(err))
assert.ErrorIs(
t,
errs.Failure(),
data.ErrNotFound,
"errored file is restorable",
clues.ToCore(err))
}
type backedupFile struct {
@ -1450,7 +1493,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections()
nil,
nil,
true,
fault.New(true))
fault.New(true),
count.New())
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, BackupStats{}, *s)
@ -1600,6 +1644,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
}
r := identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory)
counter := count.New()
// Other tests check basic things about deets so not doing that again here.
stats, _, _, err := suite.w.ConsumeBackupCollections(
@ -1610,12 +1655,17 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
nil,
nil,
false,
fault.New(true))
fault.New(true),
counter)
require.NoError(t, err, clues.ToCore(err))
require.Equal(t, stats.ErrorCount, 0)
require.Equal(t, stats.TotalFileCount, expectedFiles)
require.Equal(t, stats.TotalDirectoryCount, expectedDirs)
require.Equal(t, stats.IgnoredErrorCount, 0)
require.Zero(t, stats.ErrorCount)
require.Zero(t, counter.Get(count.PersistenceErrors))
require.Zero(t, stats.IgnoredErrorCount)
require.Zero(t, counter.Get(count.PersistenceIgnoredErrors))
require.Equal(t, expectedFiles, stats.TotalFileCount)
require.Equal(t, int64(expectedFiles), counter.Get(count.PersistedFiles))
require.Equal(t, expectedDirs, stats.TotalDirectoryCount)
require.Equal(t, int64(expectedFiles), counter.Get(count.PersistedDirectories))
require.False(t, stats.Incomplete)
suite.snapshotID = manifest.ID(stats.SnapshotID)
@ -1627,14 +1677,6 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() {
logger.Flush(suite.ctx)
}
type i64counter struct {
i int64
}
func (c *i64counter) Count(i int64) {
c.i += i
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
r := identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory)
@ -1729,6 +1771,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
})
}
counter := count.New()
stats, _, _, err := suite.w.ConsumeBackupCollections(
suite.ctx,
[]identity.Reasoner{r},
@ -1741,10 +1785,13 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
excluded,
nil,
true,
fault.New(true))
fault.New(true),
counter)
require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, test.expectedCachedItems, stats.CachedFileCount)
assert.Equal(t, test.expectedUncachedItems, stats.UncachedFileCount)
assert.Equal(t, int64(test.expectedCachedItems), counter.Get(count.PersistedCachedFiles))
assert.Equal(t, int64(test.expectedUncachedItems), counter.Get(count.PersistedNonCachedFiles))
test.backupIDCheck(t, stats.SnapshotID)
@ -1752,7 +1799,9 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
return
}
ic := i64counter{}
ic := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
dcs, err := suite.w.ProduceRestoreCollections(
suite.ctx,
@ -1871,7 +1920,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() {
expected[pth.String()] = item.data
}
ic := i64counter{}
counter := count.New()
ic := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
result, err := suite.w.ProduceRestoreCollections(
suite.ctx,
@ -1902,7 +1954,8 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() {
}
assert.Len(t, result, test.expectedCollections)
assert.Less(t, int64(0), ic.i)
assert.Less(t, int64(0), ic.NumBytes)
assert.Less(t, int64(0), counter.Get(count.PersistedUploadedBytes))
testForFiles(t, ctx, expected, result)
})
}
@ -2010,7 +2063,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Path
expected[itemPath.String()] = item.data
}
ic := i64counter{}
counter := count.New()
ic := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
result, err := suite.w.ProduceRestoreCollections(
suite.ctx,
@ -2055,9 +2111,11 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Fetc
},
}
// Really only interested in getting the collection so we can call fetch on
// it.
ic := i64counter{}
// Really only interested in getting the collection so we can call fetch on it.
counter := count.New()
ic := istats.ByteCounter{
Counter: counter.AdderFor(count.PersistedUploadedBytes),
}
result, err := suite.w.ProduceRestoreCollections(
suite.ctx,

View File

@ -68,6 +68,8 @@ type BackupResults struct {
stats.ReadWrites
stats.StartAndEndTime
BackupID model.StableID `json:"backupID"`
// keys are found in /pkg/count/keys.go
Counts map[string]int64 `json:"counts"`
}
// NewBackupOperation constructs and validates a backup operation.
@ -202,6 +204,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx})
defer flushMetrics()
// for cases where we can't pass the counter down as part of a func call.
ctx = count.Embed(ctx, op.Counter)
// Check if the protected resource has the service enabled in order for us
@ -294,7 +297,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
// Persistence
// -----
err = op.persistResults(startTime, &opStats)
err = op.persistResults(startTime, &opStats, op.Counter)
if err != nil {
op.Errors.Fail(clues.Wrap(err, "persisting backup results"))
return op.Errors.Failure()
@ -347,6 +350,7 @@ func (op *BackupOperation) do(
Info("backing up selection")
// should always be 1, since backups are 1:1 with resourceOwners.
// TODO: this is outdated and needs to be removed.
opStats.resourceCount = 1
kbf, err := op.kopia.NewBaseFinder(op.store)
@ -409,7 +413,8 @@ func (op *BackupOperation) do(
ssmb,
backupID,
op.incremental && canUseMetadata && canUsePreviousBackup,
op.Errors)
op.Errors,
op.Counter)
if err != nil {
return nil, clues.Wrap(err, "persisting collection backups")
}
@ -499,6 +504,7 @@ func consumeBackupCollections(
backupID model.StableID,
isIncremental bool,
errs *fault.Bus,
counter *count.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) {
ctx = clues.Add(
ctx,
@ -521,7 +527,8 @@ func consumeBackupCollections(
pmr,
tags,
isIncremental,
errs)
errs,
counter)
if err != nil {
if kopiaStats == nil {
return nil, nil, nil, clues.Stack(err)
@ -799,6 +806,7 @@ func mergeDetails(
func (op *BackupOperation) persistResults(
started time.Time,
opStats *backupStats,
counter *count.Bus,
) error {
op.Results.StartedAt = started
op.Results.CompletedAt = time.Now()
@ -816,6 +824,10 @@ func (op *BackupOperation) persistResults(
return clues.New("backup persistence never completed")
}
// the summary of all counts collected during backup
op.Results.Counts = counter.TotalValues()
// legacy counting system
op.Results.BytesRead = opStats.k.TotalHashedBytes
op.Results.BytesUploaded = opStats.k.TotalUploadedBytes
op.Results.ItemsWritten = opStats.k.TotalFileCount

View File

@ -39,6 +39,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/metadata"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/extensions"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
@ -136,6 +137,7 @@ func (mbu mockBackupConsumer) ConsumeBackupCollections(
tags map[string]string,
buildTreeWithBase bool,
errs *fault.Bus,
counter *count.Bus,
) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) {
if mbu.checkFunc != nil {
mbu.checkFunc(backupReasons, bases, cs, tags, buildTreeWithBase)
@ -432,14 +434,14 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() {
op.Errors.Fail(test.fail)
test.expectErr(t, op.persistResults(now, &test.stats))
// op.Counter is not incremented in this test.
test.expectErr(t, op.persistResults(now, &test.stats, count.New()))
assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status")
assert.Equal(t, test.stats.ctrl.Successes, op.Results.ItemsRead, "items read")
assert.Equal(t, test.stats.k.TotalFileCount, op.Results.ItemsWritten, "items written")
assert.Equal(t, test.stats.k.TotalHashedBytes, op.Results.BytesRead, "bytes read")
assert.Equal(t, test.stats.k.TotalUploadedBytes, op.Results.BytesUploaded, "bytes written")
assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners")
assert.Equal(t, op.Results.ItemsRead, test.stats.ctrl.Successes, "items read")
assert.Equal(t, op.Results.ItemsWritten, test.stats.k.TotalFileCount, "items written")
assert.Equal(t, op.Results.BytesRead, test.stats.k.TotalHashedBytes, "bytes read")
assert.Equal(t, op.Results.BytesUploaded, test.stats.k.TotalUploadedBytes, "bytes written")
assert.Equal(t, now, op.Results.StartedAt, "started at")
assert.Less(t, now, op.Results.CompletedAt, "completed at")
})
@ -525,7 +527,8 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections
nil,
backupID,
true,
fault.New(true))
fault.New(true),
count.New())
}
func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems() {

View File

@ -23,12 +23,19 @@ type StartAndEndTime struct {
CompletedAt time.Time `json:"completedAt"`
}
type Counter func(numBytes int64)
type ByteCounter struct {
NumBytes int64
Counter Counter
}
func (bc *ByteCounter) Count(i int64) {
atomic.AddInt64(&bc.NumBytes, i)
if bc.Counter != nil {
bc.Counter(i)
}
}
type SkippedCounts struct {

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/kopia/inject"
"github.com/alcionai/corso/src/internal/stats"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -215,7 +216,8 @@ func write(
prefixmatcher.NopReader[map[string]struct{}](),
nil,
false,
errs)
errs,
count.New())
if err != nil {
return "", clues.Wrap(err, "storing marshalled bytes in repository")
}

View File

@ -56,6 +56,14 @@ func (b *Bus) Add(k key, n int64) {
}
}
// AdderFor returns a func that adds any value of i
// to the bus using the given key.
func (b *Bus) AdderFor(k key) func(i int64) {
return func(i int64) {
b.Add(k, i)
}
}
// Get returns the local count.
func (b *Bus) Get(k key) int64 {
if b == nil {

View File

@ -5,6 +5,30 @@ type key string
const (
// count of bucket-tokens consumed by api calls.
APICallTokensConsumed key = "api-call-tokens-consumed"
// count of api calls that resulted in failure due to throttling.
ThrottledAPICalls key = "throttled-api-calls"
)
// Tracked during backup
const (
// amounts reported by kopia
PersistedCachedFiles key = "persisted-cached-files"
PersistedDirectories key = "persisted-directories"
PersistedFiles key = "persisted-files"
PersistedHashedBytes key = "persisted-hashed-bytes"
PersistedNonCachedFiles key = "persisted-non-cached-files"
PersistedNonMetaFiles key = "persisted-non-meta-files"
PersistedNonMetaUploadedBytes key = "persisted-non-meta-uploaded-bytes"
PersistedUploadedBytes key = "persisted-uploaded-bytes"
PersistenceErrors key = "persistence-errors"
PersistenceExpectedErrors key = "persistence-expected-errors"
PersistenceIgnoredErrors key = "persistence-ignored-errors"
// amounts reported by data providers
ProviderItemsRead key = "provider-items-read"
)
// Tracked during restore
const (
// count of times that items had collisions during restore,
// and that collision was solved by replacing the item.
CollisionReplace key = "collision-replace"
@ -15,6 +39,4 @@ const (
// non-meta item creation counting. IE: use it specifically
// for counting new items (no collision) or copied items.
NewItemCreated key = "new-item-created"
// count of api calls that resulted in failure due to throttling.
ThrottledAPICalls key = "throttled-api-calls"
)