diff --git a/src/internal/common/syncd/map.go b/src/internal/common/syncd/map.go index c515b1b63..45bcbba24 100644 --- a/src/internal/common/syncd/map.go +++ b/src/internal/common/syncd/map.go @@ -1,9 +1,15 @@ package syncd import ( + "sync" + "github.com/puzpuzpuz/xsync/v3" ) +// --------------------------------------------------------------------------- +// string -> V +// --------------------------------------------------------------------------- + // MapTo produces a threadsafe map[string]V type MapTo[V any] struct { xmo *xsync.MapOf[string, V] @@ -27,3 +33,60 @@ func (m MapTo[V]) Load(k string) (V, bool) { func (m MapTo[V]) Size() int { return m.xmo.Size() } + +// --------------------------------------------------------------------------- +// K -> V +// --------------------------------------------------------------------------- + +// for laxy initialization +var mu sync.Mutex + +// MapOf produces a threadsafe map[K]V +type MapOf[K comparable, V any] struct { + xmo *xsync.MapOf[K, V] +} + +// NewMapOf produces a new threadsafe mapOf[K]V +func NewMapOf[K comparable, V any]() MapOf[K, V] { + return MapOf[K, V]{ + xmo: xsync.NewMapOf[K, V](), + } +} + +// LazyInit ensures the underlying map is populated. +// no-op if already initialized. +func (m *MapOf[K, V]) LazyInit() { + mu.Lock() + defer mu.Unlock() + + if m.xmo == nil { + m.xmo = xsync.NewMapOf[K, V]() + } +} + +func (m MapOf[K, V]) Store(k K, v V) { + m.xmo.Store(k, v) +} + +func (m MapOf[K, V]) Load(k K) (V, bool) { + return m.xmo.Load(k) +} + +func (m MapOf[K, V]) Size() int { + return m.xmo.Size() +} + +func (m MapOf[K, V]) Values() map[K]V { + vs := map[K]V{} + + if m.xmo == nil { + return vs + } + + m.xmo.Range(func(k K, v V) bool { + vs[k] = v + return true + }) + + return vs +} diff --git a/src/internal/m365/service/exchange/export_test.go b/src/internal/m365/service/exchange/export_test.go index baf4e4d37..11cf2b91b 100644 --- a/src/internal/m365/service/exchange/export_test.go +++ b/src/internal/m365/service/exchange/export_test.go @@ -169,12 +169,12 @@ func (suite *ExportUnitSuite) TestGetItems() { ctx, flush := tester.NewContext(t) defer flush() - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ec := exchange.NewExportCollection( "", []data.RestoreCollection{test.backingCollection}, test.version, - &stats) + stats) items := ec.Items(ctx) @@ -220,7 +220,7 @@ func (suite *ExportUnitSuite) TestGetItems() { } } - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") }) } } @@ -404,7 +404,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { exportCfg := control.ExportConfig{} - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ecs, err := NewExchangeHandler(api.Client{}, nil). ProduceExportCollections( @@ -412,7 +412,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { int(version.Backup), exportCfg, tt.dcs, - &stats, + stats, fault.New(true)) if tt.hasErr { @@ -423,7 +423,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { assert.NoError(t, err, "export collections error") assert.Len(t, ecs, len(tt.expectedItems), "num of collections") - expectedStats := metrics.ExportStats{} + expectedStats := metrics.NewExportStats() // We are dependent on the order the collections are // returned in the test which is not necessary for the @@ -456,7 +456,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { } } - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") }) } } diff --git a/src/internal/m365/service/groups/export_test.go b/src/internal/m365/service/groups/export_test.go index 5cdb4199c..cb885f0d9 100644 --- a/src/internal/m365/service/groups/export_test.go +++ b/src/internal/m365/service/groups/export_test.go @@ -96,7 +96,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() { }, } - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ecs, err := NewGroupsHandler(api.Client{}, nil). ProduceExportCollections( @@ -104,7 +104,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() { int(version.Backup), exportCfg, dcs, - &stats, + stats, fault.New(true)) assert.NoError(t, err, "export collections error") assert.Len(t, ecs, 1, "num of collections") @@ -130,10 +130,10 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := metrics.ExportStats{} + expectedStats := metrics.NewExportStats() expectedStats.UpdateBytes(path.ChannelMessagesCategory, int64(size)) expectedStats.UpdateResourceCount(path.ChannelMessagesCategory) - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") } func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() { @@ -201,14 +201,14 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() { handler := NewGroupsHandler(api.Client{}, nil) handler.CacheItemInfo(dii) - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ecs, err := handler.ProduceExportCollections( ctx, int(version.Backup), exportCfg, dcs, - &stats, + stats, fault.New(true)) assert.NoError(t, err, "export collections error") assert.Len(t, ecs, 1, "num of collections") @@ -233,8 +233,8 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := metrics.ExportStats{} + expectedStats := metrics.NewExportStats() expectedStats.UpdateBytes(path.FilesCategory, int64(size)) expectedStats.UpdateResourceCount(path.FilesCategory) - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") } diff --git a/src/internal/m365/service/onedrive/export_test.go b/src/internal/m365/service/onedrive/export_test.go index 817e2378f..810de9a4d 100644 --- a/src/internal/m365/service/onedrive/export_test.go +++ b/src/internal/m365/service/onedrive/export_test.go @@ -248,12 +248,12 @@ func (suite *ExportUnitSuite) TestGetItems() { ctx, flush := tester.NewContext(t) defer flush() - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ec := drive.NewExportCollection( "", []data.RestoreCollection{test.backingCollection}, test.version, - &stats) + stats) items := ec.Items(ctx) @@ -300,7 +300,7 @@ func (suite *ExportUnitSuite) TestGetItems() { } } - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") }) } } @@ -341,7 +341,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { }, } - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ecs, err := NewOneDriveHandler(api.Client{}, nil). ProduceExportCollections( @@ -349,7 +349,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { int(version.Backup), exportCfg, dcs, - &stats, + stats, fault.New(true)) assert.NoError(t, err, "export collections error") assert.Len(t, ecs, 1, "num of collections") @@ -371,8 +371,8 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := metrics.ExportStats{} + expectedStats := metrics.NewExportStats() expectedStats.UpdateBytes(path.FilesCategory, int64(size)) expectedStats.UpdateResourceCount(path.FilesCategory) - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") } diff --git a/src/internal/m365/service/sharepoint/export_test.go b/src/internal/m365/service/sharepoint/export_test.go index eb680b6d5..2eb2aa4c5 100644 --- a/src/internal/m365/service/sharepoint/export_test.go +++ b/src/internal/m365/service/sharepoint/export_test.go @@ -189,14 +189,14 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { handler := NewSharePointHandler(api.Client{}, nil) handler.CacheItemInfo(test.itemInfo) - stats := metrics.ExportStats{} + stats := metrics.NewExportStats() ecs, err := handler.ProduceExportCollections( ctx, int(version.Backup), exportCfg, dcs, - &stats, + stats, fault.New(true)) assert.NoError(t, err, "export collections error") assert.Len(t, ecs, 1, "num of collections") @@ -220,10 +220,10 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { assert.Equal(t, test.expectedItems, fitems, "items") - expectedStats := metrics.ExportStats{} + expectedStats := metrics.NewExportStats() expectedStats.UpdateBytes(test.statsCat, int64(size)) expectedStats.UpdateResourceCount(test.statsCat) - assert.Equal(t, expectedStats, stats, "stats") + assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") }) } } diff --git a/src/pkg/metrics/metrics.go b/src/pkg/metrics/metrics.go index b45c95439..9e359ea67 100644 --- a/src/pkg/metrics/metrics.go +++ b/src/pkg/metrics/metrics.go @@ -2,8 +2,9 @@ package metrics import ( "io" - "sync/atomic" + "github.com/alcionai/corso/src/internal/common/syncd" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/path" ) @@ -12,33 +13,53 @@ type KindStats struct { ResourceCount int64 } +var ( + bytesRead count.Key = "bytes-read" + resources count.Key = "resources" +) + type ExportStats struct { // data is kept private so that we can enforce atomic int updates - data map[path.CategoryType]KindStats + data syncd.MapOf[path.CategoryType, *count.Bus] } -func (es *ExportStats) UpdateBytes(kind path.CategoryType, bytesRead int64) { - if es.data == nil { - es.data = map[path.CategoryType]KindStats{} +func NewExportStats() *ExportStats { + return &ExportStats{ + data: syncd.NewMapOf[path.CategoryType, *count.Bus](), } +} - ks := es.data[kind] - atomic.AddInt64(&ks.BytesRead, bytesRead) - es.data[kind] = ks +func (es *ExportStats) UpdateBytes(kind path.CategoryType, numBytes int64) { + es.getCB(kind).Add(bytesRead, numBytes) } func (es *ExportStats) UpdateResourceCount(kind path.CategoryType) { - if es.data == nil { - es.data = map[path.CategoryType]KindStats{} + es.getCB(kind).Inc(resources) +} + +func (es *ExportStats) getCB(kind path.CategoryType) *count.Bus { + es.data.LazyInit() + + cb, ok := es.data.Load(kind) + if !ok { + cb = count.New() + es.data.Store(kind, cb) } - ks := es.data[kind] - atomic.AddInt64(&ks.ResourceCount, 1) - es.data[kind] = ks + return cb } func (es *ExportStats) GetStats() map[path.CategoryType]KindStats { - return es.data + toKindStats := map[path.CategoryType]KindStats{} + + for k, cb := range es.data.Values() { + toKindStats[k] = KindStats{ + BytesRead: cb.Get(bytesRead), + ResourceCount: cb.Get(resources), + } + } + + return toKindStats } type statsReader struct {