From 0199c9ad684b97f945f822674e5cb7dc1decd594 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Sun, 28 Jan 2024 22:42:18 -0800 Subject: [PATCH] Add group mailbox exports --- CHANGELOG.md | 1 + src/cli/export/groups.go | 5 - src/internal/m365/collection/groups/export.go | 162 ++++++++++++++++- .../m365/collection/groups/export_test.go | 167 +++++++++++++++++- src/internal/m365/service/groups/export.go | 5 +- .../restore_path_transformer.go | 1 + 6 files changed, 330 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a0c4cc2cc..ac9429dba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Update repo init configuration to reduce the total number of GET requests sent to the object store when using corso. This affects repos that have many backups created in them per day the most. +- Group mailbox emails can now be exported as `.eml` files. ### Fixed - Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup. diff --git a/src/cli/export/groups.go b/src/cli/export/groups.go index 8d5a9d51c..13c820a20 100644 --- a/src/cli/export/groups.go +++ b/src/cli/export/groups.go @@ -7,7 +7,6 @@ import ( "github.com/alcionai/corso/src/cli/flags" "github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/pkg/control" - "github.com/alcionai/corso/src/pkg/selectors" ) // called by export.go to map subcommands to provider-specific handling. @@ -93,10 +92,6 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error { sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts) utils.FilterGroupsRestoreInfoSelectors(sel, opts) - // TODO(pandeyabs): Exclude conversations from export since they are not - // supported yet. 
https://github.com/alcionai/corso/issues/4822 - sel.Exclude(sel.Conversation(selectors.Any())) - acceptedGroupsFormatTypes := []string{ string(control.DefaultFormat), string(control.JSONFormat), diff --git a/src/internal/m365/collection/groups/export.go b/src/internal/m365/collection/groups/export.go index c7577296b..947042f9a 100644 --- a/src/internal/m365/collection/groups/export.go +++ b/src/internal/m365/collection/groups/export.go @@ -5,19 +5,24 @@ import ( "context" "encoding/json" "io" + "strings" "time" "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/converters/eml" "github.com/alcionai/corso/src/internal/data" + groupMeta "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" + "github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata" ) func NewExportCollection( @@ -26,7 +31,13 @@ func NewExportCollection( backupVersion int, cec control.ExportConfig, stats *metrics.ExportStats, + cat path.CategoryType, ) export.Collectioner { + streamItems := streamChannelMessages + if cat == path.ConversationPostsCategory { + streamItems = streamConversationPosts + } + return export.BaseCollection{ BaseDir: baseDir, BackingCollection: backingCollections, @@ -37,8 +48,12 @@ func NewExportCollection( } } -// streamItems streams the items in the backingCollection into the export stream chan -func streamItems( +//------------------------------------------------------------- +// Channel Messages +//------------------------------------------------------------- + +// streamChannelMessages streams the items in the 
backingCollection into the export stream chan +func streamChannelMessages( ctx context.Context, drc []data.RestoreCollection, backupVersion int, @@ -198,3 +213,146 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessag Subject: ptr.Val(item.GetSubject()), } } + +//------------------------------------------------------------- +// Conversation Posts +//------------------------------------------------------------- + +// streamConversationPosts adds the post items into the export stream channel. +func streamConversationPosts( + ctx context.Context, + drc []data.RestoreCollection, + backupVersion int, + cec control.ExportConfig, + ch chan<- export.Item, + stats *metrics.ExportStats, +) { + defer close(ch) + + errs := fault.New(false) + + for _, rc := range drc { + ictx := clues.Add(ctx, "path_short_ref", rc.FullPath().ShortRef()) + + for item := range rc.Items(ctx, errs) { + itemCtx := clues.Add(ictx, "stream_item_id", item.ID()) + + // Trim .data suffix from itemID. Also, we don't expect .meta files + // here since details are not persisted for metadata files. + trimmedID := strings.TrimSuffix(item.ID(), metadata.DataFileSuffix) + exportName := trimmedID + ".eml" + + postMetadata, err := fetchAndReadMetadata(itemCtx, trimmedID, rc) + if err != nil { + ch <- export.Item{ + ID: item.ID(), + Error: err, + } + + continue + } + + reader := item.ToReader() + content, err := io.ReadAll(reader) + + reader.Close() + + if err != nil { + ch <- export.Item{ + ID: item.ID(), + Error: err, + } + + continue + } + + // Convert JSON to eml. 
+ email, err := eml.FromJSONPostToEML(itemCtx, content, postMetadata) + if err != nil { + err = clues.Wrap(err, "converting JSON to eml") + + logger.CtxErr(itemCtx, err).Info("processing collection item") + + ch <- export.Item{ + ID: item.ID(), + Error: err, + } + + continue + } + + emlReader := io.NopCloser(bytes.NewReader([]byte(email))) + + stats.UpdateResourceCount(path.ConversationPostsCategory) + body := metrics.ReaderWithStats(emlReader, path.ConversationPostsCategory, stats) + + ch <- export.Item{ + ID: item.ID(), + Name: exportName, + Body: body, + } + } + + items, recovered := errs.ItemsAndRecovered() + + // Return all the items that we failed to source from the persistence layer + for _, item := range items { + ch <- export.Item{ + ID: item.ID, + Error: &item, + } + } + + for _, err := range recovered { + ch <- export.Item{ + Error: err, + } + } + } +} + +func fetchAndReadMetadata( + ctx context.Context, + itemID string, + fin data.FetchItemByNamer, +) (groupMeta.ConversationPostMetadata, error) { + metaName := itemID + metadata.MetaFileSuffix + + ictx := clues.Add(ctx, "meta_file_name", metaName) + + meta, err := fin.FetchItemByName(ictx, metaName) + if err != nil { + return groupMeta.ConversationPostMetadata{}, + clues.WrapWC(ctx, err, "fetching metadata") + } + + metaReader := meta.ToReader() + defer metaReader.Close() + + metaFormatted, err := readMetadata(metaReader) + if err != nil { + return groupMeta.ConversationPostMetadata{}, + clues.WrapWC(ctx, err, "deserializing metadata") + } + + return metaFormatted, nil +} + +// readMetadata reads and parses the metadata info for an item +func readMetadata(metaRC io.ReadCloser) (groupMeta.ConversationPostMetadata, error) { + var meta groupMeta.ConversationPostMetadata + // `metaRC` will be nil for the top level container folder + if metaRC != nil { + metaraw, err := io.ReadAll(metaRC) + if err != nil { + return groupMeta.ConversationPostMetadata{}, err + } + + err = json.Unmarshal(metaraw, &meta) + if err 
!= nil { + return groupMeta.ConversationPostMetadata{}, err + } + } + + return meta, nil +} diff --git a/src/internal/m365/collection/groups/export_test.go b/src/internal/m365/collection/groups/export_test.go index bc6247ba7..12d6e89c1 100644 --- a/src/internal/m365/collection/groups/export_test.go +++ b/src/internal/m365/collection/groups/export_test.go @@ -7,6 +7,7 @@ import ( "github.com/alcionai/clues" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/data" @@ -16,6 +17,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/metrics" + "github.com/alcionai/corso/src/pkg/path" ) type ExportUnitSuite struct { @@ -26,7 +28,7 @@ func TestExportUnitSuite(t *testing.T) { suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)}) } -func (suite *ExportUnitSuite) TestStreamItems() { +func (suite *ExportUnitSuite) TestStreamChannelMessages() { makeBody := func() io.ReadCloser { return io.NopCloser(bytes.NewReader([]byte("{}"))) } @@ -86,7 +88,7 @@ func (suite *ExportUnitSuite) TestStreamItems() { ch := make(chan export.Item) - go streamItems( + go streamChannelMessages( ctx, []data.RestoreCollection{test.backingColl}, version.NoBackup, @@ -113,3 +115,164 @@ func (suite *ExportUnitSuite) TestStreamItems() { }) } } + +func (suite *ExportUnitSuite) TestStreamConversationPosts() { + testPath, err := path.Build( + "t", + "g", + path.GroupsService, + path.ConversationPostsCategory, + true, + "convID", + "threadID") + require.NoError(suite.T(), err, clues.ToCore(err)) + + makeBody := func() io.ReadCloser { + rc := io.NopCloser(bytes.NewReader([]byte("{}"))) + + return metrics.ReaderWithStats( + rc, + path.ConversationPostsCategory, + &metrics.ExportStats{}) + } + + makeMeta := func() io.ReadCloser { + return io.NopCloser( + bytes.NewReader([]byte(`{"topic":"t", "recipients":["em@il"]}`))) + } + 
+ table := []struct { + name string + backingColl dataMock.Collection + expectItem export.Item + expectErr assert.ErrorAssertionFunc + }{ + { + name: "no errors", + backingColl: dataMock.Collection{ + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "zim.data", + Reader: makeBody(), + }, + }, + Path: testPath, + AuxItems: map[string]data.Item{ + "zim.meta": &dataMock.Item{ + ItemID: "zim.meta", + Reader: makeMeta(), + }, + }, + }, + expectItem: export.Item{ + ID: "zim.data", + Name: "zim.eml", + Body: makeBody(), + }, + expectErr: assert.NoError, + }, + { + name: "only recoverable errors", + backingColl: dataMock.Collection{ + ItemsRecoverableErrs: []error{ + clues.New("The knowledge... it fills me! It is neat!"), + }, + Path: testPath, + }, + expectErr: assert.Error, + }, + { + name: "items and recoverable errors", + backingColl: dataMock.Collection{ + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "gir.data", + Reader: makeBody(), + }, + }, + ItemsRecoverableErrs: []error{ + clues.New("I miss my cupcake."), + }, + Path: testPath, + AuxItems: map[string]data.Item{ + "gir.meta": &dataMock.Item{ + ItemID: "gir.meta", + Reader: makeMeta(), + }, + }, + }, + expectItem: export.Item{ + ID: "gir.data", + Name: "gir.eml", + Body: makeBody(), + }, + expectErr: assert.Error, + }, + { + name: "missing metadata", + backingColl: dataMock.Collection{ + ItemData: []data.Item{ + &dataMock.Item{ + ItemID: "mir.data", + Reader: makeBody(), + }, + }, + Path: testPath, + }, + expectItem: export.Item{ + ID: "mir.data", + Name: "mir.eml", + Error: assert.AnError, + }, + expectErr: assert.Error, + }, + } + + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + ch := make(chan export.Item) + + go streamConversationPosts( + ctx, + []data.RestoreCollection{test.backingColl}, + version.NoBackup, + control.DefaultExportConfig(), + ch, + &metrics.ExportStats{}) + + var ( + itm export.Item + err error + ) 
+ + for i := range ch { + if i.Error == nil { + itm = i + } else { + err = i.Error + } + } + + test.expectErr(t, err, clues.ToCore(err)) + + if err != nil { + return + } + + assert.Equal(t, test.expectItem.ID, itm.ID, "item ID") + assert.Equal(t, test.expectItem.Name, itm.Name, "item name") + + assert.NotNil(t, itm.Body, "body") + + _, err = io.ReadAll(itm.Body) + require.NoError(t, err, clues.ToCore(err)) + + itm.Body.Close() + }) + } +} diff --git a/src/internal/m365/service/groups/export.go b/src/internal/m365/service/groups/export.go index 7f667b8fc..15457baab 100644 --- a/src/internal/m365/service/groups/export.go +++ b/src/internal/m365/service/groups/export.go @@ -82,7 +82,7 @@ func (h *baseGroupsHandler) ProduceExportCollections( ) switch cat { - case path.ChannelMessagesCategory: + case path.ChannelMessagesCategory, path.ConversationPostsCategory: folders = append(folders, fp.Folders()...) coll = groups.NewExportCollection( @@ -90,7 +90,8 @@ func (h *baseGroupsHandler) ProduceExportCollections( []data.RestoreCollection{restoreColl}, backupVersion, exportCfg, - stats) + stats, + cat) case path.LibrariesCategory: drivePath, err := path.ToDrivePath(restoreColl.FullPath()) diff --git a/src/internal/operations/pathtransformer/restore_path_transformer.go b/src/internal/operations/pathtransformer/restore_path_transformer.go index 5cc794479..86e3a6137 100644 --- a/src/internal/operations/pathtransformer/restore_path_transformer.go +++ b/src/internal/operations/pathtransformer/restore_path_transformer.go @@ -143,6 +143,7 @@ func makeRestorePathsForEntry( switch true { case ent.Exchange != nil || (ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) || + (ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost) || (ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList): // TODO(ashmrtn): Eventually make Events have it's own function to handle // setting the restore destination properly.