Add group mailbox exports
parent ec729fb509
commit 0199c9ad68
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update repo init configuration to reduce the total number of GET requests sent
  to the object store when using corso. This most affects repos that have many
  backups created in them per day.
- Group mailbox emails can now be exported as `.eml` files.

### Fixed

- Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup.
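For reference, each exported message is an ordinary RFC 5322 (.eml) file; a minimal illustration of the shape, with every header value invented:

    From: member@example.onmicrosoft.com
    To: group@example.onmicrosoft.com
    Subject: Quarterly planning
    Date: Mon, 01 Jan 2024 10:00:00 +0000
    Content-Type: text/plain; charset=utf-8

    Body of the group mailbox message.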
@@ -7,7 +7,6 @@ import (
    "github.com/alcionai/corso/src/cli/flags"
    "github.com/alcionai/corso/src/cli/utils"
    "github.com/alcionai/corso/src/pkg/control"
    "github.com/alcionai/corso/src/pkg/selectors"
)

// called by export.go to map subcommands to provider-specific handling.
@@ -93,10 +92,6 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {
    sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts)
    utils.FilterGroupsRestoreInfoSelectors(sel, opts)

    // TODO(pandeyabs): Exclude conversations from export since they are not
    // supported yet. https://github.com/alcionai/corso/issues/4822
    sel.Exclude(sel.Conversation(selectors.Any()))

    acceptedGroupsFormatTypes := []string{
        string(control.DefaultFormat),
        string(control.JSONFormat),
@@ -5,19 +5,24 @@ import (
    "context"
    "encoding/json"
    "io"
    "strings"
    "time"

    "github.com/alcionai/clues"
    "github.com/microsoftgraph/msgraph-sdk-go/models"

    "github.com/alcionai/corso/src/internal/common/ptr"
    "github.com/alcionai/corso/src/internal/converters/eml"
    "github.com/alcionai/corso/src/internal/data"
    groupMeta "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
    "github.com/alcionai/corso/src/pkg/control"
    "github.com/alcionai/corso/src/pkg/export"
    "github.com/alcionai/corso/src/pkg/fault"
    "github.com/alcionai/corso/src/pkg/logger"
    "github.com/alcionai/corso/src/pkg/metrics"
    "github.com/alcionai/corso/src/pkg/path"
    "github.com/alcionai/corso/src/pkg/services/m365/api"
    "github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
)

func NewExportCollection(
@@ -26,7 +31,13 @@ func NewExportCollection(
    backupVersion int,
    cec control.ExportConfig,
    stats *metrics.ExportStats,
    cat path.CategoryType,
) export.Collectioner {
    streamItems := streamChannelMessages
    if cat == path.ConversationPostsCategory {
        streamItems = streamConversationPosts
    }

    return export.BaseCollection{
        BaseDir:           baseDir,
        BackingCollection: backingCollections,

@@ -37,8 +48,12 @@ func NewExportCollection(
    }
}

// streamItems streams the items in the backingCollection into the export stream chan
func streamItems(

//-------------------------------------------------------------
// Channel Messages
//-------------------------------------------------------------

// streamChannelMessages streams the items in the backingCollection into the export stream chan
func streamChannelMessages(
    ctx context.Context,
    drc []data.RestoreCollection,
    backupVersion int,
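For orientation, a minimal usage sketch of the updated constructor; rc and the literal argument values are illustrative, while version.NoBackup, control.DefaultExportConfig, and the category constant all appear elsewhere in this diff:

    // Build an export collection whose items stream through
    // streamConversationPosts because of the category argument.
    coll := NewExportCollection(
        "group-mailbox/Inbox",          // baseDir: illustrative folder name
        []data.RestoreCollection{rc},   // rc: a restore collection from the backup
        version.NoBackup,               // backup version, as in the unit tests
        control.DefaultExportConfig(),
        &metrics.ExportStats{},
        path.ConversationPostsCategory) // routes items to streamConversationPosts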
@@ -198,3 +213,146 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessage
        Subject: ptr.Val(item.GetSubject()),
    }
}

//-------------------------------------------------------------
// Conversation Posts
//-------------------------------------------------------------

// streamConversationPosts adds the post items into the export stream channel.
func streamConversationPosts(
    ctx context.Context,
    drc []data.RestoreCollection,
    backupVersion int,
    cec control.ExportConfig,
    ch chan<- export.Item,
    stats *metrics.ExportStats,
) {
    defer close(ch)

    errs := fault.New(false)

    for _, rc := range drc {
        ictx := clues.Add(ctx, "path_short_ref", rc.FullPath().ShortRef())

        for item := range rc.Items(ctx, errs) {
            itemCtx := clues.Add(ictx, "stream_item_id", item.ID())

            // Trim the .data suffix from the itemID. We don't expect .meta files
            // here since details are not persisted for metadata files.
            trimmedID := strings.TrimSuffix(item.ID(), metadata.DataFileSuffix)
            exportName := trimmedID + ".eml"

            postMetadata, err := fetchAndReadMetadata(itemCtx, trimmedID, rc)
            if err != nil {
                ch <- export.Item{
                    ID:    item.ID(),
                    Error: err,
                }

                continue
            }

            reader := item.ToReader()
            content, err := io.ReadAll(reader)

            reader.Close()

            if err != nil {
                ch <- export.Item{
                    ID:    item.ID(),
                    Error: err,
                }

                continue
            }

            // Convert the serialized JSON post to eml.
            email, err := eml.FromJSONPostToEML(itemCtx, content, postMetadata)
            if err != nil {
                err = clues.Wrap(err, "converting JSON to eml")

                logger.CtxErr(itemCtx, err).Info("processing collection item")

                ch <- export.Item{
                    ID:    item.ID(),
                    Error: err,
                }

                continue
            }

            emlReader := io.NopCloser(bytes.NewReader([]byte(email)))

            stats.UpdateResourceCount(path.ConversationPostsCategory)
            body := metrics.ReaderWithStats(emlReader, path.ConversationPostsCategory, stats)

            ch <- export.Item{
                ID:   item.ID(),
                Name: exportName,
                Body: body,
            }
        }

        items, recovered := errs.ItemsAndRecovered()

        // Return all the items that we failed to source from the persistence layer.
        for _, item := range items {
            ch <- export.Item{
                ID:    item.ID,
                Error: &item,
            }
        }

        for _, err := range recovered {
            ch <- export.Item{
                Error: err,
            }
        }
    }
}

func fetchAndReadMetadata(
    ctx context.Context,
    itemID string,
    fin data.FetchItemByNamer,
) (groupMeta.ConversationPostMetadata, error) {
    metaName := itemID + metadata.MetaFileSuffix

    ictx := clues.Add(ctx, "meta_file_name", metaName)

    meta, err := fin.FetchItemByName(ictx, metaName)
    if err != nil {
        return groupMeta.ConversationPostMetadata{},
            clues.WrapWC(ctx, err, "fetching metadata")
    }

    metaReader := meta.ToReader()
    defer metaReader.Close()

    metaFormatted, err := readMetadata(metaReader)
    if err != nil {
        return groupMeta.ConversationPostMetadata{},
            clues.WrapWC(ctx, err, "deserializing metadata")
    }

    return metaFormatted, nil
}

// readMetadata reads and parses the metadata info for an item.
func readMetadata(metaRC io.ReadCloser) (groupMeta.ConversationPostMetadata, error) {
    var meta groupMeta.ConversationPostMetadata
    // `metaRC` will be nil for the top level container folder.
    if metaRC != nil {
        metaraw, err := io.ReadAll(metaRC)
        if err != nil {
            return groupMeta.ConversationPostMetadata{}, err
        }

        err = json.Unmarshal(metaraw, &meta)
        if err != nil {
            return groupMeta.ConversationPostMetadata{}, err
        }
    }

    return meta, nil
}
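A small, self-contained sketch of the .data/.meta/.eml naming convention used above; the literal suffix strings are assumptions standing in for metadata.DataFileSuffix and metadata.MetaFileSuffix:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Assumed values for illustration; the real code reads these
        // from the graph metadata package.
        const dataSuffix, metaSuffix = ".data", ".meta"

        itemID := "zim.data" // a stored post item, as in the unit tests below

        trimmed := strings.TrimSuffix(itemID, dataSuffix)
        fmt.Println(trimmed + ".eml")     // zim.eml  -> exported file name
        fmt.Println(trimmed + metaSuffix) // zim.meta -> sidecar fetched for headers
    }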
@@ -7,6 +7,7 @@ import (

    "github.com/alcionai/clues"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "github.com/stretchr/testify/suite"

    "github.com/alcionai/corso/src/internal/data"

@@ -16,6 +17,7 @@ import (
    "github.com/alcionai/corso/src/pkg/control"
    "github.com/alcionai/corso/src/pkg/export"
    "github.com/alcionai/corso/src/pkg/metrics"
    "github.com/alcionai/corso/src/pkg/path"
)

type ExportUnitSuite struct {

@@ -26,7 +28,7 @@ func TestExportUnitSuite(t *testing.T) {
    suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
}

func (suite *ExportUnitSuite) TestStreamItems() {
func (suite *ExportUnitSuite) TestStreamChannelMessages() {
    makeBody := func() io.ReadCloser {
        return io.NopCloser(bytes.NewReader([]byte("{}")))
    }

@@ -86,7 +88,7 @@ func (suite *ExportUnitSuite) TestStreamItems() {

    ch := make(chan export.Item)

    go streamItems(
    go streamChannelMessages(
        ctx,
        []data.RestoreCollection{test.backingColl},
        version.NoBackup,
@@ -113,3 +115,164 @@ func (suite *ExportUnitSuite) TestStreamItems() {
        })
    }
}

func (suite *ExportUnitSuite) TestStreamConversationPosts() {
    testPath, err := path.Build(
        "t",
        "g",
        path.GroupsService,
        path.ConversationPostsCategory,
        true,
        "convID",
        "threadID")
    require.NoError(suite.T(), err, clues.ToCore(err))

    makeBody := func() io.ReadCloser {
        rc := io.NopCloser(bytes.NewReader([]byte("{}")))

        return metrics.ReaderWithStats(
            rc,
            path.ConversationPostsCategory,
            &metrics.ExportStats{})
    }

    makeMeta := func() io.ReadCloser {
        return io.NopCloser(
            bytes.NewReader([]byte(`{"topic":"t", "recipients":["em@il"]}`)))
    }

    table := []struct {
        name        string
        backingColl dataMock.Collection
        expectItem  export.Item
        expectErr   assert.ErrorAssertionFunc
    }{
        {
            name: "no errors",
            backingColl: dataMock.Collection{
                ItemData: []data.Item{
                    &dataMock.Item{
                        ItemID: "zim.data",
                        Reader: makeBody(),
                    },
                },
                Path: testPath,
                AuxItems: map[string]data.Item{
                    "zim.meta": &dataMock.Item{
                        ItemID: "zim.meta",
                        Reader: makeMeta(),
                    },
                },
            },
            expectItem: export.Item{
                ID:   "zim.data",
                Name: "zim.eml",
                Body: makeBody(),
            },
            expectErr: assert.NoError,
        },
        {
            name: "only recoverable errors",
            backingColl: dataMock.Collection{
                ItemsRecoverableErrs: []error{
                    clues.New("The knowledge... it fills me! It is neat!"),
                },
                Path: testPath,
            },
            expectErr: assert.Error,
        },
        {
            name: "items and recoverable errors",
            backingColl: dataMock.Collection{
                ItemData: []data.Item{
                    &dataMock.Item{
                        ItemID: "gir.data",
                        Reader: makeBody(),
                    },
                },
                ItemsRecoverableErrs: []error{
                    clues.New("I miss my cupcake."),
                },
                Path: testPath,
                AuxItems: map[string]data.Item{
                    "gir.meta": &dataMock.Item{
                        ItemID: "gir.meta",
                        Reader: makeMeta(),
                    },
                },
            },
            expectItem: export.Item{
                ID:   "gir.data",
                Name: "gir.eml",
                Body: makeBody(),
            },
            expectErr: assert.Error,
        },
        {
            name: "missing metadata",
            backingColl: dataMock.Collection{
                ItemData: []data.Item{
                    &dataMock.Item{
                        ItemID: "mir.data",
                        Reader: makeBody(),
                    },
                },
                Path: testPath,
            },
            expectItem: export.Item{
                ID:    "mir.data",
                Name:  "mir.eml",
                Error: assert.AnError,
            },
            expectErr: assert.Error,
        },
    }

    for _, test := range table {
        suite.Run(test.name, func() {
            t := suite.T()

            ctx, flush := tester.NewContext(t)
            defer flush()

            ch := make(chan export.Item)

            go streamConversationPosts(
                ctx,
                []data.RestoreCollection{test.backingColl},
                version.NoBackup,
                control.DefaultExportConfig(),
                ch,
                &metrics.ExportStats{})

            var (
                itm export.Item
                err error
            )

            for i := range ch {
                if i.Error == nil {
                    itm = i
                } else {
                    err = i.Error
                }
            }

            test.expectErr(t, err, clues.ToCore(err))

            if err != nil {
                return
            }

            assert.Equal(t, test.expectItem.ID, itm.ID, "item ID")
            assert.Equal(t, test.expectItem.Name, itm.Name, "item name")

            assert.NotNil(t, itm.Body, "body")

            _, err = io.ReadAll(itm.Body)
            require.NoError(t, err, clues.ToCore(err))

            itm.Body.Close()
        })
    }
}
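The fixture returned by makeMeta suggests the wire shape of the post metadata; a hedged sketch of deserializing it, with struct fields inferred from the fixture rather than taken from the real ConversationPostMetadata type:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // conversationPostMetadata mirrors the fields implied by the test
    // fixture; the real type lives in
    // internal/m365/collection/groups/metadata and may differ.
    type conversationPostMetadata struct {
        Topic      string   `json:"topic"`
        Recipients []string `json:"recipients"`
    }

    func main() {
        raw := []byte(`{"topic":"t", "recipients":["em@il"]}`)

        var meta conversationPostMetadata
        if err := json.Unmarshal(raw, &meta); err != nil {
            panic(err)
        }

        fmt.Printf("%+v\n", meta) // {Topic:t Recipients:[em@il]}
    }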
@@ -82,7 +82,7 @@ func (h *baseGroupsHandler) ProduceExportCollections(
    )

    switch cat {
    case path.ChannelMessagesCategory:
    case path.ChannelMessagesCategory, path.ConversationPostsCategory:
        folders = append(folders, fp.Folders()...)

        coll = groups.NewExportCollection(

@@ -90,7 +90,8 @@ func (h *baseGroupsHandler) ProduceExportCollections(
            []data.RestoreCollection{restoreColl},
            backupVersion,
            exportCfg,
            stats)
            stats,
            cat)

    case path.LibrariesCategory:
        drivePath, err := path.ToDrivePath(restoreColl.FullPath())

@@ -143,6 +143,7 @@ func makeRestorePathsForEntry(
    switch true {
    case ent.Exchange != nil ||
        (ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) ||
        (ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost) ||
        (ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList):
        // TODO(ashmrtn): Eventually make Events have its own function to handle
        // setting the restore destination properly.