Add group mailbox export (#5153)

<!-- PR description-->

* Add `.eml` export support for group mailbox (conversation post) items; see the sketch below for the gist of the conversion.
* Tested end to end manually; unit tests are added in this PR.
* A follow-up PR will add a sanity test.
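
How it works, in brief: during export, each stored conversation post (JSON) is paired with its `.meta` sidecar (topic and recipients), and the pair is converted into an RFC 822 message by `eml.FromJSONPostToEML`. Below is a minimal, self-contained sketch of that mapping. It is illustrative only: `postMetadata` and `toEML` are toy stand-ins for corso's `groupMeta.ConversationPostMetadata` and `eml.FromJSONPostToEML`; the only grounded details are the `topic`/`recipients` metadata fields exercised by the unit tests.

```go
package main

import (
    "encoding/json"
    "fmt"
    "strings"
)

// postMetadata mirrors the metadata sidecar shape used in the
// unit tests: {"topic": "...", "recipients": ["..."]}.
type postMetadata struct {
    Topic      string   `json:"topic"`
    Recipients []string `json:"recipients"`
}

// toEML is a toy stand-in for eml.FromJSONPostToEML: it maps the
// sidecar metadata onto message headers and appends a plain-text body.
func toEML(meta postMetadata, body string) string {
    var b strings.Builder

    fmt.Fprintf(&b, "Subject: %s\r\n", meta.Topic)
    fmt.Fprintf(&b, "To: %s\r\n", strings.Join(meta.Recipients, ", "))
    b.WriteString("\r\n")
    b.WriteString(body)

    return b.String()
}

func main() {
    raw := []byte(`{"topic":"t", "recipients":["em@il"]}`)

    var meta postMetadata
    if err := json.Unmarshal(raw, &meta); err != nil {
        panic(err)
    }

    fmt.Print(toEML(meta, "post content"))
}
```

The real converter also carries the full post body and attachments through (see `groupMock.PostWithAttachments` in the handler test); this sketch only shows the header mapping.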
---

#### Does this PR need a docs update or release note?

- [x] ✅ Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ] ⛔ No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
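
For context on what the new unit tests assert: they drain the export channel, separate items from errors, and read each body to completion. Here is a minimal, self-contained sketch of that consumption pattern; the `item` struct is a pared-down stand-in for corso's `export.Item`, not the real type.

```go
package main

import (
    "fmt"
    "io"
    "strings"
)

// item is a toy stand-in for export.Item; ID, Name, Body, and Error
// are the fields the new tests exercise.
type item struct {
    ID    string
    Name  string
    Body  io.ReadCloser
    Error error
}

func main() {
    ch := make(chan item)

    // Producer: mimics streamConversationPosts pushing one converted
    // post and then closing the channel.
    go func() {
        defer close(ch)

        ch <- item{
            ID:   "zim.data",
            Name: "zim.eml",
            Body: io.NopCloser(strings.NewReader("Subject: t\r\n\r\nbody")),
        }
    }()

    // Consumer: collect errors separately and read each body fully,
    // the same shape as the table-driven tests in this PR.
    for it := range ch {
        if it.Error != nil {
            fmt.Println("item error:", it.Error)
            continue
        }

        b, err := io.ReadAll(it.Body)
        it.Body.Close()

        if err != nil {
            fmt.Println("read error:", err)
            continue
        }

        fmt.Printf("%s: %d bytes\n", it.Name, len(b))
    }
}
```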
Commit 7e2b9dab62 (parent 1537db59c4) by Abhishek Pandey, 2024-01-30.
8 changed files with 418 additions and 13 deletions

---

@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Update repo init configuration to reduce the total number of GET requests sent
   to the object store when using corso. This affects repos that have many
   backups created in them per day the most.
+- Group mailbox emails can now be exported as `.eml` files.

 ### Fixed
 - Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup.

---

@@ -7,7 +7,6 @@ import (
     "github.com/alcionai/corso/src/cli/flags"
     "github.com/alcionai/corso/src/cli/utils"
     "github.com/alcionai/corso/src/pkg/control"
-    "github.com/alcionai/corso/src/pkg/selectors"
 )

 // called by export.go to map subcommands to provider-specific handling.
@@ -93,10 +92,6 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {
     sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts)
     utils.FilterGroupsRestoreInfoSelectors(sel, opts)

-    // TODO(pandeyabs): Exclude conversations from export since they are not
-    // supported yet. https://github.com/alcionai/corso/issues/4822
-    sel.Exclude(sel.Conversation(selectors.Any()))
-
     acceptedGroupsFormatTypes := []string{
         string(control.DefaultFormat),
         string(control.JSONFormat),

---

@@ -5,19 +5,24 @@ import (
     "context"
     "encoding/json"
     "io"
+    "strings"
     "time"

     "github.com/alcionai/clues"
     "github.com/microsoftgraph/msgraph-sdk-go/models"

     "github.com/alcionai/corso/src/internal/common/ptr"
+    "github.com/alcionai/corso/src/internal/converters/eml"
     "github.com/alcionai/corso/src/internal/data"
+    groupMeta "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
     "github.com/alcionai/corso/src/pkg/control"
     "github.com/alcionai/corso/src/pkg/export"
     "github.com/alcionai/corso/src/pkg/fault"
+    "github.com/alcionai/corso/src/pkg/logger"
     "github.com/alcionai/corso/src/pkg/metrics"
     "github.com/alcionai/corso/src/pkg/path"
     "github.com/alcionai/corso/src/pkg/services/m365/api"
+    "github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
 )
@@ -26,7 +31,19 @@ func NewExportCollection(
     backupVersion int,
     cec control.ExportConfig,
     stats *metrics.ExportStats,
+    cat path.CategoryType,
 ) export.Collectioner {
+    var streamItems export.ItemStreamer
+
+    switch cat {
+    case path.ChannelMessagesCategory:
+        streamItems = streamChannelMessages
+    case path.ConversationPostsCategory:
+        streamItems = streamConversationPosts
+    default:
+        return nil
+    }
+
     return export.BaseCollection{
         BaseDir:           baseDir,
         BackingCollection: backingCollections,
@@ -37,8 +54,12 @@
     }
 }

-// streamItems streams the items in the backingCollection into the export stream chan
-func streamItems(
+//-------------------------------------------------------------
+// Channel Messages
+//-------------------------------------------------------------
+
+// streamChannelMessages streams the items in the backingCollection into the export stream chan
+func streamChannelMessages(
     ctx context.Context,
     drc []data.RestoreCollection,
     backupVersion int,
@@ -198,3 +219,145 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessage
         Subject: ptr.Val(item.GetSubject()),
     }
 }
+
+//-------------------------------------------------------------
+// Conversation Posts
+//-------------------------------------------------------------
+
+// streamConversationPosts adds the post items into the export stream channel.
+func streamConversationPosts(
+    ctx context.Context,
+    drc []data.RestoreCollection,
+    backupVersion int,
+    cec control.ExportConfig,
+    ch chan<- export.Item,
+    stats *metrics.ExportStats,
+) {
+    defer close(ch)
+
+    errs := fault.New(false)
+
+    for _, rc := range drc {
+        for item := range rc.Items(ctx, errs) {
+            ictx := clues.Add(
+                ctx,
+                "path_short_ref", rc.FullPath().ShortRef(),
+                "stream_item_id", item.ID())
+
+            // Trim .data suffix from itemID. Also, we don't expect .meta files
+            // here since details are not persisted for metadata files.
+            trimmedID := strings.TrimSuffix(item.ID(), metadata.DataFileSuffix)
+            exportName := trimmedID + ".eml"
+
+            postMetadata, err := fetchAndReadMetadata(ictx, trimmedID, rc)
+            if err != nil {
+                ch <- export.Item{
+                    ID:    item.ID(),
+                    Error: err,
+                }
+
+                continue
+            }
+
+            reader := item.ToReader()
+            content, err := io.ReadAll(reader)
+
+            reader.Close()
+
+            if err != nil {
+                ch <- export.Item{
+                    ID:    item.ID(),
+                    Error: err,
+                }
+
+                continue
+            }
+
+            // Convert JSON to eml.
+            email, err := eml.FromJSONPostToEML(ictx, content, postMetadata)
+            if err != nil {
+                err = clues.Wrap(err, "converting JSON to eml")
+                logger.CtxErr(ictx, err).Info("processing collection item")
+
+                ch <- export.Item{
+                    ID:    item.ID(),
+                    Error: err,
+                }
+
+                continue
+            }
+
+            emlReader := io.NopCloser(bytes.NewReader([]byte(email)))
+
+            stats.UpdateResourceCount(path.ConversationPostsCategory)
+            body := metrics.ReaderWithStats(emlReader, path.ConversationPostsCategory, stats)
+
+            ch <- export.Item{
+                ID:   item.ID(),
+                Name: exportName,
+                Body: body,
+            }
+        }
+
+        items, recovered := errs.ItemsAndRecovered()
+
+        // Return all the items that we failed to source from the persistence layer
+        for _, item := range items {
+            ch <- export.Item{
+                ID:    item.ID,
+                Error: &item,
+            }
+        }
+
+        for _, err := range recovered {
+            ch <- export.Item{
+                Error: err,
+            }
+        }
+    }
+}
+
+func fetchAndReadMetadata(
+    ctx context.Context,
+    itemID string,
+    fin data.FetchItemByNamer,
+) (groupMeta.ConversationPostMetadata, error) {
+    metaName := itemID + metadata.MetaFileSuffix
+    ctx = clues.Add(ctx, "meta_file_name", metaName)
+
+    meta, err := fin.FetchItemByName(ctx, metaName)
+    if err != nil {
+        return groupMeta.ConversationPostMetadata{},
+            clues.WrapWC(ctx, err, "fetching metadata")
+    }
+
+    metaReader := meta.ToReader()
+    defer metaReader.Close()
+
+    metaFormatted, err := readMetadata(metaReader)
+    if err != nil {
+        return groupMeta.ConversationPostMetadata{},
+            clues.WrapWC(ctx, err, "deserializing metadata")
+    }
+
+    return metaFormatted, nil
+}
+
+// readMetadata reads and parses the metadata info for an item
+func readMetadata(metaRC io.ReadCloser) (groupMeta.ConversationPostMetadata, error) {
+    var meta groupMeta.ConversationPostMetadata
+
+    metaraw, err := io.ReadAll(metaRC)
+    if err != nil {
+        return groupMeta.ConversationPostMetadata{}, err
+    }
+
+    err = json.Unmarshal(metaraw, &meta)
+    if err != nil {
+        return groupMeta.ConversationPostMetadata{}, err
+    }
+
+    return meta, nil
+}

---

@@ -7,6 +7,7 @@ import (
     "github.com/alcionai/clues"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"

     "github.com/alcionai/corso/src/internal/data"
@@ -16,6 +17,7 @@ import (
     "github.com/alcionai/corso/src/pkg/control"
     "github.com/alcionai/corso/src/pkg/export"
     "github.com/alcionai/corso/src/pkg/metrics"
+    "github.com/alcionai/corso/src/pkg/path"
 )
@@ -26,7 +28,7 @@ func TestExportUnitSuite(t *testing.T) {
     suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
 }

-func (suite *ExportUnitSuite) TestStreamItems() {
+func (suite *ExportUnitSuite) TestStreamChannelMessages() {
     makeBody := func() io.ReadCloser {
         return io.NopCloser(bytes.NewReader([]byte("{}")))
     }
@@ -86,7 +88,7 @@ func (suite *ExportUnitSuite) TestStreamItems() {
     ch := make(chan export.Item)

-    go streamItems(
+    go streamChannelMessages(
         ctx,
         []data.RestoreCollection{test.backingColl},
         version.NoBackup,
@@ -113,3 +115,162 @@
         })
     }
 }
+
+func (suite *ExportUnitSuite) TestStreamConversationPosts() {
+    testPath, err := path.Build(
+        "t",
+        "g",
+        path.GroupsService,
+        path.ConversationPostsCategory,
+        true,
+        "convID",
+        "threadID")
+    require.NoError(suite.T(), err, clues.ToCore(err))
+
+    makeBody := func() io.ReadCloser {
+        rc := io.NopCloser(bytes.NewReader([]byte("{}")))
+
+        return metrics.ReaderWithStats(
+            rc,
+            path.ConversationPostsCategory,
+            &metrics.ExportStats{})
+    }
+
+    makeMeta := func() io.ReadCloser {
+        return io.NopCloser(
+            bytes.NewReader([]byte(`{"topic":"t", "recipients":["em@il"]}`)))
+    }
+
+    table := []struct {
+        name        string
+        backingColl dataMock.Collection
+        expectItem  export.Item
+        expectErr   assert.ErrorAssertionFunc
+    }{
+        {
+            name: "no errors",
+            backingColl: dataMock.Collection{
+                ItemData: []data.Item{
+                    &dataMock.Item{
+                        ItemID: "zim.data",
+                        Reader: makeBody(),
+                    },
+                },
+                Path: testPath,
+                AuxItems: map[string]data.Item{
+                    "zim.meta": &dataMock.Item{
+                        ItemID: "zim.meta",
+                        Reader: makeMeta(),
+                    },
+                },
+            },
+            expectItem: export.Item{
+                ID:   "zim.data",
+                Name: "zim.eml",
+                Body: makeBody(),
+            },
+            expectErr: assert.NoError,
+        },
+        {
+            name: "only recoverable errors",
+            backingColl: dataMock.Collection{
+                ItemsRecoverableErrs: []error{
+                    clues.New("The knowledge... it fills me! It is neat!"),
+                },
+                Path: testPath,
+            },
+            expectErr: assert.Error,
+        },
+        {
+            name: "items and recoverable errors",
+            backingColl: dataMock.Collection{
+                ItemData: []data.Item{
+                    &dataMock.Item{
+                        ItemID: "gir.data",
+                        Reader: makeBody(),
+                    },
+                },
+                ItemsRecoverableErrs: []error{
+                    clues.New("I miss my cupcake."),
+                },
+                Path: testPath,
+                AuxItems: map[string]data.Item{
+                    "gir.meta": &dataMock.Item{
+                        ItemID: "gir.meta",
+                        Reader: makeMeta(),
+                    },
+                },
+            },
+            expectItem: export.Item{
+                ID:   "gir.data",
+                Name: "gir.eml",
+                Body: makeBody(),
+            },
+            expectErr: assert.Error,
+        },
+        {
+            name: "missing metadata",
+            backingColl: dataMock.Collection{
+                ItemData: []data.Item{
+                    &dataMock.Item{
+                        ItemID: "mir.data",
+                        Reader: makeBody(),
+                    },
+                },
+                Path: testPath,
+            },
+            expectItem: export.Item{
+                ID:    "mir.data",
+                Error: assert.AnError,
+            },
+            expectErr: assert.Error,
+        },
+    }
+
+    for _, test := range table {
+        suite.Run(test.name, func() {
+            t := suite.T()
+
+            ctx, flush := tester.NewContext(t)
+            defer flush()
+
+            ch := make(chan export.Item)
+
+            go streamConversationPosts(
+                ctx,
+                []data.RestoreCollection{test.backingColl},
+                version.NoBackup,
+                control.DefaultExportConfig(),
+                ch,
+                &metrics.ExportStats{})
+
+            var (
+                itm export.Item
+                err error
+            )
+
+            for i := range ch {
+                if i.Error == nil {
+                    itm = i
+                } else {
+                    err = i.Error
+                }
+            }
+
+            test.expectErr(t, err, clues.ToCore(err))
+
+            if err != nil {
+                return
+            }
+
+            assert.Equal(t, test.expectItem.ID, itm.ID, "item ID")
+            assert.Equal(t, test.expectItem.Name, itm.Name, "item name")
+            assert.NotNil(t, itm.Body, "body")
+
+            _, err = io.ReadAll(itm.Body)
+            require.NoError(t, err, clues.ToCore(err))
+
+            itm.Body.Close()
+        })
+    }
+}

---

@@ -82,7 +82,7 @@ func (h *baseGroupsHandler) ProduceExportCollections(
     )

     switch cat {
-    case path.ChannelMessagesCategory:
+    case path.ChannelMessagesCategory, path.ConversationPostsCategory:
         folders = append(folders, fp.Folders()...)

         coll = groups.NewExportCollection(
@@ -90,7 +90,8 @@
             []data.RestoreCollection{restoreColl},
             backupVersion,
             exportCfg,
-            stats)
+            stats,
+            cat)

     case path.LibrariesCategory:
         drivePath, err := path.ToDrivePath(restoreColl.FullPath())

---

@@ -238,3 +238,86 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
     expectedStats.UpdateResourceCount(path.FilesCategory)
     assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
 }
+
+func (suite *ExportUnitSuite) TestExportRestoreCollections_ConversationPosts() {
+    t := suite.T()
+
+    ctx, flush := tester.NewContext(t)
+    defer flush()
+
+    var (
+        itemID        = "itemID"
+        containerName = "convID"
+        content       = groupMock.PostWithAttachments
+        body          = io.NopCloser(bytes.NewBufferString(content))
+        exportCfg     = control.ExportConfig{}
+        expectedPath  = path.ConversationPostsCategory.HumanString() + "/" + containerName
+        expectedItems = []export.Item{
+            {
+                ID:   itemID + ".data",
+                Name: itemID + ".eml",
+                // Body: body, not checked
+            },
+        }
+    )
+
+    p, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, containerName)
+    assert.NoError(t, err, "build path")
+
+    dcs := []data.RestoreCollection{
+        data.FetchRestoreCollection{
+            Collection: dataMock.Collection{
+                Path: p,
+                ItemData: []data.Item{
+                    &dataMock.Item{
+                        ItemID: itemID + ".data",
+                        Reader: body,
+                    },
+                },
+            },
+            FetchItemByNamer: finD{
+                id:   itemID + ".meta",
+                key:  "topic",
+                name: itemID + ".meta",
+            },
+        },
+    }
+
+    stats := metrics.NewExportStats()
+
+    ecs, err := NewGroupsHandler(api.Client{}, nil).
+        ProduceExportCollections(
+            ctx,
+            int(version.Backup),
+            exportCfg,
+            dcs,
+            stats,
+            fault.New(true))
+    assert.NoError(t, err, "export collections error")
+    assert.Len(t, ecs, 1, "num of collections")
+    assert.Equal(t, expectedPath, ecs[0].BasePath(), "base dir")
+
+    fitems := []export.Item{}
+    size := 0
+
+    for item := range ecs[0].Items(ctx) {
+        b, err := io.ReadAll(item.Body)
+        assert.NoError(t, err, clues.ToCore(err))
+
+        // count up size for tests
+        size += len(b)
+
+        // have to nil out body, otherwise assert fails due to
+        // pointer memory location differences
+        item.Body = nil
+        fitems = append(fitems, item)
+    }
+
+    assert.Equal(t, expectedItems, fitems, "items")
+
+    expectedStats := metrics.NewExportStats()
+    expectedStats.UpdateBytes(path.ConversationPostsCategory, int64(size))
+    expectedStats.UpdateResourceCount(path.ConversationPostsCategory)
+    assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
+}

---

@@ -143,6 +143,7 @@ func makeRestorePathsForEntry(
     switch true {
     case ent.Exchange != nil ||
         (ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) ||
+        (ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost) ||
         (ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList):
         // TODO(ashmrtn): Eventually make Events have it's own function to handle
         // setting the restore destination properly.

---

@@ -24,7 +24,7 @@ type Collectioner interface {
     Items(context.Context) <-chan Item
 }

-type itemStreamer func(
+type ItemStreamer func(
     ctx context.Context,
     backingColls []data.RestoreCollection,
     backupVersion int,
@@ -46,7 +46,7 @@ type BaseCollection struct {
     Cfg control.ExportConfig

-    Stream itemStreamer
+    Stream ItemStreamer

     Stats *metrics.ExportStats
 }