Add tests to check for invalid email addresses in eml export (#4881)
<!-- PR description--> --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No #### Type of change <!--- Please check the type of change your PR introduces: ---> - [ ] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [x] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup #### Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * #<issue> #### Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
6f1c5c6249
commit
b896405e92
@ -131,31 +131,60 @@ func (suite *EMLUnitSuite) TestConvert_messageble_to_eml() {
|
||||
assert.Equal(t, source, target)
|
||||
}
|
||||
|
||||
func (suite *EMLUnitSuite) TestConvert_empty_attachment_no_err() {
|
||||
t := suite.T()
|
||||
func (suite *EMLUnitSuite) TestConvert_edge_cases() {
|
||||
tests := []struct {
|
||||
name string
|
||||
transform func(models.Messageable)
|
||||
}{
|
||||
{
|
||||
name: "just a name",
|
||||
transform: func(msg models.Messageable) {
|
||||
msg.GetFrom().GetEmailAddress().SetName(ptr.To("alphabob"))
|
||||
msg.GetFrom().GetEmailAddress().SetAddress(nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "incorrect address",
|
||||
transform: func(msg models.Messageable) {
|
||||
msg.GetFrom().GetEmailAddress().SetAddress(ptr.To("invalid"))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty attachment",
|
||||
transform: func(msg models.Messageable) {
|
||||
attachments := msg.GetAttachments()
|
||||
err := attachments[0].GetBackingStore().Set("contentBytes", []uint8{})
|
||||
require.NoError(suite.T(), err, "setting attachment content")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
for _, test := range tests {
|
||||
suite.Run(test.name, func() {
|
||||
t := suite.T()
|
||||
|
||||
body := []byte(testdata.EmailWithAttachments)
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
msg, err := api.BytesToMessageable(body)
|
||||
require.NoError(t, err, "creating message")
|
||||
body := []byte(testdata.EmailWithAttachments)
|
||||
|
||||
attachments := msg.GetAttachments()
|
||||
err = attachments[0].GetBackingStore().Set("contentBytes", []uint8{})
|
||||
require.NoError(t, err, "setting content bytes")
|
||||
msg, err := api.BytesToMessageable(body)
|
||||
require.NoError(t, err, "creating message")
|
||||
|
||||
writer := kjson.NewJsonSerializationWriter()
|
||||
test.transform(msg)
|
||||
|
||||
defer writer.Close()
|
||||
writer := kjson.NewJsonSerializationWriter()
|
||||
|
||||
err = writer.WriteObjectValue("", msg)
|
||||
require.NoError(t, err, "serializing message")
|
||||
defer writer.Close()
|
||||
|
||||
nbody, err := writer.GetSerializedContent()
|
||||
require.NoError(t, err, "getting serialized content")
|
||||
err = writer.WriteObjectValue("", msg)
|
||||
require.NoError(t, err, "serializing message")
|
||||
|
||||
_, err = FromJSON(ctx, nbody)
|
||||
assert.NoError(t, err, "converting to eml")
|
||||
nbody, err := writer.GetSerializedContent()
|
||||
require.NoError(t, err, "getting serialized content")
|
||||
|
||||
_, err = FromJSON(ctx, nbody)
|
||||
assert.NoError(t, err, "converting to eml")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,12 +1,5 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
type CollectionStats struct {
|
||||
Folders,
|
||||
Objects,
|
||||
@ -22,68 +15,3 @@ func (cs CollectionStats) IsZero() bool {
|
||||
func (cs CollectionStats) String() string {
|
||||
return cs.Details
|
||||
}
|
||||
|
||||
type KindStats struct {
|
||||
BytesRead int64
|
||||
ResourceCount int64
|
||||
}
|
||||
|
||||
type ExportStats struct {
|
||||
// data is kept private so that we can enforce atomic int updates
|
||||
data map[path.CategoryType]KindStats
|
||||
}
|
||||
|
||||
func (es *ExportStats) UpdateBytes(kind path.CategoryType, bytesRead int64) {
|
||||
if es.data == nil {
|
||||
es.data = map[path.CategoryType]KindStats{}
|
||||
}
|
||||
|
||||
ks := es.data[kind]
|
||||
atomic.AddInt64(&ks.BytesRead, bytesRead)
|
||||
es.data[kind] = ks
|
||||
}
|
||||
|
||||
func (es *ExportStats) UpdateResourceCount(kind path.CategoryType) {
|
||||
if es.data == nil {
|
||||
es.data = map[path.CategoryType]KindStats{}
|
||||
}
|
||||
|
||||
ks := es.data[kind]
|
||||
atomic.AddInt64(&ks.ResourceCount, 1)
|
||||
es.data[kind] = ks
|
||||
}
|
||||
|
||||
func (es *ExportStats) GetStats() map[path.CategoryType]KindStats {
|
||||
return es.data
|
||||
}
|
||||
|
||||
type statsReader struct {
|
||||
io.ReadCloser
|
||||
kind path.CategoryType
|
||||
stats *ExportStats
|
||||
}
|
||||
|
||||
func (sr *statsReader) Read(p []byte) (int, error) {
|
||||
n, err := sr.ReadCloser.Read(p)
|
||||
sr.stats.UpdateBytes(sr.kind, int64(n))
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Create a function that will take a reader and return a reader that
|
||||
// will update the stats
|
||||
func ReaderWithStats(
|
||||
reader io.ReadCloser,
|
||||
kind path.CategoryType,
|
||||
stats *ExportStats,
|
||||
) io.ReadCloser {
|
||||
if reader == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &statsReader{
|
||||
ReadCloser: reader,
|
||||
kind: kind,
|
||||
stats: stats,
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -19,7 +20,7 @@ func NewExportCollection(
|
||||
baseDir string,
|
||||
backingCollection []data.RestoreCollection,
|
||||
backupVersion int,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
) export.Collectioner {
|
||||
return export.BaseCollection{
|
||||
BaseDir: baseDir,
|
||||
@ -37,7 +38,7 @@ func streamItems(
|
||||
backupVersion int,
|
||||
cec control.ExportConfig,
|
||||
ch chan<- export.Item,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
) {
|
||||
defer close(ch)
|
||||
|
||||
@ -61,7 +62,7 @@ func streamItems(
|
||||
}
|
||||
|
||||
stats.UpdateResourceCount(path.FilesCategory)
|
||||
body := data.ReaderWithStats(item.ToReader(), path.FilesCategory, stats)
|
||||
body := metrics.ReaderWithStats(item.ToReader(), path.FilesCategory, stats)
|
||||
|
||||
ch <- export.Item{
|
||||
ID: itemUUID,
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -19,7 +20,7 @@ func NewExportCollection(
|
||||
baseDir string,
|
||||
backingCollection []data.RestoreCollection,
|
||||
backupVersion int,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
) export.Collectioner {
|
||||
return export.BaseCollection{
|
||||
BaseDir: baseDir,
|
||||
@ -37,7 +38,7 @@ func streamItems(
|
||||
backupVersion int,
|
||||
config control.ExportConfig,
|
||||
ch chan<- export.Item,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
) {
|
||||
defer close(ch)
|
||||
|
||||
@ -77,7 +78,7 @@ func streamItems(
|
||||
}
|
||||
|
||||
emlReader := io.NopCloser(bytes.NewReader([]byte(email)))
|
||||
body := data.ReaderWithStats(emlReader, path.EmailCategory, stats)
|
||||
body := metrics.ReaderWithStats(emlReader, path.EmailCategory, stats)
|
||||
|
||||
ch <- export.Item{
|
||||
ID: id,
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -24,7 +25,7 @@ func NewExportCollection(
|
||||
backingCollections []data.RestoreCollection,
|
||||
backupVersion int,
|
||||
cec control.ExportConfig,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
) export.Collectioner {
|
||||
return export.BaseCollection{
|
||||
BaseDir: baseDir,
|
||||
@ -43,7 +44,7 @@ func streamItems(
|
||||
backupVersion int,
|
||||
cec control.ExportConfig,
|
||||
ch chan<- export.Item,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
) {
|
||||
defer close(ch)
|
||||
|
||||
@ -59,7 +60,7 @@ func streamItems(
|
||||
}
|
||||
} else {
|
||||
stats.UpdateResourceCount(path.ChannelMessagesCategory)
|
||||
body = data.ReaderWithStats(body, path.ChannelMessagesCategory, stats)
|
||||
body = metrics.ReaderWithStats(body, path.ChannelMessagesCategory, stats)
|
||||
|
||||
// messages are exported as json and should be named as such
|
||||
name := item.ID() + ".json"
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/version"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
)
|
||||
|
||||
type ExportUnitSuite struct {
|
||||
@ -91,7 +92,7 @@ func (suite *ExportUnitSuite) TestStreamItems() {
|
||||
version.NoBackup,
|
||||
control.DefaultExportConfig(),
|
||||
ch,
|
||||
&data.ExportStats{})
|
||||
&metrics.ExportStats{})
|
||||
|
||||
var (
|
||||
itm export.Item
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/count"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -87,7 +88,7 @@ func (ctrl Controller) ProduceExportCollections(
|
||||
_ int,
|
||||
_ control.ExportConfig,
|
||||
_ []data.RestoreCollection,
|
||||
_ *data.ExportStats,
|
||||
_ *metrics.ExportStats,
|
||||
_ *fault.Bus,
|
||||
) ([]export.Collectioner, error) {
|
||||
return nil, ctrl.Err
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -54,7 +55,7 @@ func (h *baseExchangeHandler) ProduceExportCollections(
|
||||
backupVersion int,
|
||||
exportCfg control.ExportConfig,
|
||||
dcs []data.RestoreCollection,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
errs *fault.Bus,
|
||||
) ([]export.Collectioner, error) {
|
||||
var (
|
||||
|
||||
@ -18,6 +18,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -144,7 +145,7 @@ func (suite *ExportUnitSuite) TestGetItems() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
ec := exchange.NewExportCollection(
|
||||
"",
|
||||
[]data.RestoreCollection{test.backingCollection},
|
||||
@ -184,10 +185,10 @@ func (suite *ExportUnitSuite) TestGetItems() {
|
||||
assert.ErrorIs(t, item.Error, test.expectedItems[i].Error)
|
||||
}
|
||||
|
||||
var expectedStats data.ExportStats
|
||||
var expectedStats metrics.ExportStats
|
||||
|
||||
if size+count > 0 { // it is only initialized if we have something
|
||||
expectedStats = data.ExportStats{}
|
||||
expectedStats = metrics.ExportStats{}
|
||||
expectedStats.UpdateBytes(path.EmailCategory, int64(size))
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
@ -379,7 +380,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
exportCfg := control.ExportConfig{}
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
|
||||
ecs, err := NewExchangeHandler(control.DefaultOptions(), api.Client{}, nil).
|
||||
ProduceExportCollections(
|
||||
@ -398,7 +399,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
|
||||
assert.NoError(t, err, "export collections error")
|
||||
assert.Len(t, ecs, len(tt.expectedItems), "num of collections")
|
||||
|
||||
expectedStats := data.ExportStats{}
|
||||
expectedStats := metrics.ExportStats{}
|
||||
|
||||
// We are dependent on the order the collections are
|
||||
// returned in the test which is not necessary for the
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -68,7 +69,7 @@ func (h *baseGroupsHandler) ProduceExportCollections(
|
||||
backupVersion int,
|
||||
exportCfg control.ExportConfig,
|
||||
dcs []data.RestoreCollection,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
errs *fault.Bus,
|
||||
) ([]export.Collectioner, error) {
|
||||
var (
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -95,7 +96,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() {
|
||||
},
|
||||
}
|
||||
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
|
||||
ecs, err := NewGroupsHandler(control.DefaultOptions(), api.Client{}, nil).
|
||||
ProduceExportCollections(
|
||||
@ -129,7 +130,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_messages() {
|
||||
|
||||
assert.Equal(t, expectedItems, fitems, "items")
|
||||
|
||||
expectedStats := data.ExportStats{}
|
||||
expectedStats := metrics.ExportStats{}
|
||||
expectedStats.UpdateBytes(path.ChannelMessagesCategory, int64(size))
|
||||
expectedStats.UpdateResourceCount(path.ChannelMessagesCategory)
|
||||
assert.Equal(t, expectedStats, stats, "stats")
|
||||
@ -200,7 +201,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
|
||||
handler := NewGroupsHandler(control.DefaultOptions(), api.Client{}, nil)
|
||||
handler.CacheItemInfo(dii)
|
||||
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
|
||||
ecs, err := handler.ProduceExportCollections(
|
||||
ctx,
|
||||
@ -232,7 +233,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
|
||||
|
||||
assert.Equal(t, expectedItems, fitems, "items")
|
||||
|
||||
expectedStats := data.ExportStats{}
|
||||
expectedStats := metrics.ExportStats{}
|
||||
expectedStats.UpdateBytes(path.FilesCategory, int64(size))
|
||||
expectedStats.UpdateResourceCount(path.FilesCategory)
|
||||
assert.Equal(t, expectedStats, stats, "stats")
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -61,7 +62,7 @@ func (h *baseOneDriveHandler) ProduceExportCollections(
|
||||
backupVersion int,
|
||||
exportCfg control.ExportConfig,
|
||||
dcs []data.RestoreCollection,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
errs *fault.Bus,
|
||||
) ([]export.Collectioner, error) {
|
||||
var (
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -247,7 +248,7 @@ func (suite *ExportUnitSuite) TestGetItems() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
ec := drive.NewExportCollection(
|
||||
"",
|
||||
[]data.RestoreCollection{test.backingCollection},
|
||||
@ -288,10 +289,10 @@ func (suite *ExportUnitSuite) TestGetItems() {
|
||||
assert.ErrorIs(t, item.Error, test.expectedItems[i].Error)
|
||||
}
|
||||
|
||||
var expectedStats data.ExportStats
|
||||
var expectedStats metrics.ExportStats
|
||||
|
||||
if size+count > 0 { // it is only initialized if we have something
|
||||
expectedStats = data.ExportStats{}
|
||||
expectedStats = metrics.ExportStats{}
|
||||
expectedStats.UpdateBytes(path.FilesCategory, int64(size))
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
@ -340,7 +341,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
|
||||
},
|
||||
}
|
||||
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
|
||||
ecs, err := NewOneDriveHandler(control.DefaultOptions(), api.Client{}, nil).
|
||||
ProduceExportCollections(
|
||||
@ -370,7 +371,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
|
||||
|
||||
assert.Equal(t, expectedItems, fitems, "items")
|
||||
|
||||
expectedStats := data.ExportStats{}
|
||||
expectedStats := metrics.ExportStats{}
|
||||
expectedStats.UpdateBytes(path.FilesCategory, int64(size))
|
||||
expectedStats.UpdateResourceCount(path.FilesCategory)
|
||||
assert.Equal(t, expectedStats, stats, "stats")
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -65,7 +66,7 @@ func (h *baseSharePointHandler) ProduceExportCollections(
|
||||
backupVersion int,
|
||||
exportCfg control.ExportConfig,
|
||||
dcs []data.RestoreCollection,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
errs *fault.Bus,
|
||||
) ([]export.Collectioner, error) {
|
||||
var (
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
@ -129,7 +130,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
|
||||
handler := NewSharePointHandler(control.DefaultOptions(), api.Client{}, nil)
|
||||
handler.CacheItemInfo(test.itemInfo)
|
||||
|
||||
stats := data.ExportStats{}
|
||||
stats := metrics.ExportStats{}
|
||||
|
||||
ecs, err := handler.ProduceExportCollections(
|
||||
ctx,
|
||||
@ -160,7 +161,7 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections() {
|
||||
|
||||
assert.Equal(t, expectedItems, fitems, "items")
|
||||
|
||||
expectedStats := data.ExportStats{}
|
||||
expectedStats := metrics.ExportStats{}
|
||||
expectedStats.UpdateBytes(path.FilesCategory, int64(size))
|
||||
expectedStats.UpdateResourceCount(path.FilesCategory)
|
||||
assert.Equal(t, expectedStats, stats, "stats")
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
"github.com/alcionai/corso/src/pkg/selectors"
|
||||
"github.com/alcionai/corso/src/pkg/store"
|
||||
@ -47,7 +48,7 @@ type ExportOperation struct {
|
||||
Selectors selectors.Selector
|
||||
ExportCfg control.ExportConfig
|
||||
Version string
|
||||
stats data.ExportStats
|
||||
stats metrics.ExportStats
|
||||
|
||||
acct account.Account
|
||||
ec inject.ExportConsumer
|
||||
@ -74,7 +75,7 @@ func NewExportOperation(
|
||||
Selectors: sel,
|
||||
Version: "v0",
|
||||
ec: ec,
|
||||
stats: data.ExportStats{},
|
||||
stats: metrics.ExportStats{},
|
||||
}
|
||||
if err := op.validate(); err != nil {
|
||||
return ExportOperation{}, err
|
||||
@ -322,7 +323,7 @@ func (op *ExportOperation) finalizeMetrics(
|
||||
// be calling this once the export collections have been read and process
|
||||
// as the data that will be available here will be the data that was read
|
||||
// and processed.
|
||||
func (op *ExportOperation) GetStats() map[path.CategoryType]data.KindStats {
|
||||
func (op *ExportOperation) GetStats() map[path.CategoryType]metrics.KindStats {
|
||||
return op.stats.GetStats()
|
||||
}
|
||||
|
||||
@ -336,7 +337,7 @@ func produceExportCollections(
|
||||
backupVersion int,
|
||||
exportCfg control.ExportConfig,
|
||||
dcs []data.RestoreCollection,
|
||||
exportStats *data.ExportStats,
|
||||
exportStats *metrics.ExportStats,
|
||||
errs *fault.Bus,
|
||||
) ([]export.Collectioner, error) {
|
||||
complete := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Preparing export")
|
||||
|
||||
@ -16,6 +16,7 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/count"
|
||||
"github.com/alcionai/corso/src/pkg/export"
|
||||
"github.com/alcionai/corso/src/pkg/fault"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -96,7 +97,7 @@ type (
|
||||
backupVersion int,
|
||||
exportCfg control.ExportConfig,
|
||||
dcs []data.RestoreCollection,
|
||||
stats *data.ExportStats,
|
||||
stats *metrics.ExportStats,
|
||||
errs *fault.Bus,
|
||||
) ([]export.Collectioner, error)
|
||||
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/metrics"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -29,7 +30,7 @@ type itemStreamer func(
|
||||
backupVersion int,
|
||||
cfg control.ExportConfig,
|
||||
ch chan<- Item,
|
||||
stats *data.ExportStats)
|
||||
stats *metrics.ExportStats)
|
||||
|
||||
// BaseCollection holds the foundational details of an export collection.
|
||||
type BaseCollection struct {
|
||||
@ -47,7 +48,7 @@ type BaseCollection struct {
|
||||
|
||||
Stream itemStreamer
|
||||
|
||||
Stats *data.ExportStats
|
||||
Stats *metrics.ExportStats
|
||||
}
|
||||
|
||||
func (bc BaseCollection) BasePath() string {
|
||||
|
||||
73
src/pkg/metrics/metrics.go
Normal file
73
src/pkg/metrics/metrics.go
Normal file
@ -0,0 +1,73 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
type KindStats struct {
|
||||
BytesRead int64
|
||||
ResourceCount int64
|
||||
}
|
||||
|
||||
type ExportStats struct {
|
||||
// data is kept private so that we can enforce atomic int updates
|
||||
data map[path.CategoryType]KindStats
|
||||
}
|
||||
|
||||
func (es *ExportStats) UpdateBytes(kind path.CategoryType, bytesRead int64) {
|
||||
if es.data == nil {
|
||||
es.data = map[path.CategoryType]KindStats{}
|
||||
}
|
||||
|
||||
ks := es.data[kind]
|
||||
atomic.AddInt64(&ks.BytesRead, bytesRead)
|
||||
es.data[kind] = ks
|
||||
}
|
||||
|
||||
func (es *ExportStats) UpdateResourceCount(kind path.CategoryType) {
|
||||
if es.data == nil {
|
||||
es.data = map[path.CategoryType]KindStats{}
|
||||
}
|
||||
|
||||
ks := es.data[kind]
|
||||
atomic.AddInt64(&ks.ResourceCount, 1)
|
||||
es.data[kind] = ks
|
||||
}
|
||||
|
||||
func (es *ExportStats) GetStats() map[path.CategoryType]KindStats {
|
||||
return es.data
|
||||
}
|
||||
|
||||
type statsReader struct {
|
||||
io.ReadCloser
|
||||
kind path.CategoryType
|
||||
stats *ExportStats
|
||||
}
|
||||
|
||||
func (sr *statsReader) Read(p []byte) (int, error) {
|
||||
n, err := sr.ReadCloser.Read(p)
|
||||
sr.stats.UpdateBytes(sr.kind, int64(n))
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Create a function that will take a reader and return a reader that
|
||||
// will update the stats
|
||||
func ReaderWithStats(
|
||||
reader io.ReadCloser,
|
||||
kind path.CategoryType,
|
||||
stats *ExportStats,
|
||||
) io.ReadCloser {
|
||||
if reader == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &statsReader{
|
||||
ReadCloser: reader,
|
||||
kind: kind,
|
||||
stats: stats,
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user