swap pkg/metrics to threadsafe map (#4899)

<!-- PR description-->

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🐛 Bugfix
- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [x] 💚 E2E
This commit is contained in:
Keepers 2024-01-08 13:22:54 -07:00 committed by GitHub
parent 443aed639c
commit af64b487ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 40 deletions

View File

@ -1,9 +1,15 @@
package syncd
import (
"sync"
"github.com/puzpuzpuz/xsync/v3"
)
// ---------------------------------------------------------------------------
// string -> V
// ---------------------------------------------------------------------------
// MapTo produces a threadsafe map[string]V
type MapTo[V any] struct {
xmo *xsync.MapOf[string, V]
@ -27,3 +33,60 @@ func (m MapTo[V]) Load(k string) (V, bool) {
// Size reports the current number of entries in the map.
func (m MapTo[V]) Size() int {
return m.xmo.Size()
}
// ---------------------------------------------------------------------------
// K -> V
// ---------------------------------------------------------------------------
// for lazy initialization
var mu sync.Mutex
// MapOf is a threadsafe map[K]V backed by xsync.MapOf.
// The zero value is not ready for use: xmo stays nil until NewMapOf
// or LazyInit runs, so construct or LazyInit before Store/Load/Size.
type MapOf[K comparable, V any] struct {
xmo *xsync.MapOf[K, V]
}
// NewMapOf constructs a threadsafe MapOf[K]V whose backing store is
// already initialized; LazyInit is not needed afterwards.
func NewMapOf[K comparable, V any]() MapOf[K, V] {
	var m MapOf[K, V]
	m.xmo = xsync.NewMapOf[K, V]()

	return m
}
// LazyInit ensures the underlying map is populated.
// no-op if already initialized.
// Safe for concurrent callers: the package-level mu serializes the
// nil-check and assignment below.
// NOTE(review): mu is a single package-level mutex shared by every
// MapOf instance (and every [K, V] instantiation), so unrelated maps
// contend on the same lock here — confirm that's acceptable.
func (m *MapOf[K, V]) LazyInit() {
mu.Lock()
defer mu.Unlock()
if m.xmo == nil {
m.xmo = xsync.NewMapOf[K, V]()
}
}
// Store inserts or replaces the value for key k.
// The map must have been initialized via NewMapOf or LazyInit first;
// xmo is nil otherwise.
func (m MapOf[K, V]) Store(k K, v V) {
m.xmo.Store(k, v)
}
// Load returns the value stored for key k, and whether the key was
// present in the map.
func (m MapOf[K, V]) Load(k K) (V, bool) {
return m.xmo.Load(k)
}
// Size reports the current number of entries in the map.
func (m MapOf[K, V]) Size() int {
return m.xmo.Size()
}
// Values returns a point-in-time copy of the map's contents as a plain
// map[K]V. An uninitialized MapOf yields an empty (non-nil) map.
// Entries stored concurrently with the Range may or may not appear in
// the result.
func (m MapOf[K, V]) Values() map[K]V {
	if m.xmo == nil {
		return map[K]V{}
	}

	// Pre-size the result to avoid rehash-and-grow while copying.
	// Size is only a snapshot; concurrent writes can still push the
	// map slightly past the hint, which is harmless.
	vs := make(map[K]V, m.xmo.Size())

	m.xmo.Range(func(k K, v V) bool {
		vs[k] = v
		return true
	})

	return vs
}

View File

@ -169,12 +169,12 @@ func (suite *ExportUnitSuite) TestGetItems() {
ctx, flush := tester.NewContext(t)
defer flush()
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ec := exchange.NewExportCollection(
"",
[]data.RestoreCollection{test.backingCollection},
test.version,
&stats)
stats)
items := ec.Items(ctx)
@ -220,7 +220,7 @@ func (suite *ExportUnitSuite) TestGetItems() {
}
}
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
})
}
}
@ -404,7 +404,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
exportCfg := control.ExportConfig{}
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ecs, err := NewExchangeHandler(api.Client{}, nil).
ProduceExportCollections(
@ -412,7 +412,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
int(version.Backup),
exportCfg,
tt.dcs,
&stats,
stats,
fault.New(true))
if tt.hasErr {
@ -423,7 +423,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, len(tt.expectedItems), "num of collections")
expectedStats := metrics.ExportStats{}
expectedStats := metrics.NewExportStats()
// We are dependent on the order the collections are
// returned in the test which is not necessary for the
@ -456,7 +456,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
}
}
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
})
}
}

View File

