diff --git a/src/internal/converters/eml/eml.go b/src/internal/converters/eml/eml.go index 551110d3f..bda1dddd9 100644 --- a/src/internal/converters/eml/eml.go +++ b/src/internal/converters/eml/eml.go @@ -25,8 +25,8 @@ const ( dateFormat = "2006-01-02 15:04:05 MST" // from xhit/go-simple-mail ) -// toEml converts a Messageable to .eml format -func toEml(data models.Messageable) (string, error) { +// ToEml converts a Messageable to .eml format +func ToEml(ctx context.Context, data models.Messageable) (string, error) { email := mail.NewMSG() if data.GetFrom() != nil { @@ -70,7 +70,7 @@ func toEml(data models.Messageable) (string, error) { if data.GetReplyTo() != nil { rts := data.GetReplyTo() if len(rts) > 1 { - logger.Ctx(context.TODO()). + logger.Ctx(ctx). With("id", ptr.Val(data.GetId()), "reply_to_count", len(rts)). Warn("more than 1 reply to") @@ -103,7 +103,7 @@ func toEml(data models.Messageable) (string, error) { default: // https://learn.microsoft.com/en-us/graph/api/resources/itembody?view=graph-rest-1.0#properties // This should not be possible according to the documentation - logger.Ctx(context.TODO()). + logger.Ctx(ctx). With("body_type", data.GetBody().GetContentType().String(), "id", ptr.Val(data.GetId())). Info("unknown body content type") diff --git a/src/internal/converters/eml/eml_test.go b/src/internal/converters/eml/eml_test.go index 8b83ecbdb..3615e1383 100644 --- a/src/internal/converters/eml/eml_test.go +++ b/src/internal/converters/eml/eml_test.go @@ -1,13 +1,13 @@ package eml import ( - "os" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/converters/eml/testdata" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -23,14 +23,13 @@ func TestEMLUnitSuite(t *testing.T) { func (suite *EMLUnitSuite) TestConvert_messageble_to_eml() { t := suite.T() - // read test file into body as []bytes - body, err := os.ReadFile("testdata/email-with-attachments.json") - require.NoError(t, err, "reading test file") + ctx, flush := tester.NewContext(t) + defer flush() - msg, err := api.BytesToMessageable(body) + msg, err := api.BytesToMessageable([]byte(testdata.EmailWithAttachments)) require.NoError(t, err, "creating message") - _, err = toEml(msg) + _, err = ToEml(ctx, msg) // TODO(meain): add more tests on the generated content // Cannot test output directly as it contains a random boundary assert.NoError(t, err, "converting to eml") diff --git a/src/internal/converters/eml/testdata/testdata.go b/src/internal/converters/eml/testdata/testdata.go new file mode 100644 index 000000000..93dc66872 --- /dev/null +++ b/src/internal/converters/eml/testdata/testdata.go @@ -0,0 +1,6 @@ +package testdata + +import _ "embed" + +//go:embed email-with-attachments.json +var EmailWithAttachments string diff --git a/src/internal/m365/collection/exchange/export.go b/src/internal/m365/collection/exchange/export.go new file mode 100644 index 000000000..2a3b2b838 --- /dev/null +++ b/src/internal/m365/collection/exchange/export.go @@ -0,0 +1,114 @@ +package exchange + +import ( + "bytes" + "context" + "io" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/converters/eml" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/services/m365/api" +) + +func NewExportCollection( + baseDir string, + backingCollection []data.RestoreCollection, + backupVersion int, + stats *data.ExportStats, +) export.Collectioner { + return export.BaseCollection{ + BaseDir: baseDir, + BackingCollection: backingCollection, + BackupVersion: backupVersion, + Stream: streamItems, + Stats: stats, + } +} + +// streamItems streams the streamItems in the backingCollection into the export stream chan +func streamItems( + ctx context.Context, + drc []data.RestoreCollection, + backupVersion int, + config control.ExportConfig, + ch chan<- export.Item, + stats *data.ExportStats, +) { + defer close(ch) + + errs := fault.New(false) + + for _, rc := range drc { + for item := range rc.Items(ctx, errs) { + id := item.ID() + name := id + ".eml" + + stats.UpdateResourceCount(path.EmailCategory) + + reader := item.ToReader() + content, err := io.ReadAll(reader) + + reader.Close() + + if err != nil { + ch <- export.Item{ + ID: id, + Error: clues.Wrap(err, "reading data"), + } + + continue + } + + msg, err := api.BytesToMessageable(content) + if err != nil { + ch <- export.Item{ + ID: id, + Error: clues.Wrap(err, "parsing email"), + } + + continue + } + + email, err := eml.ToEml(ctx, msg) + if err != nil { + ch <- export.Item{ + ID: id, + Error: clues.Wrap(err, "converting to eml"), + } + + continue + } + + emlReader := io.NopCloser(bytes.NewReader([]byte(email))) + body := data.ReaderWithStats(emlReader, path.EmailCategory, stats) + + ch <- export.Item{ + ID: id, + Name: name, + Body: body, + } + } + + items, recovered := errs.ItemsAndRecovered() + + // Return all the items that we failed to source from the persistence layer + for _, err := range items { + ch <- export.Item{ + ID: err.ID, + Error: &err, + } + } + + for _, err := range recovered { + ch <- export.Item{ + Error: err, + } + } + } +} diff --git a/src/internal/m365/export.go b/src/internal/m365/export.go index 7ec8de2da..30ef85b58 100644 --- a/src/internal/m365/export.go +++ b/src/internal/m365/export.go @@ -3,6 +3,7 @@ package m365 import ( "github.com/alcionai/clues" + "github.com/alcionai/corso/src/internal/m365/service/exchange" "github.com/alcionai/corso/src/internal/m365/service/groups" "github.com/alcionai/corso/src/internal/m365/service/onedrive" "github.com/alcionai/corso/src/internal/m365/service/sharepoint" @@ -28,6 +29,9 @@ func (ctrl *Controller) NewServiceHandler( case path.GroupsService: return groups.NewGroupsHandler(opts, ctrl.AC, ctrl.resourceHandler), nil + + case path.ExchangeService: + return exchange.NewExchangeHandler(opts), nil } return nil, clues.New("unrecognized service"). diff --git a/src/internal/m365/service/exchange/export.go b/src/internal/m365/service/exchange/export.go new file mode 100644 index 000000000..4cb194f8a --- /dev/null +++ b/src/internal/m365/service/exchange/export.go @@ -0,0 +1,74 @@ +package exchange + +import ( + "context" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/m365/collection/exchange" + "github.com/alcionai/corso/src/internal/operations/inject" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +var _ inject.ServiceHandler = &baseExchangeHandler{} + +func NewExchangeHandler( + opts control.Options, +) *baseExchangeHandler { + return &baseExchangeHandler{ + opts: opts, + } +} + +type baseExchangeHandler struct { + opts control.Options +} + +func (h *baseExchangeHandler) CacheItemInfo(v details.ItemInfo) {} + +// ProduceExportCollections will create the export collections for the +// given restore collections. +func (h *baseExchangeHandler) ProduceExportCollections( + ctx context.Context, + backupVersion int, + exportCfg control.ExportConfig, + dcs []data.RestoreCollection, + stats *data.ExportStats, + errs *fault.Bus, +) ([]export.Collectioner, error) { + var ( + el = errs.Local() + ec = make([]export.Collectioner, 0, len(dcs)) + ) + + for _, dc := range dcs { + category := dc.FullPath().Category() + + switch category { + case path.EmailCategory: + folders := dc.FullPath().Folders() + pth := path.Builder{}.Append(path.EmailCategory.HumanString()).Append(folders...) + + ec = append( + ec, + exchange.NewExportCollection( + pth.String(), + []data.RestoreCollection{dc}, + backupVersion, + stats)) + case path.EventsCategory, path.ContactsCategory: + logger.Ctx(ctx).With("category", category.String()).Debugw("Skipping restore for category") + default: + return nil, clues.NewWC(ctx, "data category not supported"). + With("category", category) + } + } + + return ec, el.Failure() +} diff --git a/src/internal/m365/service/exchange/export_test.go b/src/internal/m365/service/exchange/export_test.go new file mode 100644 index 000000000..cd7013008 --- /dev/null +++ b/src/internal/m365/service/exchange/export_test.go @@ -0,0 +1,429 @@ +package exchange + +import ( + "bytes" + "io" + "testing" + + "github.com/alcionai/clues" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/converters/eml/testdata" + "github.com/alcionai/corso/src/internal/data" + dataMock "github.com/alcionai/corso/src/internal/data/mock" + "github.com/alcionai/corso/src/internal/m365/collection/exchange" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/internal/version" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/export" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" +) + +type ExportUnitSuite struct { + tester.Suite +} + +func TestExportUnitSuite(t *testing.T) { + suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *ExportUnitSuite) TestGetItems() { + emailBodyBytes := []byte(testdata.EmailWithAttachments) + + table := []struct { + name string + version int + backingCollection data.RestoreCollection + expectedItems []export.Item + }{ + { + name: "single item", + version: 1, + backingCollection: data.NoFetchRestoreCollection{ + Collection: dataMock.Collection{ + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "id1", + Name: "id1.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + { + name: "multiple items", + version: 1, + backingCollection: data.NoFetchRestoreCollection{ + Collection: dataMock.Collection{ + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + &dataMock.Item{ + ItemID: "id2", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "id1", + Name: "id1.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + { + ID: "id2", + Name: "id2.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + { + name: "items with success and fetch error", + version: version.Groups9Update, + backingCollection: data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id0", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + &dataMock.Item{ + ItemID: "id1", + ReadErr: assert.AnError, + }, + &dataMock.Item{ + ItemID: "id2", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + expectedItems: []export.Item{ + { + ID: "id0", + Name: "id0.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + { + ID: "id2", + Name: "id2.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + { + ID: "", + Error: assert.AnError, + }, + }, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + stats := data.ExportStats{} + ec := exchange.NewExportCollection( + "", + []data.RestoreCollection{test.backingCollection}, + test.version, + &stats) + + items := ec.Items(ctx) + + count := 0 + size := 0 + fitems := []export.Item{} + + for item := range items { + if item.Error == nil { + count++ + } + + if item.Body != nil { + b, err := io.ReadAll(item.Body) + assert.NoError(t, err, clues.ToCore(err)) + + size += len(b) + item.Body = io.NopCloser(bytes.NewBuffer(b)) + } + + fitems = append(fitems, item) + } + + assert.Len(t, fitems, len(test.expectedItems), "num of items") + + // We do not have any grantees about the ordering of the + // items in the SDK, but leaving the test this way for now + // to simplify testing. + for i, item := range fitems { + assert.Equal(t, test.expectedItems[i].ID, item.ID, "id") + assert.Equal(t, test.expectedItems[i].Name, item.Name, "name") + assert.ErrorIs(t, item.Error, test.expectedItems[i].Error) + } + + var expectedStats data.ExportStats + + if size+count > 0 { // it is only initialized if we have something + expectedStats = data.ExportStats{} + expectedStats.UpdateBytes(path.EmailCategory, int64(size)) + + for i := 0; i < count; i++ { + expectedStats.UpdateResourceCount(path.EmailCategory) + } + } + + assert.Equal(t, expectedStats, stats, "stats") + }) + } +} + +func (suite *ExportUnitSuite) TestExportRestoreCollections() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + emailBodyBytes := []byte(testdata.EmailWithAttachments) + + pb := path.Builder{}.Append("Inbox") + p, err := pb.ToDataLayerPath("t", "r", path.ExchangeService, path.EmailCategory, false) + assert.NoError(t, err, "build path") + + p2, err := pb.ToDataLayerPath("t", "r", path.OneDriveService, path.FilesCategory, false) + assert.NoError(t, err, "build path") + + tests := []struct { + name string + dcs []data.RestoreCollection + expectedItems [][]export.Item + hasErr bool + }{ + { + name: "single item", + dcs: []data.RestoreCollection{ + data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + Path: p, + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + }, + expectedItems: [][]export.Item{ + { + { + ID: "id1", + Name: "id1.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + { + name: "multiple items", + dcs: []data.RestoreCollection{ + data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + Path: p, + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + &dataMock.Item{ + ItemID: "id2", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + }, + expectedItems: [][]export.Item{ + { + { + ID: "id1", + Name: "id1.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + { + ID: "id2", + Name: "id2.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + { + name: "items with success and fetch error", + dcs: []data.RestoreCollection{ + data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + Path: p, + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + ReadErr: assert.AnError, + }, + &dataMock.Item{ + ItemID: "id2", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + }, + expectedItems: [][]export.Item{ + { + { + ID: "id2", + Name: "id2.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + { + ID: "", + Error: assert.AnError, + }, + }, + }, + }, + { + name: "multiple collections", + dcs: []data.RestoreCollection{ + data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + Path: p, + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + Path: p, + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id2", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + }, + expectedItems: [][]export.Item{ + { + { + ID: "id1", + Name: "id1.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + { + { + ID: "id2", + Name: "id2.eml", + Body: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + { + name: "collection without exchange category", + dcs: []data.RestoreCollection{ + data.FetchRestoreCollection{ + Collection: dataMock.Collection{ + Path: p2, + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "id1", + Reader: io.NopCloser(bytes.NewReader(emailBodyBytes)), + }, + }, + }, + }, + }, + expectedItems: [][]export.Item{}, + hasErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + exportCfg := control.ExportConfig{} + stats := data.ExportStats{} + + ecs, err := NewExchangeHandler(control.DefaultOptions()). + ProduceExportCollections( + ctx, + int(version.Backup), + exportCfg, + tt.dcs, + &stats, + fault.New(true)) + + if tt.hasErr { + assert.Error(t, err, "export collections error") + return + } + + assert.NoError(t, err, "export collections error") + assert.Len(t, ecs, len(tt.expectedItems), "num of collections") + + expectedStats := data.ExportStats{} + + // We are dependent on the order the collections are + // returned in the test which is not necessary for the + // correctness out the output. + for c := range ecs { + i := -1 + for item := range ecs[c].Items(ctx) { + i++ + + size := 0 + + if item.Body == nil { + assert.ErrorIs(t, item.Error, tt.expectedItems[c][i].Error) + continue + } + + // unwrap the body from stats reader + b, err := io.ReadAll(item.Body) + assert.NoError(t, err, clues.ToCore(err)) + + size += len(b) + + expectedStats.UpdateBytes(path.EmailCategory, int64(size)) + expectedStats.UpdateResourceCount(path.EmailCategory) + + assert.Equal(t, tt.expectedItems[c][i].ID, item.ID, "id") + assert.Equal(t, tt.expectedItems[c][i].Name, item.Name, "name") + assert.NoError(t, item.Error, "error") + + } + } + + assert.Equal(t, expectedStats, stats, "stats") + }) + } +}