# Count number of resources in a slice of colls (#916)
## Description

Backup and Restore metrics need to know the count of resource owners involved in the operation. This processor extracts that count from the collections passed from producer to consumer.

## Type of change

- [x] 🌻 Feature

## Issue(s)

* #894

## Test Plan

- [x] ⚡ Unit test
- [x] 💚 E2E
Parent: 7850272a5e · Commit: c3176cf113
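
Before the diff itself, here is a minimal, runnable sketch of the dedup idea the change relies on. The `coll` type and its `ResourceOwner` method are stand-ins for the real `data.Collection` and `path.Path` interfaces, not part of the PR:

```go
package main

import "fmt"

// coll is a stand-in for data.Collection; all the counter needs is
// access to each collection's resource owner.
type coll struct{ owner string }

func (c coll) ResourceOwner() string { return c.owner }

// resourceOwnerSet mirrors the approach in this PR: gather owners into
// a map used as a set, then flatten the keys into a slice.
func resourceOwnerSet(cs []coll) []string {
	seen := map[string]struct{}{}

	for _, c := range cs {
		seen[c.ResourceOwner()] = struct{}{}
	}

	owners := make([]string, 0, len(seen))

	for k := range seen {
		owners = append(owners, k)
	}

	return owners
}

func main() {
	cs := []coll{{"fnords"}, {"smarfs"}, {"fnords"}}

	// Duplicates collapse: two unique owners across three collections.
	fmt.Println(len(resourceOwnerSet(cs))) // 2
}
```

The real implementation below is identical in spirit: it reads the owner off each collection's `FullPath()` and collapses duplicates through a map used as a set.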
```diff
@@ -7,6 +7,10 @@ import (
 	"github.com/alcionai/corso/src/pkg/backup/details"
 )
 
+// ------------------------------------------------------------------------------------------------
+// standard ifaces
+// ------------------------------------------------------------------------------------------------
+
 // A Collection represents a compilation of data from the
 // same type application (e.g. mail)
 type Collection interface {
@@ -42,3 +46,26 @@ type StreamInfo interface {
 type StreamSize interface {
 	Size() int64
 }
+
+// ------------------------------------------------------------------------------------------------
+// functionality
+// ------------------------------------------------------------------------------------------------
+
+// ResourceOwnerSet extracts the set of unique resource owners from the
+// slice of Collections.
+func ResourceOwnerSet(cs []Collection) []string {
+	rs := map[string]struct{}{}
+
+	for _, c := range cs {
+		fp := c.FullPath()
+		rs[fp.ResourceOwner()] = struct{}{}
+	}
+
+	rss := make([]string, 0, len(rs))
+
+	for k := range rs {
+		rss = append(rss, k)
+	}
+
+	return rss
+}
```
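
Two notes on the implementation above: `map[string]struct{}` is the standard Go set idiom (the empty struct occupies zero bytes, so only the keys cost memory), and because Go randomizes map iteration order, the returned slice has no guaranteed ordering. That is why the tests below compare results with `assert.ElementsMatch` rather than `assert.Equal`.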
```diff
@@ -3,13 +3,83 @@ package data
 import (
 	"testing"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
+
+	"github.com/alcionai/corso/src/internal/path"
 )
 
+type mockColl struct {
+	p path.Path
+}
+
+func (mc mockColl) Items() <-chan Stream {
+	return nil
+}
+
+func (mc mockColl) FullPath() path.Path {
+	return mc.p
+}
+
 type CollectionSuite struct {
 	suite.Suite
 }
 
-func TestDataCollectionSuite(t *testing.T) {
+// ------------------------------------------------------------------------------------------------
+// tests
+// ------------------------------------------------------------------------------------------------
+
+func TestCollectionSuite(t *testing.T) {
 	suite.Run(t, new(CollectionSuite))
 }
+
+func (suite *CollectionSuite) TestResourceOwnerSet() {
+	t := suite.T()
+	toColl := func(t *testing.T, resource string) Collection {
+		p, err := path.Builder{}.
+			Append("foo").
+			ToDataLayerExchangePathForCategory("tid", resource, path.EventsCategory, false)
+		require.NoError(t, err)
+
+		return mockColl{p}
+	}
+
+	table := []struct {
+		name   string
+		input  []Collection
+		expect []string
+	}{
+		{
+			name:   "empty",
+			input:  []Collection{},
+			expect: []string{},
+		},
+		{
+			name:   "nil",
+			input:  nil,
+			expect: []string{},
+		},
+		{
+			name:   "single resource",
+			input:  []Collection{toColl(t, "fnords")},
+			expect: []string{"fnords"},
+		},
+		{
+			name:   "multiple resources",
+			input:  []Collection{toColl(t, "fnords"), toColl(t, "smarfs")},
+			expect: []string{"fnords", "smarfs"},
+		},
+		{
+			name:   "duplicate resources",
+			input:  []Collection{toColl(t, "fnords"), toColl(t, "smarfs"), toColl(t, "fnords")},
+			expect: []string{"fnords", "smarfs"},
+		},
+	}
+	for _, test := range table {
+		suite.T().Run(test.name, func(t *testing.T) {
+			rs := ResourceOwnerSet(test.input)
+			assert.ElementsMatch(t, test.expect, rs)
+		})
+	}
+}
```
```diff
@@ -9,6 +9,7 @@ import (
 
 	"github.com/alcionai/corso/src/internal/connector"
 	"github.com/alcionai/corso/src/internal/connector/support"
+	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/events"
 	"github.com/alcionai/corso/src/internal/kopia"
 	"github.com/alcionai/corso/src/internal/model"
@@ -73,6 +74,7 @@ func (op BackupOperation) validate() error {
 type backupStats struct {
 	k                 *kopia.BackupStats
 	gc                *support.ConnectorOperationStatus
+	resourceCount     int
 	started           bool
 	readErr, writeErr error
 }
@@ -127,6 +129,8 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
 		return err
 	}
 
+	opStats.resourceCount = len(data.ResourceOwnerSet(cs))
+
 	// hand the results to the consumer
 	opStats.k, backupDetails, err = op.kopia.BackupCollections(ctx, cs)
 	if err != nil {
@@ -165,6 +169,7 @@ func (op *BackupOperation) persistResults(
 	op.Results.WriteErrors = opStats.writeErr
 	op.Results.ItemsRead = opStats.gc.Successful
 	op.Results.ItemsWritten = opStats.k.TotalFileCount
+	op.Results.ResourceOwners = opStats.resourceCount
 
 	return nil
 }
@@ -202,12 +207,12 @@ func (op *BackupOperation) createBackupModels(
 		ctx,
 		events.BackupEnd,
 		map[string]any{
-			events.BackupID:  b.ID,
-			events.Status:    op.Status,
-			events.StartTime: op.Results.StartedAt,
-			events.EndTime:   op.Results.CompletedAt,
-			events.Duration:  op.Results.CompletedAt.Sub(op.Results.StartedAt),
-			// TODO: events.ExchangeResources: <count of resources>,
+			events.BackupID:          b.ID,
+			events.Status:            op.Status,
+			events.StartTime:         op.Results.StartedAt,
+			events.EndTime:           op.Results.CompletedAt,
+			events.Duration:          op.Results.CompletedAt.Sub(op.Results.StartedAt),
+			events.ExchangeResources: op.Results.ResourceOwners,
 			// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
 			// TODO: events.ExchangeDataStored: <amount of data stored>,
 		},
```
```diff
@@ -71,6 +71,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
 	assert.Equal(t, op.Results.ItemsRead, stats.gc.Successful, "items read")
 	assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
 	assert.Equal(t, op.Results.ItemsWritten, stats.k.TotalFileCount, "items written")
+	assert.Equal(t, 0, op.Results.ResourceOwners, "resource owners")
 	assert.Equal(t, op.Results.WriteErrors, stats.writeErr, "write errors")
 	assert.Equal(t, op.Results.StartedAt, now, "started at")
 	assert.Less(t, now, op.Results.CompletedAt, "completed at")
@@ -214,6 +215,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
 	assert.Equal(t, bo.Status, Completed)
 	assert.Greater(t, bo.Results.ItemsRead, 0)
 	assert.Greater(t, bo.Results.ItemsWritten, 0)
+	assert.Equal(t, 1, bo.Results.ResourceOwners)
 	assert.Zero(t, bo.Results.ReadErrors)
 	assert.Zero(t, bo.Results.WriteErrors)
 	assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
@@ -271,6 +273,7 @@ func (suite *BackupOpIntegrationSuite) TestBackupOneDrive_Run() {
 	require.NotEmpty(t, bo.Results.BackupID)
 	assert.Equal(t, bo.Status, Completed)
 	assert.Equal(t, bo.Results.ItemsRead, bo.Results.ItemsWritten)
+	assert.Equal(t, 1, bo.Results.ResourceOwners)
 	assert.NoError(t, bo.Results.ReadErrors)
 	assert.NoError(t, bo.Results.WriteErrors)
 	assert.Equal(t, 1, mb.TimesCalled[events.BackupStart], "backup-start events")
```
```diff
@@ -77,6 +77,7 @@ func (op RestoreOperation) validate() error {
 type restoreStats struct {
 	cs                []data.Collection
 	gc                *support.ConnectorOperationStatus
+	resourceCount     int
 	started           bool
 	readErr, writeErr error
 }
@@ -173,6 +174,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
 
 	opStats.readErr = parseErrs.ErrorOrNil()
 	opStats.cs = dcs
+	opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
 
 	// restore those collections using graph
 	gc, err := connector.NewGraphConnector(op.account)
@@ -222,20 +224,21 @@ func (op *RestoreOperation) persistResults(
 	op.Results.WriteErrors = opStats.writeErr
 	op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count
 	op.Results.ItemsWritten = opStats.gc.Successful
+	op.Results.ResourceOwners = opStats.resourceCount
 
 	op.bus.Event(
 		ctx,
 		events.RestoreEnd,
 		map[string]any{
 			// TODO: RestoreID
-			events.BackupID:     op.BackupID,
-			events.Status:       op.Status,
-			events.StartTime:    op.Results.StartedAt,
-			events.EndTime:      op.Results.CompletedAt,
-			events.Duration:     op.Results.CompletedAt.Sub(op.Results.StartedAt),
-			events.ItemsRead:    op.Results.ItemsRead,
-			events.ItemsWritten: op.Results.ItemsWritten,
-			// TODO: events.ExchangeResources: <count of resources>,
+			events.BackupID:          op.BackupID,
+			events.Status:            op.Status,
+			events.StartTime:         op.Results.StartedAt,
+			events.EndTime:           op.Results.CompletedAt,
+			events.Duration:          op.Results.CompletedAt.Sub(op.Results.StartedAt),
+			events.ItemsRead:         op.Results.ItemsRead,
+			events.ItemsWritten:      op.Results.ItemsWritten,
+			events.ExchangeResources: op.Results.ResourceOwners,
 			// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
 		},
 	)
```
```diff
@@ -75,6 +75,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
 	assert.Equal(t, op.Results.ItemsRead, len(stats.cs), "items read")
 	assert.Equal(t, op.Results.ReadErrors, stats.readErr, "read errors")
 	assert.Equal(t, op.Results.ItemsWritten, stats.gc.Successful, "items written")
+	assert.Equal(t, 0, op.Results.ResourceOwners, "resource owners")
 	assert.Equal(t, op.Results.WriteErrors, stats.writeErr, "write errors")
 	assert.Equal(t, op.Results.StartedAt, now, "started at")
 	assert.Less(t, now, op.Results.CompletedAt, "completed at")
@@ -232,6 +233,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
 	assert.Equal(t, ro.Status, Completed, "restoreOp status")
 	assert.Greater(t, ro.Results.ItemsRead, 0, "restore items read")
 	assert.Greater(t, ro.Results.ItemsWritten, 0, "restored items written")
+	assert.Equal(t, 1, ro.Results.ResourceOwners)
 	assert.Zero(t, ro.Results.ReadErrors, "errors while reading restore data")
 	assert.Zero(t, ro.Results.WriteErrors, "errors while writing restore data")
 	assert.Equal(t, suite.numItems, ro.Results.ItemsWritten, "backup and restore wrote the same num of items")
@@ -259,6 +261,7 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() {
 		mb)
 	require.NoError(t, err)
 	require.Error(t, ro.Run(ctx), "restoreOp.Run() should have 0 results")
+	assert.Equal(t, 0, ro.Results.ResourceOwners)
 	assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events")
 	assert.Equal(t, 0, mb.TimesCalled[events.RestoreEnd], "restore-end events")
 }
```
```diff
@@ -7,10 +7,11 @@ import "time"
 // assumed to be successful, so the total count of items involved
 // would be ItemsRead+ReadErrors.
 type ReadWrites struct {
-	ItemsRead    int   `json:"itemsRead,omitempty"`
-	ItemsWritten int   `json:"itemsWritten,omitempty"`
-	ReadErrors   error `json:"readErrors,omitempty"`
-	WriteErrors  error `json:"writeErrors,omitempty"`
+	ItemsRead      int   `json:"itemsRead,omitempty"`
+	ItemsWritten   int   `json:"itemsWritten,omitempty"`
+	ReadErrors     error `json:"readErrors,omitempty"`
+	WriteErrors    error `json:"writeErrors,omitempty"`
+	ResourceOwners int   `json:"resourceOwners,omitempty"`
 }
 
 // StartAndEndTime tracks a paired starting time and ending time.
```
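
As a quick illustration (the values are made up, not from the PR), the extended struct serializes like this; `omitempty` drops zero counts and nil errors entirely:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ReadWrites mirrors the struct in the hunk above.
type ReadWrites struct {
	ItemsRead      int   `json:"itemsRead,omitempty"`
	ItemsWritten   int   `json:"itemsWritten,omitempty"`
	ReadErrors     error `json:"readErrors,omitempty"`
	WriteErrors    error `json:"writeErrors,omitempty"`
	ResourceOwners int   `json:"resourceOwners,omitempty"`
}

func main() {
	b, _ := json.Marshal(ReadWrites{ItemsRead: 3, ItemsWritten: 3, ResourceOwners: 1})
	fmt.Println(string(b)) // {"itemsRead":3,"itemsWritten":3,"resourceOwners":1}

	b, _ = json.Marshal(ReadWrites{})
	fmt.Println(string(b)) // {} (zero counts and nil errors are all omitted)
}
```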