diff --git a/src/internal/converters/eml/eml_test.go b/src/internal/converters/eml/eml_test.go index cdd010978..fb8c2ef2d 100644 --- a/src/internal/converters/eml/eml_test.go +++ b/src/internal/converters/eml/eml_test.go @@ -131,31 +131,60 @@ func (suite *EMLUnitSuite) TestConvert_messageble_to_eml() { assert.Equal(t, source, target) } -func (suite *EMLUnitSuite) TestConvert_empty_attachment_no_err() { - t := suite.T() +func (suite *EMLUnitSuite) TestConvert_edge_cases() { + tests := []struct { + name string + transform func(models.Messageable) + }{ + { + name: "just a name", + transform: func(msg models.Messageable) { + msg.GetFrom().GetEmailAddress().SetName(ptr.To("alphabob")) + msg.GetFrom().GetEmailAddress().SetAddress(nil) + }, + }, + { + name: "incorrect address", + transform: func(msg models.Messageable) { + msg.GetFrom().GetEmailAddress().SetAddress(ptr.To("invalid")) + }, + }, + { + name: "empty attachment", + transform: func(msg models.Messageable) { + attachments := msg.GetAttachments() + err := attachments[0].GetBackingStore().Set("contentBytes", []uint8{}) + require.NoError(suite.T(), err, "setting attachment content") + }, + }, + } - ctx, flush := tester.NewContext(t) - defer flush() + for _, test := range tests { + suite.Run(test.name, func() { + t := suite.T() - body := []byte(testdata.EmailWithAttachments) + ctx, flush := tester.NewContext(t) + defer flush() - msg, err := api.BytesToMessageable(body) - require.NoError(t, err, "creating message") + body := []byte(testdata.EmailWithAttachments) - attachments := msg.GetAttachments() - err = attachments[0].GetBackingStore().Set("contentBytes", []uint8{}) - require.NoError(t, err, "setting content bytes") + msg, err := api.BytesToMessageable(body) + require.NoError(t, err, "creating message") - writer := kjson.NewJsonSerializationWriter() + test.transform(msg) - defer writer.Close() + writer := kjson.NewJsonSerializationWriter() - err = writer.WriteObjectValue("", msg) - require.NoError(t, err, "serializing message") + defer writer.Close() - nbody, err := writer.GetSerializedContent() - require.NoError(t, err, "getting serialized content") + err = writer.WriteObjectValue("", msg) + require.NoError(t, err, "serializing message") - _, err = FromJSON(ctx, nbody) - assert.NoError(t, err, "converting to eml") + nbody, err := writer.GetSerializedContent() + require.NoError(t, err, "getting serialized content") + + _, err = FromJSON(ctx, nbody) + assert.NoError(t, err, "converting to eml") + }) + } } diff --git a/src/internal/data/metrics.go b/src/internal/data/metrics.go index e07ad584e..f34d20a16 100644 --- a/src/internal/data/metrics.go +++ b/src/internal/data/metrics.go @@ -1,12 +1,5 @@ package data -import ( - "io" - "sync/atomic" - - "github.com/alcionai/corso/src/pkg/path" -) - type CollectionStats struct { Folders, Objects, @@ -22,68 +15,3 @@ func (cs CollectionStats) IsZero() bool { func (cs CollectionStats) String() string { return cs.Details } - -type KindStats struct { - BytesRead int64 - ResourceCount int64 -} - -type ExportStats struct { - // data is kept private so that we can enforce atomic int updates - data map[path.CategoryType]KindStats -} - -func (es *ExportStats) UpdateBytes(kind path.CategoryType, bytesRead int64) { - if es.data == nil { - es.data = map[path.CategoryType]KindStats{} - } - - ks := es.data[kind] - atomic.AddInt64(&ks.BytesRead, bytesRead) - es.data[kind] = ks -} - -func (es *ExportStats) UpdateResourceCount(kind path.CategoryType) { - if es.data == nil { - es.data = map[path.CategoryType]KindStats{} - } - - ks := es.data[kind] - atomic.AddInt64(&ks.ResourceCount, 1) - es.data[kind] = ks -} - -func (es *ExportStats) GetStats() map[path.CategoryType]KindStats { - return es.data -} - -type statsReader struct { - io.ReadCloser - kind path.CategoryType - stats *ExportStats -} - -func (sr *statsReader) Read(p []byte) (int, error) { - n, err := sr.ReadCloser.Read(p) - sr.stats.UpdateBytes(sr.kind, int64(n)) - - return n, err -} - -// Create a function that will take a reader and return a reader that -// will update the stats -func ReaderWithStats( - reader io.ReadCloser, - kind path.CategoryType, - stats *ExportStats, -) io.ReadCloser { - if reader == nil { - return nil - } - - return &statsReader{ - ReadCloser: reader, - kind: kind, - stats: stats, - } -} diff --git a/src/internal/m365/collection/drive/export.go b/src/internal/m365/collection/drive/export.go index 8e307efb6..956c4e9b3 100644 --- a/src/internal/m365/collection/drive/export.go +++ b/src/internal/m365/collection/drive/export.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" ) @@ -19,7 +20,7 @@ func NewExportCollection( baseDir string, backingCollection []data.RestoreCollection, backupVersion int, - stats *data.ExportStats, + stats *metrics.ExportStats, ) export.Collectioner { return export.BaseCollection{ BaseDir: baseDir, @@ -37,7 +38,7 @@ func streamItems( backupVersion int, cec control.ExportConfig, ch chan<- export.Item, - stats *data.ExportStats, + stats *metrics.ExportStats, ) { defer close(ch) @@ -61,7 +62,7 @@ func streamItems( } stats.UpdateResourceCount(path.FilesCategory) - body := data.ReaderWithStats(item.ToReader(), path.FilesCategory, stats) + body := metrics.ReaderWithStats(item.ToReader(), path.FilesCategory, stats) ch <- export.Item{ ID: itemUUID, diff --git a/src/internal/m365/collection/exchange/export.go b/src/internal/m365/collection/exchange/export.go index 59f24a527..811a3da4e 100644 --- a/src/internal/m365/collection/exchange/export.go +++ b/src/internal/m365/collection/exchange/export.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" ) @@ -19,7 +20,7 @@ func NewExportCollection( baseDir string, backingCollection []data.RestoreCollection, backupVersion int, - stats *data.ExportStats, + stats *metrics.ExportStats, ) export.Collectioner { return export.BaseCollection{ BaseDir: baseDir, @@ -37,7 +38,7 @@ func streamItems( backupVersion int, config control.ExportConfig, ch chan<- export.Item, - stats *data.ExportStats, + stats *metrics.ExportStats, ) { defer close(ch) @@ -77,7 +78,7 @@ func streamItems( } emlReader := io.NopCloser(bytes.NewReader([]byte(email))) - body := data.ReaderWithStats(emlReader, path.EmailCategory, stats) + body := metrics.ReaderWithStats(emlReader, path.EmailCategory, stats) ch <- export.Item{ ID: id, diff --git a/src/internal/m365/collection/groups/export.go b/src/internal/m365/collection/groups/export.go index 468cf1472..c7577296b 100644 --- a/src/internal/m365/collection/groups/export.go +++ b/src/internal/m365/collection/groups/export.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -24,7 +25,7 @@ func NewExportCollection( backingCollections []data.RestoreCollection, backupVersion int, cec control.ExportConfig, - stats *data.ExportStats, + stats *metrics.ExportStats, ) export.Collectioner { return export.BaseCollection{ BaseDir: baseDir, @@ -43,7 +44,7 @@ func streamItems( backupVersion int, cec control.ExportConfig, ch chan<- export.Item, - stats *data.ExportStats, + stats *metrics.ExportStats, ) { defer close(ch) @@ -59,7 +60,7 @@ func streamItems( } } else { stats.UpdateResourceCount(path.ChannelMessagesCategory) - body = data.ReaderWithStats(body, path.ChannelMessagesCategory, stats) + body = metrics.ReaderWithStats(body, path.ChannelMessagesCategory, stats) // messages are exported as json and should be named as such name := item.ID() + ".json" diff --git a/src/internal/m365/collection/groups/export_test.go b/src/internal/m365/collection/groups/export_test.go index 298154bec..bc6247ba7 100644 --- a/src/internal/m365/collection/groups/export_test.go +++ b/src/internal/m365/collection/groups/export_test.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/metrics" ) type ExportUnitSuite struct { @@ -91,7 +92,7 @@ func (suite *ExportUnitSuite) TestStreamItems() { version.NoBackup, control.DefaultExportConfig(), ch, - &data.ExportStats{}) + &metrics.ExportStats{}) var ( itm export.Item diff --git a/src/internal/m365/mock/connector.go b/src/internal/m365/mock/connector.go index cebc579d7..d0ea37351 100644 --- a/src/internal/m365/mock/connector.go +++ b/src/internal/m365/mock/connector.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" ) @@ -87,7 +88,7 @@ func (ctrl Controller) ProduceExportCollections( _ int, _ control.ExportConfig, _ []data.RestoreCollection, - _ *data.ExportStats, + _ *metrics.ExportStats, _ *fault.Bus, ) ([]export.Collectioner, error) { return nil, ctrl.Err diff --git a/src/internal/m365/service/exchange/export.go b/src/internal/m365/service/exchange/export.go index 13cbeebbb..f0075c995 100644 --- a/src/internal/m365/service/exchange/export.go +++ b/src/internal/m365/service/exchange/export.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -54,7 +55,7 @@ func (h *baseExchangeHandler) ProduceExportCollections( backupVersion int, exportCfg control.ExportConfig, dcs []data.RestoreCollection, - stats *data.ExportStats, + stats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) { var ( diff --git a/src/internal/m365/service/exchange/export_test.go b/src/internal/m365/service/exchange/export_test.go index 995b231db..b4d16eae3 100644 --- a/src/internal/m365/service/exchange/export_test.go +++ b/src/internal/m365/service/exchange/export_test.go @@ -18,6 +18,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -144,7 +145,7 @@ func (suite *ExportUnitSuite) TestGetItems() { ctx, flush := tester.NewContext(t) defer flush() - stats := data.ExportStats{} + stats := metrics.ExportStats{} ec := exchange.NewExportCollection( "", []data.RestoreCollection{test.backingCollection}, @@ -184,10 +185,10 @@ func (suite *ExportUnitSuite) TestGetItems() { assert.ErrorIs(t, item.Error, test.expectedItems[i].Error) } - var expectedStats data.ExportStats + var expectedStats metrics.ExportStats if size+count > 0 { // it is only initialized if we have something - expectedStats = data.ExportStats{} + expectedStats = metrics.ExportStats{} expectedStats.UpdateBytes(path.EmailCategory, int64(size)) for i := 0; i < count; i++ { @@ -379,7 +380,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { exportCfg := control.ExportConfig{} - stats := data.ExportStats{} + stats := metrics.ExportStats{} ecs, err := NewExchangeHandler(control.DefaultOptions(), api.Client{}, nil). ProduceExportCollections( @@ -398,7 +399,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { assert.NoError(t, err, "export collections error") assert.Len(t, ecs, len(tt.expectedItems), "num of collections") - expectedStats := data.ExportStats{} + expectedStats := metrics.ExportStats{} // We are dependent on the order the collections are // returned in the test which is not necessary for the diff --git a/src/internal/m365/service/groups/export.go b/src/internal/m365/service/groups/export.go index 5bdc8c0a1..c95933d98 100644 --- a/src/internal/m365/service/groups/export.go +++ b/src/internal/m365/service/groups/export.go @@ -17,6 +17,7 @@ import ( "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -68,7 +69,7 @@ func (h *baseGroupsHandler) ProduceExportCollections( backupVersion int, exportCfg control.ExportConfig, dcs []data.RestoreCollection, - stats *data.ExportStats, + stats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) { var ( diff --git a/src/internal/m365/service/groups/export_test.go b/src/internal/m365/service/groups/export_test.go index a5a5aaa24..310442253 100644 --- a/src/internal/m365/service/groups/export_test.go +++ b/src/internal/m365/service/groups/export_test.go @@ -20,6 +20,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -95,7 +96,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() { }, } - stats := data.ExportStats{} + stats := metrics.ExportStats{} ecs, err := NewGroupsHandler(control.DefaultOptions(), api.Client{}, nil). ProduceExportCollections( @@ -129,7 +130,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := data.ExportStats{} + expectedStats := metrics.ExportStats{} expectedStats.UpdateBytes(path.ChannelMessagesCategory, int64(size)) expectedStats.UpdateResourceCount(path.ChannelMessagesCategory) assert.Equal(t, expectedStats, stats, "stats") @@ -200,7 +201,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() { handler := NewGroupsHandler(control.DefaultOptions(), api.Client{}, nil) handler.CacheItemInfo(dii) - stats := data.ExportStats{} + stats := metrics.ExportStats{} ecs, err := handler.ProduceExportCollections( ctx, @@ -232,7 +233,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := data.ExportStats{} + expectedStats := metrics.ExportStats{} expectedStats.UpdateBytes(path.FilesCategory, int64(size)) expectedStats.UpdateResourceCount(path.FilesCategory) assert.Equal(t, expectedStats, stats, "stats") diff --git a/src/internal/m365/service/onedrive/export.go b/src/internal/m365/service/onedrive/export.go index 304e6b4a0..6e49570cd 100644 --- a/src/internal/m365/service/onedrive/export.go +++ b/src/internal/m365/service/onedrive/export.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -61,7 +62,7 @@ func (h *baseOneDriveHandler) ProduceExportCollections( backupVersion int, exportCfg control.ExportConfig, dcs []data.RestoreCollection, - stats *data.ExportStats, + stats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) { var ( diff --git a/src/internal/m365/service/onedrive/export_test.go b/src/internal/m365/service/onedrive/export_test.go index daaa0eaf0..199bbab6f 100644 --- a/src/internal/m365/service/onedrive/export_test.go +++ b/src/internal/m365/service/onedrive/export_test.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -247,7 +248,7 @@ func (suite *ExportUnitSuite) TestGetItems() { ctx, flush := tester.NewContext(t) defer flush() - stats := data.ExportStats{} + stats := metrics.ExportStats{} ec := drive.NewExportCollection( "", []data.RestoreCollection{test.backingCollection}, @@ -288,10 +289,10 @@ func (suite *ExportUnitSuite) TestGetItems() { assert.ErrorIs(t, item.Error, test.expectedItems[i].Error) } - var expectedStats data.ExportStats + var expectedStats metrics.ExportStats if size+count > 0 { // it is only initialized if we have something - expectedStats = data.ExportStats{} + expectedStats = metrics.ExportStats{} expectedStats.UpdateBytes(path.FilesCategory, int64(size)) for i := 0; i < count; i++ { @@ -340,7 +341,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { }, } - stats := data.ExportStats{} + stats := metrics.ExportStats{} ecs, err := NewOneDriveHandler(control.DefaultOptions(), api.Client{}, nil). ProduceExportCollections( @@ -370,7 +371,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := data.ExportStats{} + expectedStats := metrics.ExportStats{} expectedStats.UpdateBytes(path.FilesCategory, int64(size)) expectedStats.UpdateResourceCount(path.FilesCategory) assert.Equal(t, expectedStats, stats, "stats") diff --git a/src/internal/m365/service/sharepoint/export.go b/src/internal/m365/service/sharepoint/export.go index bc5ff4009..72b87b11f 100644 --- a/src/internal/m365/service/sharepoint/export.go +++ b/src/internal/m365/service/sharepoint/export.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -65,7 +66,7 @@ func (h *baseSharePointHandler) ProduceExportCollections( backupVersion int, exportCfg control.ExportConfig, dcs []data.RestoreCollection, - stats *data.ExportStats, + stats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) { var ( diff --git a/src/internal/m365/service/sharepoint/export_test.go b/src/internal/m365/service/sharepoint/export_test.go index a33655a51..b672a9cda 100644 --- a/src/internal/m365/service/sharepoint/export_test.go +++ b/src/internal/m365/service/sharepoint/export_test.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -129,7 +130,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { handler := NewSharePointHandler(control.DefaultOptions(), api.Client{}, nil) handler.CacheItemInfo(test.itemInfo) - stats := data.ExportStats{} + stats := metrics.ExportStats{} ecs, err := handler.ProduceExportCollections( ctx, @@ -160,7 +161,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() { assert.Equal(t, expectedItems, fitems, "items") - expectedStats := data.ExportStats{} + expectedStats := metrics.ExportStats{} expectedStats.UpdateBytes(path.FilesCategory, int64(size)) expectedStats.UpdateResourceCount(path.FilesCategory) assert.Equal(t, expectedStats, stats, "stats") diff --git a/src/internal/operations/export.go b/src/internal/operations/export.go index 26581f004..3a32f66bd 100644 --- a/src/internal/operations/export.go +++ b/src/internal/operations/export.go @@ -27,6 +27,7 @@ import ( "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" @@ -47,7 +48,7 @@ type ExportOperation struct { Selectors selectors.Selector ExportCfg control.ExportConfig Version string - stats data.ExportStats + stats metrics.ExportStats acct account.Account ec inject.ExportConsumer @@ -74,7 +75,7 @@ func NewExportOperation( Selectors: sel, Version: "v0", ec: ec, - stats: data.ExportStats{}, + stats: metrics.ExportStats{}, } if err := op.validate(); err != nil { return ExportOperation{}, err @@ -322,7 +323,7 @@ func (op *ExportOperation) finalizeMetrics( // be calling this once the export collections have been read and process // as the data that will be available here will be the data that was read // and processed. -func (op *ExportOperation) GetStats() map[path.CategoryType]data.KindStats { +func (op *ExportOperation) GetStats() map[path.CategoryType]metrics.KindStats { return op.stats.GetStats() } @@ -336,7 +337,7 @@ func produceExportCollections( backupVersion int, exportCfg control.ExportConfig, dcs []data.RestoreCollection, - exportStats *data.ExportStats, + exportStats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) { complete := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Preparing export") diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go index 04e125a71..6d6ecf08b 100644 --- a/src/internal/operations/inject/inject.go +++ b/src/internal/operations/inject/inject.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" ) @@ -96,7 +97,7 @@ type ( backupVersion int, exportCfg control.ExportConfig, dcs []data.RestoreCollection, - stats *data.ExportStats, + stats *metrics.ExportStats, errs *fault.Bus, ) ([]export.Collectioner, error) diff --git a/src/pkg/export/export.go b/src/pkg/export/export.go index 7b998d30e..9c26cb14b 100644 --- a/src/pkg/export/export.go +++ b/src/pkg/export/export.go @@ -6,6 +6,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/metrics" ) // --------------------------------------------------------------------------- @@ -29,7 +30,7 @@ type itemStreamer func( backupVersion int, cfg control.ExportConfig, ch chan<- Item, - stats *data.ExportStats) + stats *metrics.ExportStats) // BaseCollection holds the foundational details of an export collection. type BaseCollection struct { @@ -47,7 +48,7 @@ type BaseCollection struct { Stream itemStreamer - Stats *data.ExportStats + Stats *metrics.ExportStats } func (bc BaseCollection) BasePath() string { diff --git a/src/pkg/metrics/metrics.go b/src/pkg/metrics/metrics.go new file mode 100644 index 000000000..b45c95439 --- /dev/null +++ b/src/pkg/metrics/metrics.go @@ -0,0 +1,73 @@ +package metrics + +import ( + "io" + "sync/atomic" + + "github.com/alcionai/corso/src/pkg/path" +) + +type KindStats struct { + BytesRead int64 + ResourceCount int64 +} + +type ExportStats struct { + // data is kept private so that we can enforce atomic int updates + data map[path.CategoryType]KindStats +} + +func (es *ExportStats) UpdateBytes(kind path.CategoryType, bytesRead int64) { + if es.data == nil { + es.data = map[path.CategoryType]KindStats{} + } + + ks := es.data[kind] + atomic.AddInt64(&ks.BytesRead, bytesRead) + es.data[kind] = ks +} + +func (es *ExportStats) UpdateResourceCount(kind path.CategoryType) { + if es.data == nil { + es.data = map[path.CategoryType]KindStats{} + } + + ks := es.data[kind] + atomic.AddInt64(&ks.ResourceCount, 1) + es.data[kind] = ks +} + +func (es *ExportStats) GetStats() map[path.CategoryType]KindStats { + return es.data +} + +type statsReader struct { + io.ReadCloser + kind path.CategoryType + stats *ExportStats +} + +func (sr *statsReader) Read(p []byte) (int, error) { + n, err := sr.ReadCloser.Read(p) + sr.stats.UpdateBytes(sr.kind, int64(n)) + + return n, err +} + +// Create a function that will take a reader and return a reader that +// will update the stats +func ReaderWithStats( + reader io.ReadCloser, + kind path.CategoryType, + stats *ExportStats, +) io.ReadCloser { + if reader == nil { + return nil + } + + return &statsReader{ + ReadCloser: reader, + kind: kind, + stats: stats, + } +}