Add support for email exports (#4642)

<!-- PR description-->

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* https://github.com/alcionai/corso/issues/3893

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abin Simon 2023-11-16 11:13:05 +05:30 committed by GitHub
parent fd9c431bea
commit 140402361a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 636 additions and 10 deletions

View File

@ -25,8 +25,8 @@ const (
dateFormat = "2006-01-02 15:04:05 MST" // from xhit/go-simple-mail
)
// toEml converts a Messageable to .eml format
func toEml(data models.Messageable) (string, error) {
// ToEml converts a Messageable to .eml format
func ToEml(ctx context.Context, data models.Messageable) (string, error) {
email := mail.NewMSG()
if data.GetFrom() != nil {
@ -70,7 +70,7 @@ func toEml(data models.Messageable) (string, error) {
if data.GetReplyTo() != nil {
rts := data.GetReplyTo()
if len(rts) > 1 {
logger.Ctx(context.TODO()).
logger.Ctx(ctx).
With("id", ptr.Val(data.GetId()),
"reply_to_count", len(rts)).
Warn("more than 1 reply to")
@ -103,7 +103,7 @@ func toEml(data models.Messageable) (string, error) {
default:
// https://learn.microsoft.com/en-us/graph/api/resources/itembody?view=graph-rest-1.0#properties
// This should not be possible according to the documentation
logger.Ctx(context.TODO()).
logger.Ctx(ctx).
With("body_type", data.GetBody().GetContentType().String(),
"id", ptr.Val(data.GetId())).
Info("unknown body content type")

View File

@ -1,13 +1,13 @@
package eml
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/converters/eml/testdata"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -23,14 +23,13 @@ func TestEMLUnitSuite(t *testing.T) {
func (suite *EMLUnitSuite) TestConvert_messageble_to_eml() {
t := suite.T()
// read test file into body as []bytes
body, err := os.ReadFile("testdata/email-with-attachments.json")
require.NoError(t, err, "reading test file")
ctx, flush := tester.NewContext(t)
defer flush()
msg, err := api.BytesToMessageable(body)
msg, err := api.BytesToMessageable([]byte(testdata.EmailWithAttachments))
require.NoError(t, err, "creating message")
_, err = toEml(msg)
_, err = ToEml(ctx, msg)
// TODO(meain): add more tests on the generated content
// Cannot test output directly as it contains a random boundary
assert.NoError(t, err, "converting to eml")

View File

@ -0,0 +1,6 @@
package testdata
import _ "embed"
//go:embed email-with-attachments.json
var EmailWithAttachments string

View File

@ -0,0 +1,114 @@
package exchange
import (
"bytes"
"context"
"io"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/converters/eml"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
func NewExportCollection(
baseDir string,
backingCollection []data.RestoreCollection,
backupVersion int,
stats *data.ExportStats,
) export.Collectioner {
return export.BaseCollection{
BaseDir: baseDir,
BackingCollection: backingCollection,
BackupVersion: backupVersion,
Stream: streamItems,
Stats: stats,
}
}
// streamItems streams the streamItems in the backingCollection into the export stream chan
func streamItems(
ctx context.Context,
drc []data.RestoreCollection,
backupVersion int,
config control.ExportConfig,
ch chan<- export.Item,
stats *data.ExportStats,
) {
defer close(ch)
errs := fault.New(false)
for _, rc := range drc {
for item := range rc.Items(ctx, errs) {
id := item.ID()
name := id + ".eml"
stats.UpdateResourceCount(path.EmailCategory)
reader := item.ToReader()
content, err := io.ReadAll(reader)
reader.Close()
if err != nil {
ch <- export.Item{
ID: id,
Error: clues.Wrap(err, "reading data"),
}
continue
}
msg, err := api.BytesToMessageable(content)
if err != nil {
ch <- export.Item{
ID: id,
Error: clues.Wrap(err, "parsing email"),
}
continue
}
email, err := eml.ToEml(ctx, msg)
if err != nil {
ch <- export.Item{
ID: id,
Error: clues.Wrap(err, "converting to eml"),
}
continue
}
emlReader := io.NopCloser(bytes.NewReader([]byte(email)))
body := data.ReaderWithStats(emlReader, path.EmailCategory, stats)
ch <- export.Item{
ID: id,
Name: name,
Body: body,
}
}
items, recovered := errs.ItemsAndRecovered()
// Return all the items that we failed to source from the persistence layer
for _, err := range items {
ch <- export.Item{
ID: err.ID,
Error: &err,
}
}
for _, err := range recovered {
ch <- export.Item{
Error: err,
}
}
}
}

View File

@ -3,6 +3,7 @@ package m365
import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/m365/service/exchange"
"github.com/alcionai/corso/src/internal/m365/service/groups"
"github.com/alcionai/corso/src/internal/m365/service/onedrive"
"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
@ -28,6 +29,9 @@ func (ctrl *Controller) NewServiceHandler(
case path.GroupsService:
return groups.NewGroupsHandler(opts, ctrl.AC, ctrl.resourceHandler), nil
case path.ExchangeService:
return exchange.NewExchangeHandler(opts), nil
}
return nil, clues.New("unrecognized service").

View File

@ -0,0 +1,74 @@
package exchange
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
var _ inject.ServiceHandler = &baseExchangeHandler{}
func NewExchangeHandler(
opts control.Options,
) *baseExchangeHandler {
return &baseExchangeHandler{
opts: opts,
}
}
type baseExchangeHandler struct {
opts control.Options
}
func (h *baseExchangeHandler) CacheItemInfo(v details.ItemInfo) {}
// ProduceExportCollections will create the export collections for the
// given restore collections.
func (h *baseExchangeHandler) ProduceExportCollections(
ctx context.Context,
backupVersion int,
exportCfg control.ExportConfig,
dcs []data.RestoreCollection,
stats *data.ExportStats,
errs *fault.Bus,
) ([]export.Collectioner, error) {
var (
el = errs.Local()
ec = make([]export.Collectioner, 0, len(dcs))
)
for _, dc := range dcs {
category := dc.FullPath().Category()
switch category {
case path.EmailCategory:
folders := dc.FullPath().Folders()
pth := path.Builder{}.Append(path.EmailCategory.HumanString()).Append(folders...)
ec = append(
ec,
exchange.NewExportCollection(
pth.String(),
[]data.RestoreCollection{dc},
backupVersion,
stats))
case path.EventsCategory, path.ContactsCategory:
logger.Ctx(ctx).With("category", category.String()).Debugw("Skipping restore for category")
default:
return nil, clues.NewWC(ctx, "data category not supported").
With("category", category)
}
}
return ec, el.Failure()
}

View File

@ -0,0 +1,429 @@
package exchange
import (
"bytes"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/converters/eml/testdata"
"github.com/alcionai/corso/src/internal/data"
dataMock "github.com/alcionai/corso/src/internal/data/mock"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/version"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
type ExportUnitSuite struct {
tester.Suite
}
func TestExportUnitSuite(t *testing.T) {
suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *ExportUnitSuite) TestGetItems() {
emailBodyBytes := []byte(testdata.EmailWithAttachments)
table := []struct {
name string
version int
backingCollection data.RestoreCollection
expectedItems []export.Item
}{
{
name: "single item",
version: 1,
backingCollection: data.NoFetchRestoreCollection{
Collection: dataMock.Collection{
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
expectedItems: []export.Item{
{
ID: "id1",
Name: "id1.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
{
name: "multiple items",
version: 1,
backingCollection: data.NoFetchRestoreCollection{
Collection: dataMock.Collection{
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
&dataMock.Item{
ItemID: "id2",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
expectedItems: []export.Item{
{
ID: "id1",
Name: "id1.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
{
ID: "id2",
Name: "id2.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
{
name: "items with success and fetch error",
version: version.Groups9Update,
backingCollection: data.FetchRestoreCollection{
Collection: dataMock.Collection{
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id0",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
&dataMock.Item{
ItemID: "id1",
ReadErr: assert.AnError,
},
&dataMock.Item{
ItemID: "id2",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
expectedItems: []export.Item{
{
ID: "id0",
Name: "id0.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
{
ID: "id2",
Name: "id2.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
{
ID: "",
Error: assert.AnError,
},
},
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
stats := data.ExportStats{}
ec := exchange.NewExportCollection(
"",
[]data.RestoreCollection{test.backingCollection},
test.version,
&stats)
items := ec.Items(ctx)
count := 0
size := 0
fitems := []export.Item{}
for item := range items {
if item.Error == nil {
count++
}
if item.Body != nil {
b, err := io.ReadAll(item.Body)
assert.NoError(t, err, clues.ToCore(err))
size += len(b)
item.Body = io.NopCloser(bytes.NewBuffer(b))
}
fitems = append(fitems, item)
}
assert.Len(t, fitems, len(test.expectedItems), "num of items")
// We do not have any grantees about the ordering of the
// items in the SDK, but leaving the test this way for now
// to simplify testing.
for i, item := range fitems {
assert.Equal(t, test.expectedItems[i].ID, item.ID, "id")
assert.Equal(t, test.expectedItems[i].Name, item.Name, "name")
assert.ErrorIs(t, item.Error, test.expectedItems[i].Error)
}
var expectedStats data.ExportStats
if size+count > 0 { // it is only initialized if we have something
expectedStats = data.ExportStats{}
expectedStats.UpdateBytes(path.EmailCategory, int64(size))
for i := 0; i < count; i++ {
expectedStats.UpdateResourceCount(path.EmailCategory)
}
}
assert.Equal(t, expectedStats, stats, "stats")
})
}
}
func (suite *ExportUnitSuite) TestExportRestoreCollections() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
emailBodyBytes := []byte(testdata.EmailWithAttachments)
pb := path.Builder{}.Append("Inbox")
p, err := pb.ToDataLayerPath("t", "r", path.ExchangeService, path.EmailCategory, false)
assert.NoError(t, err, "build path")
p2, err := pb.ToDataLayerPath("t", "r", path.OneDriveService, path.FilesCategory, false)
assert.NoError(t, err, "build path")
tests := []struct {
name string
dcs []data.RestoreCollection
expectedItems [][]export.Item
hasErr bool
}{
{
name: "single item",
dcs: []data.RestoreCollection{
data.FetchRestoreCollection{
Collection: dataMock.Collection{
Path: p,
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
},
expectedItems: [][]export.Item{
{
{
ID: "id1",
Name: "id1.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
{
name: "multiple items",
dcs: []data.RestoreCollection{
data.FetchRestoreCollection{
Collection: dataMock.Collection{
Path: p,
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
&dataMock.Item{
ItemID: "id2",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
},
expectedItems: [][]export.Item{
{
{
ID: "id1",
Name: "id1.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
{
ID: "id2",
Name: "id2.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
{
name: "items with success and fetch error",
dcs: []data.RestoreCollection{
data.FetchRestoreCollection{
Collection: dataMock.Collection{
Path: p,
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
ReadErr: assert.AnError,
},
&dataMock.Item{
ItemID: "id2",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
},
expectedItems: [][]export.Item{
{
{
ID: "id2",
Name: "id2.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
{
ID: "",
Error: assert.AnError,
},
},
},
},
{
name: "multiple collections",
dcs: []data.RestoreCollection{
data.FetchRestoreCollection{
Collection: dataMock.Collection{
Path: p,
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
data.FetchRestoreCollection{
Collection: dataMock.Collection{
Path: p,
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id2",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
},
expectedItems: [][]export.Item{
{
{
ID: "id1",
Name: "id1.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
{
{
ID: "id2",
Name: "id2.eml",
Body: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
{
name: "collection without exchange category",
dcs: []data.RestoreCollection{
data.FetchRestoreCollection{
Collection: dataMock.Collection{
Path: p2,
ItemData: []data.Item{
&dataMock.Item{
ItemID: "id1",
Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)),
},
},
},
},
},
expectedItems: [][]export.Item{},
hasErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
exportCfg := control.ExportConfig{}
stats := data.ExportStats{}
ecs, err := NewExchangeHandler(control.DefaultOptions()).
ProduceExportCollections(
ctx,
int(version.Backup),
exportCfg,
tt.dcs,
&stats,
fault.New(true))
if tt.hasErr {
assert.Error(t, err, "export collections error")
return
}
assert.NoError(t, err, "export collections error")
assert.Len(t, ecs, len(tt.expectedItems), "num of collections")
expectedStats := data.ExportStats{}
// We are dependent on the order the collections are
// returned in the test which is not necessary for the
// correctness out the output.
for c := range ecs {
i := -1
for item := range ecs[c].Items(ctx) {
i++
size := 0
if item.Body == nil {
assert.ErrorIs(t, item.Error, tt.expectedItems[c][i].Error)
continue
}
// unwrap the body from stats reader
b, err := io.ReadAll(item.Body)
assert.NoError(t, err, clues.ToCore(err))
size += len(b)
expectedStats.UpdateBytes(path.EmailCategory, int64(size))
expectedStats.UpdateResourceCount(path.EmailCategory)
assert.Equal(t, tt.expectedItems[c][i].ID, item.ID, "id")
assert.Equal(t, tt.expectedItems[c][i].Name, item.Name, "name")
assert.NoError(t, item.Error, "error")
}
}
assert.Equal(t, expectedStats, stats, "stats")
})
}
}