@ -96,7 +96,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() {
},
}
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ecs, err := NewGroupsHandler(api.Client{}, nil).
ProduceExportCollections(
@ -104,7 +104,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() {
int(version.Backup),
exportCfg,
dcs,
&stats,
stats,
fault.New(true))
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, 1, "num of collections")
@ -130,10 +130,10 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() {
assert.Equal(t, expectedItems, fitems, "items")
expectedStats := metrics.ExportStats{}
expectedStats := metrics.NewExportStats()
expectedStats.UpdateBytes(path.ChannelMessagesCategory, int64(size))
expectedStats.UpdateResourceCount(path.ChannelMessagesCategory)
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
}
func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
@ -201,14 +201,14 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
handler := NewGroupsHandler(api.Client{}, nil)
handler.CacheItemInfo(dii)
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ecs, err := handler.ProduceExportCollections(
ctx,
int(version.Backup),
exportCfg,
dcs,
&stats,
stats,
fault.New(true))
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, 1, "num of collections")
@ -233,8 +233,8 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
assert.Equal(t, expectedItems, fitems, "items")
expectedStats := metrics.ExportStats{}
expectedStats := metrics.NewExportStats()
expectedStats.UpdateBytes(path.FilesCategory, int64(size))
expectedStats.UpdateResourceCount(path.FilesCategory)
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
}

View File

@ -248,12 +248,12 @@ func (suite *ExportUnitSuite) TestGetItems() {
ctx, flush := tester.NewContext(t)
defer flush()
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ec := drive.NewExportCollection(
"",
[]data.RestoreCollection{test.backingCollection},
test.version,
&stats)
stats)
items := ec.Items(ctx)
@ -300,7 +300,7 @@ func (suite *ExportUnitSuite) TestGetItems() {
}
}
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
})
}
}
@ -341,7 +341,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
},
}
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ecs, err := NewOneDriveHandler(api.Client{}, nil).
ProduceExportCollections(
@ -349,7 +349,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
int(version.Backup),
exportCfg,
dcs,
&stats,
stats,
fault.New(true))
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, 1, "num of collections")
@ -371,8 +371,8 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
assert.Equal(t, expectedItems, fitems, "items")
expectedStats := metrics.ExportStats{}
expectedStats := metrics.NewExportStats()
expectedStats.UpdateBytes(path.FilesCategory, int64(size))
expectedStats.UpdateResourceCount(path.FilesCategory)
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
}

View File

@ -189,14 +189,14 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
handler := NewSharePointHandler(api.Client{}, nil)
handler.CacheItemInfo(test.itemInfo)
stats := metrics.ExportStats{}
stats := metrics.NewExportStats()
ecs, err := handler.ProduceExportCollections(
ctx,
int(version.Backup),
exportCfg,
dcs,
&stats,
stats,
fault.New(true))
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, 1, "num of collections")
@ -220,10 +220,10 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
assert.Equal(t, test.expectedItems, fitems, "items")
expectedStats := metrics.ExportStats{}
expectedStats := metrics.NewExportStats()
expectedStats.UpdateBytes(test.statsCat, int64(size))
expectedStats.UpdateResourceCount(test.statsCat)
assert.Equal(t, expectedStats, stats, "stats")
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
})
}
}

View File

@ -2,8 +2,9 @@ package metrics
import (
"io"
"sync/atomic"
"github.com/alcionai/corso/src/internal/common/syncd"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/path"
)
@ -12,33 +13,53 @@ type KindStats struct {
ResourceCount int64
}
// count.Bus keys under which each category's export totals are tracked:
// bytesRead accumulates byte counts, resources counts exported items.
var (
bytesRead count.Key = "bytes-read"
resources count.Key = "resources"
)
type ExportStats struct {
// data is kept private so that we can enforce atomic int updates
data map[path.CategoryType]KindStats
data syncd.MapOf[path.CategoryType, *count.Bus]
}
func (es *ExportStats) UpdateBytes(kind path.CategoryType, bytesRead int64) {
if es.data == nil {
es.data = map[path.CategoryType]KindStats{}
func NewExportStats() *ExportStats {
return &ExportStats{
data: syncd.NewMapOf[path.CategoryType, *count.Bus](),
}
}
ks := es.data[kind]
atomic.AddInt64(&ks.BytesRead, bytesRead)
es.data[kind] = ks
// UpdateBytes adds numBytes to the bytes-read counter in the count.Bus
// for the given category, lazily creating the bus on first use.
func (es *ExportStats) UpdateBytes(kind path.CategoryType, numBytes int64) {
es.getCB(kind).Add(bytesRead, numBytes)
}
func (es *ExportStats) UpdateResourceCount(kind path.CategoryType) {
if es.data == nil {
es.data = map[path.CategoryType]KindStats{}
es.getCB(kind).Inc(resources)
}
func (es *ExportStats) getCB(kind path.CategoryType) *count.Bus {
es.data.LazyInit()
cb, ok := es.data.Load(kind)
if !ok {
cb = count.New()
es.data.Store(kind, cb)
}
ks := es.data[kind]
atomic.AddInt64(&ks.ResourceCount, 1)
es.data[kind] = ks
return cb
}
func (es *ExportStats) GetStats() map[path.CategoryType]KindStats {
return es.data
toKindStats := map[path.CategoryType]KindStats{}
for k, cb := range es.data.Values() {
toKindStats[k] = KindStats{
BytesRead: cb.Get(bytesRead),
ResourceCount: cb.Get(resources),
}
}
return toKindStats
}
type statsReader struct {