report totalFileSize on backup (#921)
## Description Extend the kopia metrics to include the snapshot's TotalFileSize parameter. This value also gets used as the metric for BytesWritten at the end of a backup. ## Type of change - [x] 🌻 Feature ## Issue(s) * #894 ## Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
975b8b8e5b
commit
ce4d97ef48
@ -28,16 +28,17 @@ const (
|
|||||||
RestoreEnd = "restore-end"
|
RestoreEnd = "restore-end"
|
||||||
|
|
||||||
// Event Data Keys
|
// Event Data Keys
|
||||||
BackupID = "backup-id"
|
BackupID = "backup-id"
|
||||||
ExchangeResources = "exchange-resources"
|
DataRetrieved = "data-retrieved"
|
||||||
ExchangeDataRetrieved = "exchange-data-retrieved"
|
DataStored = "data-stored"
|
||||||
ExchangeDataStored = "exchange-data-stored"
|
Duration = "duration"
|
||||||
EndTime = "end-time"
|
EndTime = "end-time"
|
||||||
StartTime = "start-time"
|
ItemsRead = "items-read"
|
||||||
Duration = "duration"
|
ItemsWritten = "items-written"
|
||||||
Status = "status"
|
Resources = "resources"
|
||||||
ItemsRead = "items-read"
|
Service = "service"
|
||||||
ItemsWritten = "items-written"
|
StartTime = "start-time"
|
||||||
|
Status = "status"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Eventer interface {
|
type Eventer interface {
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package kopia
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/kopia/kopia/fs"
|
"github.com/kopia/kopia/fs"
|
||||||
@ -35,6 +36,7 @@ var (
|
|||||||
type BackupStats struct {
|
type BackupStats struct {
|
||||||
SnapshotID string
|
SnapshotID string
|
||||||
TotalFileCount int
|
TotalFileCount int
|
||||||
|
TotalHashedBytes int64
|
||||||
TotalDirectoryCount int
|
TotalDirectoryCount int
|
||||||
IgnoredErrorCount int
|
IgnoredErrorCount int
|
||||||
ErrorCount int
|
ErrorCount int
|
||||||
@ -42,10 +44,11 @@ type BackupStats struct {
|
|||||||
IncompleteReason string
|
IncompleteReason string
|
||||||
}
|
}
|
||||||
|
|
||||||
func manifestToStats(man *snapshot.Manifest) BackupStats {
|
func manifestToStats(man *snapshot.Manifest, progress *corsoProgress) BackupStats {
|
||||||
return BackupStats{
|
return BackupStats{
|
||||||
SnapshotID: string(man.ID),
|
SnapshotID: string(man.ID),
|
||||||
TotalFileCount: int(man.Stats.TotalFileCount),
|
TotalFileCount: int(man.Stats.TotalFileCount),
|
||||||
|
TotalHashedBytes: progress.totalBytes,
|
||||||
TotalDirectoryCount: int(man.Stats.TotalDirectoryCount),
|
TotalDirectoryCount: int(man.Stats.TotalDirectoryCount),
|
||||||
IgnoredErrorCount: int(man.Stats.IgnoredErrorCount),
|
IgnoredErrorCount: int(man.Stats.IgnoredErrorCount),
|
||||||
ErrorCount: int(man.Stats.ErrorCount),
|
ErrorCount: int(man.Stats.ErrorCount),
|
||||||
@ -61,9 +64,10 @@ type itemDetails struct {
|
|||||||
|
|
||||||
type corsoProgress struct {
|
type corsoProgress struct {
|
||||||
snapshotfs.UploadProgress
|
snapshotfs.UploadProgress
|
||||||
pending map[string]*itemDetails
|
pending map[string]*itemDetails
|
||||||
deets *details.Details
|
deets *details.Details
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
totalBytes int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kopia interface function used as a callback when kopia finishes processing a
|
// Kopia interface function used as a callback when kopia finishes processing a
|
||||||
@ -120,6 +124,14 @@ func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
|
|||||||
cp.deets.AddFolders(folders)
|
cp.deets.AddFolders(folders)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Kopia interface function used as a callback when kopia finishes hashing a file.
|
||||||
|
func (cp *corsoProgress) FinishedHashingFile(fname string, bytes int64) {
|
||||||
|
// Pass the call through as well so we don't break expected functionality.
|
||||||
|
defer cp.UploadProgress.FinishedHashingFile(fname, bytes)
|
||||||
|
|
||||||
|
atomic.AddInt64(&cp.totalBytes, bytes)
|
||||||
|
}
|
||||||
|
|
||||||
func (cp *corsoProgress) put(k string, v *itemDetails) {
|
func (cp *corsoProgress) put(k string, v *itemDetails) {
|
||||||
cp.mu.Lock()
|
cp.mu.Lock()
|
||||||
defer cp.mu.Unlock()
|
defer cp.mu.Unlock()
|
||||||
@ -441,7 +453,7 @@ func (w Wrapper) makeSnapshotWithRoot(
|
|||||||
return nil, errors.Wrap(err, "kopia backup")
|
return nil, errors.Wrap(err, "kopia backup")
|
||||||
}
|
}
|
||||||
|
|
||||||
res := manifestToStats(man)
|
res := manifestToStats(man, progress)
|
||||||
|
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -143,56 +143,69 @@ func (suite *CorsoProgressUnitSuite) SetupSuite() {
|
|||||||
suite.targetFileName = suite.targetFilePath.ToBuilder().Dir().String()
|
suite.targetFileName = suite.targetFilePath.ToBuilder().Dir().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
|
type testInfo struct {
|
||||||
type testInfo struct {
|
info *itemDetails
|
||||||
info *itemDetails
|
err error
|
||||||
err error
|
totalBytes int64
|
||||||
}
|
}
|
||||||
|
|
||||||
deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath}
|
var finishedFileTable = []struct {
|
||||||
|
name string
|
||||||
table := []struct {
|
cachedItems func(fname string, fpath path.Path) map[string]testInfo
|
||||||
name string
|
expectedBytes int64
|
||||||
cachedItems map[string]testInfo
|
expectedNumEntries int
|
||||||
expectedLen int
|
err error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "DetailsExist",
|
name: "DetailsExist",
|
||||||
cachedItems: map[string]testInfo{
|
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
|
||||||
suite.targetFileName: {
|
return map[string]testInfo{
|
||||||
info: deets,
|
fname: {
|
||||||
err: nil,
|
info: &itemDetails{details.ItemInfo{}, fpath},
|
||||||
|
err: nil,
|
||||||
|
totalBytes: 100,
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
// 1 file and 5 folders.
|
|
||||||
expectedLen: 6,
|
|
||||||
},
|
},
|
||||||
{
|
expectedBytes: 100,
|
||||||
name: "PendingNoDetails",
|
// 1 file and 5 folders.
|
||||||
cachedItems: map[string]testInfo{
|
expectedNumEntries: 6,
|
||||||
suite.targetFileName: {
|
},
|
||||||
|
{
|
||||||
|
name: "PendingNoDetails",
|
||||||
|
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
|
||||||
|
return map[string]testInfo{
|
||||||
|
fname: {
|
||||||
info: nil,
|
info: nil,
|
||||||
err: nil,
|
err: nil,
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
expectedLen: 0,
|
|
||||||
},
|
},
|
||||||
{
|
expectedNumEntries: 0,
|
||||||
name: "HadError",
|
},
|
||||||
cachedItems: map[string]testInfo{
|
{
|
||||||
suite.targetFileName: {
|
name: "HadError",
|
||||||
info: deets,
|
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
|
||||||
|
return map[string]testInfo{
|
||||||
|
fname: {
|
||||||
|
info: &itemDetails{details.ItemInfo{}, fpath},
|
||||||
err: assert.AnError,
|
err: assert.AnError,
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
expectedLen: 0,
|
|
||||||
},
|
},
|
||||||
{
|
expectedNumEntries: 0,
|
||||||
name: "NotPending",
|
},
|
||||||
expectedLen: 0,
|
{
|
||||||
|
name: "NotPending",
|
||||||
|
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
|
||||||
|
return nil
|
||||||
},
|
},
|
||||||
}
|
expectedNumEntries: 0,
|
||||||
for _, test := range table {
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
|
||||||
|
for _, test := range finishedFileTable {
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
bd := &details.Details{}
|
bd := &details.Details{}
|
||||||
cp := corsoProgress{
|
cp := corsoProgress{
|
||||||
@ -201,18 +214,20 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
|
|||||||
pending: map[string]*itemDetails{},
|
pending: map[string]*itemDetails{},
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range test.cachedItems {
|
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
|
||||||
|
|
||||||
|
for k, v := range ci {
|
||||||
cp.put(k, v.info)
|
cp.put(k, v.info)
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Len(t, cp.pending, len(test.cachedItems))
|
require.Len(t, cp.pending, len(ci))
|
||||||
|
|
||||||
for k, v := range test.cachedItems {
|
for k, v := range ci {
|
||||||
cp.FinishedFile(k, v.err)
|
cp.FinishedFile(k, v.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Empty(t, cp.pending)
|
assert.Empty(t, cp.pending)
|
||||||
assert.Len(t, bd.Entries, test.expectedLen)
|
assert.Len(t, bd.Entries, test.expectedNumEntries)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -276,6 +291,28 @@ func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() {
|
|||||||
assert.Empty(t, rootRef.ParentRef)
|
assert.Empty(t, rootRef.ParentRef)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() {
|
||||||
|
for _, test := range finishedFileTable {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
bd := &details.Details{}
|
||||||
|
cp := corsoProgress{
|
||||||
|
UploadProgress: &snapshotfs.NullUploadProgress{},
|
||||||
|
deets: bd,
|
||||||
|
pending: map[string]*itemDetails{},
|
||||||
|
}
|
||||||
|
|
||||||
|
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
|
||||||
|
|
||||||
|
for k, v := range ci {
|
||||||
|
cp.FinishedHashingFile(k, v.totalBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Empty(t, cp.pending)
|
||||||
|
assert.Equal(t, test.expectedBytes, cp.totalBytes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type KopiaUnitSuite struct {
|
type KopiaUnitSuite struct {
|
||||||
suite.Suite
|
suite.Suite
|
||||||
testPath path.Path
|
testPath path.Path
|
||||||
|
|||||||
@ -93,6 +93,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
events.BackupStart,
|
events.BackupStart,
|
||||||
map[string]any{
|
map[string]any{
|
||||||
events.StartTime: startTime,
|
events.StartTime: startTime,
|
||||||
|
events.Service: op.Selectors.Service.String(),
|
||||||
// TODO: initial backup ID,
|
// TODO: initial backup ID,
|
||||||
// TODO: events.ExchangeResources: <count of resources>,
|
// TODO: events.ExchangeResources: <count of resources>,
|
||||||
},
|
},
|
||||||
@ -167,6 +168,7 @@ func (op *BackupOperation) persistResults(
|
|||||||
|
|
||||||
op.Results.ReadErrors = opStats.readErr
|
op.Results.ReadErrors = opStats.readErr
|
||||||
op.Results.WriteErrors = opStats.writeErr
|
op.Results.WriteErrors = opStats.writeErr
|
||||||
|
op.Results.BytesWritten = opStats.k.TotalHashedBytes
|
||||||
op.Results.ItemsRead = opStats.gc.Successful
|
op.Results.ItemsRead = opStats.gc.Successful
|
||||||
op.Results.ItemsWritten = opStats.k.TotalFileCount
|
op.Results.ItemsWritten = opStats.k.TotalFileCount
|
||||||
op.Results.ResourceOwners = opStats.resourceCount
|
op.Results.ResourceOwners = opStats.resourceCount
|
||||||
@ -207,14 +209,15 @@ func (op *BackupOperation) createBackupModels(
|
|||||||
ctx,
|
ctx,
|
||||||
events.BackupEnd,
|
events.BackupEnd,
|
||||||
map[string]any{
|
map[string]any{
|
||||||
events.BackupID: b.ID,
|
events.BackupID: b.ID,
|
||||||
events.Status: op.Status,
|
events.Service: op.Selectors.Service.String(),
|
||||||
events.StartTime: op.Results.StartedAt,
|
events.Status: op.Status,
|
||||||
events.EndTime: op.Results.CompletedAt,
|
events.StartTime: op.Results.StartedAt,
|
||||||
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
events.EndTime: op.Results.CompletedAt,
|
||||||
events.ExchangeResources: op.Results.ResourceOwners,
|
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
||||||
|
events.DataStored: op.Results.BytesWritten,
|
||||||
|
events.Resources: op.Results.ResourceOwners,
|
||||||
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
||||||
// TODO: events.ExchangeDataStored: <amount of data stored>,
|
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@ -43,11 +43,13 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
acct = account.Account{}
|
acct = account.Account{}
|
||||||
now = time.Now()
|
now = time.Now()
|
||||||
stats = backupStats{
|
stats = backupStats{
|
||||||
started: true,
|
started: true,
|
||||||
readErr: multierror.Append(nil, assert.AnError),
|
readErr: multierror.Append(nil, assert.AnError),
|
||||||
writeErr: assert.AnError,
|
writeErr: assert.AnError,
|
||||||
|
resourceCount: 1,
|
||||||
k: &kopia.BackupStats{
|
k: &kopia.BackupStats{
|
||||||
TotalFileCount: 1,
|
TotalFileCount: 1,
|
||||||
|
TotalHashedBytes: 1,
|
||||||
},
|
},
|
||||||
gc: &support.ConnectorOperationStatus{
|
gc: &support.ConnectorOperationStatus{
|
||||||
Successful: 1,
|
Successful: 1,
|
||||||
@ -71,7 +73,8 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
|||||||
assert.Equal(t, op.Results.ItemsRead, stats.gc.Successful, "items read")
|
assert.Equal(t, op.Results.ItemsRead, stats.gc.Successful, "items read")
|
||||||
assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
|
assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
|
||||||
assert.Equal(t, op.Results.ItemsWritten, stats.k.TotalFileCount, "items written")
|
assert.Equal(t, op.Results.ItemsWritten, stats.k.TotalFileCount, "items written")
|
||||||
assert.Equal(t, 0, op.Results.ResourceOwners, "resource owners")
|
assert.Equal(t, op.Results.BytesWritten, stats.k.TotalHashedBytes, "bytes written")
|
||||||
|
assert.Equal(t, op.Results.ResourceOwners, stats.resourceCount, "resource owners")
|
||||||
assert.Equal(t, op.Results.WriteErrors, stats.writeErr, "write errors")
|
assert.Equal(t, op.Results.WriteErrors, stats.writeErr, "write errors")
|
||||||
assert.Equal(t, op.Results.StartedAt, now, "started at")
|
assert.Equal(t, op.Results.StartedAt, now, "started at")
|
||||||
assert.Less(t, now, op.Results.CompletedAt, "completed at")
|
assert.Less(t, now, op.Results.CompletedAt, "completed at")
|
||||||
@ -213,8 +216,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
|||||||
require.NotEmpty(t, bo.Results)
|
require.NotEmpty(t, bo.Results)
|
||||||
require.NotEmpty(t, bo.Results.BackupID)
|
require.NotEmpty(t, bo.Results.BackupID)
|
||||||
assert.Equal(t, bo.Status, Completed)
|
assert.Equal(t, bo.Status, Completed)
|
||||||
assert.Greater(t, bo.Results.ItemsRead, 0)
|
assert.Less(t, 0, bo.Results.ItemsRead)
|
||||||
assert.Greater(t, bo.Results.ItemsWritten, 0)
|
assert.Less(t, 0, bo.Results.ItemsWritten)
|
||||||
|
assert.Less(t, int64(0), bo.Results.BytesWritten)
|
||||||
assert.Equal(t, 1, bo.Results.ResourceOwners)
|
assert.Equal(t, 1, bo.Results.ResourceOwners)
|
||||||
assert.Zero(t, bo.Results.ReadErrors)
|
assert.Zero(t, bo.Results.ReadErrors)
|
||||||
assert.Zero(t, bo.Results.WriteErrors)
|
assert.Zero(t, bo.Results.WriteErrors)
|
||||||
@ -273,6 +277,7 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
|
|||||||
require.NotEmpty(t, bo.Results.BackupID)
|
require.NotEmpty(t, bo.Results.BackupID)
|
||||||
assert.Equal(t, bo.Status, Completed)
|
assert.Equal(t, bo.Status, Completed)
|
||||||
assert.Equal(t, bo.Results.ItemsRead, bo.Results.ItemsWritten)
|
assert.Equal(t, bo.Results.ItemsRead, bo.Results.ItemsWritten)
|
||||||
|
assert.Less(t, int64(0), bo.Results.BytesWritten)
|
||||||
assert.Equal(t, 1, bo.Results.ResourceOwners)
|
assert.Equal(t, 1, bo.Results.ResourceOwners)
|
||||||
assert.NoError(t, bo.Results.ReadErrors)
|
assert.NoError(t, bo.Results.ReadErrors)
|
||||||
assert.NoError(t, bo.Results.WriteErrors)
|
assert.NoError(t, bo.Results.WriteErrors)
|
||||||
|
|||||||
@ -92,6 +92,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
|||||||
events.RestoreStart,
|
events.RestoreStart,
|
||||||
map[string]any{
|
map[string]any{
|
||||||
events.StartTime: startTime,
|
events.StartTime: startTime,
|
||||||
|
events.Service: op.Selectors.Service.String(),
|
||||||
events.BackupID: op.BackupID,
|
events.BackupID: op.BackupID,
|
||||||
// TODO: initial backup ID,
|
// TODO: initial backup ID,
|
||||||
// TODO: events.ExchangeResources: <count of resources>,
|
// TODO: events.ExchangeResources: <count of resources>,
|
||||||
@ -231,14 +232,15 @@ func (op *RestoreOperation) persistResults(
|
|||||||
events.RestoreEnd,
|
events.RestoreEnd,
|
||||||
map[string]any{
|
map[string]any{
|
||||||
// TODO: RestoreID
|
// TODO: RestoreID
|
||||||
events.BackupID: op.BackupID,
|
events.BackupID: op.BackupID,
|
||||||
events.Status: op.Status,
|
events.Service: op.Selectors.Service.String(),
|
||||||
events.StartTime: op.Results.StartedAt,
|
events.Status: op.Status,
|
||||||
events.EndTime: op.Results.CompletedAt,
|
events.StartTime: op.Results.StartedAt,
|
||||||
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
events.EndTime: op.Results.CompletedAt,
|
||||||
events.ItemsRead: op.Results.ItemsRead,
|
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
||||||
events.ItemsWritten: op.Results.ItemsWritten,
|
events.ItemsRead: op.Results.ItemsRead,
|
||||||
events.ExchangeResources: op.Results.ResourceOwners,
|
events.ItemsWritten: op.Results.ItemsWritten,
|
||||||
|
events.Resources: op.Results.ResourceOwners,
|
||||||
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import "time"
|
|||||||
// assumed to be successful, so the total count of items involved
|
// assumed to be successful, so the total count of items involved
|
||||||
// would be ItemsRead+ReadErrors.
|
// would be ItemsRead+ReadErrors.
|
||||||
type ReadWrites struct {
|
type ReadWrites struct {
|
||||||
|
BytesWritten int64 `json:"bytesWritten,omitempty"`
|
||||||
ItemsRead int `json:"itemsRead,omitempty"`
|
ItemsRead int `json:"itemsRead,omitempty"`
|
||||||
ItemsWritten int `json:"itemsWritten,omitempty"`
|
ItemsWritten int `json:"itemsWritten,omitempty"`
|
||||||
ReadErrors error `json:"readErrors,omitempty"`
|
ReadErrors error `json:"readErrors,omitempty"`
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user