Compare commits

...

14 Commits

Author  SHA1  Message  Date
Abhishek Pandey  f8033e6b70  Pull contents from metadata file  2024-01-26 20:39:16 -08:00
Abhishek Pandey  0b4ea2f213  json to eml  2024-01-26 20:39:16 -08:00
Abhishek Pandey  5ec4e1d21d  Unrollback change  2024-01-26 20:39:16 -08:00
Abhishek Pandey  06a2028738  Adjust prior to rebase  2024-01-26 20:39:16 -08:00
Abhishek Pandey  d210ba475f  Add conv export  2024-01-26 20:39:16 -08:00
Abhishek Pandey  152d78de21  Refactor metadata read code  2024-01-26 19:27:28 -08:00
Abhishek Pandey  eb6376e641  Rebase changes  2024-01-26 19:27:28 -08:00
Abhishek Pandey  9f68f379e7  Fix rebase failures  2024-01-26 19:27:28 -08:00
Abhishek Pandey  5230e60b24  Differentiate file suffixes between channel messages and conversations  2024-01-26 19:27:28 -08:00
Abhishek Pandey  7477c58698  Fix failing unit test  2024-01-26 19:25:21 -08:00
Abhishek Pandey  deaf8dcd95  Fix file name  2024-01-26 18:27:41 -08:00
Abhishek Pandey  3e2268e524  Move conv metadata definiton  2024-01-26 14:20:11 -08:00
Abhishek Pandey  abca65daa9  Add tests  2024-01-26 14:20:11 -08:00
Abhishek Pandey  41eb63686d  Persist metadata files for group mailboxes  2024-01-26 14:20:11 -08:00
17 changed files with 780 additions and 34 deletions

View File

@ -7,7 +7,6 @@ import (
"github.com/alcionai/corso/src/cli/flags"
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/selectors"
)
// called by export.go to map subcommands to provider-specific handling.
@ -95,7 +94,7 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {
// TODO(pandeyabs): Exclude conversations from export since they are not
// supported yet. https://github.com/alcionai/corso/issues/4822
sel.Exclude(sel.Conversation(selectors.Any()))
// sel.Exclude(sel.Conversation(selectors.Any()))
acceptedGroupsFormatTypes := []string{
string(control.DefaultFormat),

View File

@ -303,3 +303,138 @@ func FromJSON(ctx context.Context, body []byte) (string, error) {
return email.GetMessage(), nil
}
// FromJSONPost converts a conversation post (as json) to .eml format
func FromJSONPost(ctx context.Context, body []byte, topic string) (string, error) {
data, err := api.BytesToPostable(body)
if err != nil {
return "", clues.WrapWC(ctx, err, "converting to postable")
}
ctx = clues.Add(ctx, "item_id", ptr.Val(data.GetId()))
email := mail.NewMSG()
email.AllowDuplicateAddress = true // More "correct" conversion
email.AddBccToHeader = true // Don't ignore Bcc
email.AllowEmptyAttachments = true // Don't error on empty attachments
email.UseProvidedAddress = true // Don't try to parse the email address
if data.GetFrom() != nil {
email.SetFrom(formatAddress(data.GetFrom().GetEmailAddress()))
}
if data.GetReceivedDateTime() != nil {
email.SetDate(ptr.Val(data.GetReceivedDateTime()).Format(dateFormat))
}
// if data.GetSender() != nil {
// email.SetReplyTo(formatAddress(data.GetSender().GetEmailAddress()))
// }
// if data.GetCcRecipients() != nil {
// for _, recipient := range data.GetCcRecipients() {
// email.AddCc(formatAddress(recipient.GetEmailAddress()))
// }
// }
// if data.GetBccRecipients() != nil {
// for _, recipient := range data.GetBccRecipients() {
// email.AddBcc(formatAddress(recipient.GetEmailAddress()))
// }
// }
// if data.GetInReplyTo() != nil {
// rts := data.GetInReplyTo()
// email.SetReplyTo(formatAddress(rts.GetEmailAddress()))
// }
email.SetSubject(topic)
email.AddTo("dc_test@10rqc2.onmicrosoft.com")
// if data.GetSentDateTime() != nil {
// email.SetDate(ptr.Val(data.GetSentDateTime()).Format(dateFormat))
// }
if data.GetBody() != nil {
if data.GetBody().GetContentType() != nil {
var contentType mail.ContentType
switch data.GetBody().GetContentType().String() {
case "html":
contentType = mail.TextHTML
case "text":
contentType = mail.TextPlain
default:
// https://learn.microsoft.com/en-us/graph/api/resources/itembody?view=graph-rest-1.0#properties
// This should not be possible according to the documentation
logger.Ctx(ctx).
With("body_type", data.GetBody().GetContentType().String()).
Info("unknown body content type")
contentType = mail.TextPlain
}
email.SetBody(contentType, ptr.Val(data.GetBody().GetContent()))
}
}
if data.GetAttachments() != nil {
for _, attachment := range data.GetAttachments() {
kind := ptr.Val(attachment.GetContentType())
bytes, err := attachment.GetBackingStore().Get("contentBytes")
if err != nil {
return "", clues.WrapWC(ctx, err, "failed to get attachment bytes")
}
if bytes == nil {
// Some attachments have an "item" field instead of
// "contentBytes". There are items like contacts, emails
// or calendar events which will not be a normal format
// and will have to be converted to a text format.
// TODO(meain): Handle custom attachments
// https://github.com/alcionai/corso/issues/4772
logger.Ctx(ctx).
With("attachment_id", ptr.Val(attachment.GetId())).
Info("unhandled attachment type")
continue
}
bts, ok := bytes.([]byte)
if !ok {
return "", clues.WrapWC(ctx, err, "invalid content bytes")
}
name := ptr.Val(attachment.GetName())
contentID, err := attachment.GetBackingStore().Get("contentId")
if err != nil {
return "", clues.WrapWC(ctx, err, "getting content id for attachment")
}
if contentID != nil {
cids, _ := str.AnyToString(contentID)
if len(cids) > 0 {
name = cids
}
}
email.Attach(&mail.File{
// cannot use filename as inline attachment will not get mapped properly
Name: name,
MimeType: kind,
Data: bts,
Inline: ptr.Val(attachment.GetIsInline()),
})
}
}
if err = email.GetError(); err != nil {
return "", clues.WrapWC(ctx, err, "converting to eml")
}
return email.GetMessage(), nil
}
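As a side note on the builder calls above: FromJSONPost uses the same simple-mail message API as the existing FromJSON. A self-contained sketch of that flow stripped of the Graph types follows; the import path, addresses, and literal values are assumptions for illustration, not taken from this change.

package main

import (
    "fmt"

    mail "github.com/xhit/go-simple-mail/v2" // assumed to be the mail package aliased above
)

func main() {
    email := mail.NewMSG()
    email.AllowDuplicateAddress = true
    email.AddBccToHeader = true
    email.AllowEmptyAttachments = true
    email.UseProvidedAddress = true

    // Subject comes from the conversation topic, body from the post content.
    email.SetFrom("jane.doe@example.com") // hypothetical sender
    email.AddTo("group@example.com")      // hypothetical recipient
    email.SetSubject("weekly sync")
    email.SetBody(mail.TextHTML, "<p>post body</p>")

    if err := email.GetError(); err != nil {
        fmt.Println("building eml:", err)
        return
    }

    fmt.Println(email.GetMessage())
}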

View File

@ -2,6 +2,7 @@ package groups
import (
"context"
"io"
"testing"
"time"
@ -60,6 +61,11 @@ func (bh mockBackupHandler) augmentItemInfo(
// no-op
}
//lint:ignore U1000 false linter issue due to generics
func (bh mockBackupHandler) supportsItemMetadata() bool {
return false
}
func (bh mockBackupHandler) canMakeDeltaQueries() bool {
return true
}
@ -136,6 +142,13 @@ func (bh mockBackupHandler) getItem(
return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID]
}
func (bh mockBackupHandler) getItemMetadata(
_ context.Context,
_ models.Channelable,
) (io.ReadCloser, int, error) {
return nil, 0, errMetadataFilesNotSupported
}
// ---------------------------------------------------------------------------
// Unit Suite
// ---------------------------------------------------------------------------

View File

@ -2,6 +2,7 @@ package groups
import (
"context"
"io"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@ -105,6 +106,17 @@ func (bh channelsBackupHandler) getItem(
return bh.ac.GetChannelMessage(ctx, groupID, containerIDs[0], messageID)
}
// Channel messages don't carry metadata files. Return unsupported error.
// Adding this method for interface compliance.
//
//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) getItemMetadata(
_ context.Context,
_ models.Channelable,
) (io.ReadCloser, int, error) {
return nil, 0, errMetadataFilesNotSupported
}
//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) augmentItemInfo(
dgi *details.GroupsInfo,
@ -113,6 +125,12 @@ func (bh channelsBackupHandler) augmentItemInfo(
// no-op
}
//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) supportsItemMetadata() bool {
// No .data and .meta files for channel messages
return false
}
func channelContainer(ch models.Channelable) container[models.Channelable] {
return container[models.Channelable]{
storageDirFolders: path.Elements{ptr.Val(ch.GetId())},

View File

@ -23,6 +23,7 @@ import (
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
)
var (
@ -30,6 +31,8 @@ var (
_ data.BackupCollection = &lazyFetchCollection[graph.GetIDer, groupsItemer]{}
)
var errMetadataFilesNotSupported = clues.New("metadata files not supported")
const (
collectionChannelBufferSize = 1000
numberOfRetries = 4
@ -176,7 +179,16 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
defer wg.Done()
defer func() { <-semaphoreCh }()
col.stream <- data.NewDeletedItem(id)
// This is a no-op for conversations, as there is no way to detect
// deleted items in a conversation. It might be added in the future
// if graph supports it, so make sure we put up both .data and .meta
// files for deletions.
if col.getAndAugment.supportsItemMetadata() {
col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
} else {
col.stream <- data.NewDeletedItem(id)
}
atomic.AddInt64(&streamedItems, 1)
col.Counter.Inc(count.StreamItemsRemoved)
@ -200,6 +212,18 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
defer wg.Done()
defer func() { <-semaphoreCh }()
ictx := clues.Add(
ctx,
"item_id", id,
"parent_path", path.LoggableDir(col.LocationPath().String()))
// Conversation posts carry a .data suffix, while channel messages
// don't have any suffix. Metadata files are only supported for conversations.
dataFile := id
if col.getAndAugment.supportsItemMetadata() {
dataFile += metadata.DataFileSuffix
}
writer := kjson.NewJsonSerializationWriter()
defer writer.Close()
@ -234,24 +258,48 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
info.ParentPath = col.LocationPath().String()
storeItem, err := data.NewPrefetchedItemWithInfo(
dataItem, err := data.NewPrefetchedItemWithInfo(
io.NopCloser(bytes.NewReader(itemData)),
id,
dataFile,
details.ItemInfo{Groups: info})
if err != nil {
err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err)
err := clues.StackWC(ictx, err).Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ictx, err)
return
}
col.stream <- storeItem
// Handle metadata before data so that if metadata file fails,
// we are not left with an orphaned data file. No op for channel
// messages.
if col.getAndAugment.supportsItemMetadata() {
metaFile := id + metadata.MetaFileSuffix
// Use modTime from item info as it's the latest.
metaItem, err := downloadItemMeta[C, I](
ictx,
col.getAndAugment,
col.contains,
metaFile,
info.Modified,
el)
if err != nil {
return
}
col.stream <- metaItem
}
// Add data file to collection only if the metadata file (if any) was
// successfully added. This also helps to maintain consistency between
// prefetch & lazy fetch collection behaviors.
col.stream <- dataItem
atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, info.Size)
if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
logger.Ctx(ictx).Infow("item stream progress", "stats", col.Counter.Values())
}
col.Counter.Add(count.StreamBytesAdded, info.Size)
@ -341,7 +389,16 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
defer wg.Done()
defer func() { <-semaphoreCh }()
col.stream <- data.NewDeletedItem(id)
// This is a no-op for conversations, as there is no way to detect
// deleted items in a conversation. It might be added in the future
// if graph supports it, so make sure we put up both .data and .meta
// files for deletions.
if col.getAndAugment.supportsItemMetadata() {
col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
} else {
col.stream <- data.NewDeletedItem(id)
}
atomic.AddInt64(&streamedItems, 1)
col.Counter.Inc(count.StreamItemsRemoved)
@ -370,6 +427,39 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
"item_id", id,
"parent_path", path.LoggableDir(col.LocationPath().String()))
// Channel message files don't carry .data suffix, post files do.
dataFile := id
// Handle metadata before data so that if metadata file fails,
// we are not left with an orphaned data file.
//
// If the data download fails for some reason other than deleted in
// flight, we will still end up persisting a .meta file. This is
// fine since the next backup will overwrite it.
//
// If item is deleted in flight, we will end up with an orphaned
// .meta file. The only impact here is storage bloat, which
// is minimal. Other impact could be if we do an in-order restore
// using a tree built from .meta files. We may have some .meta
// files without corresponding .data files.
if col.getAndAugment.supportsItemMetadata() {
dataFile += metadata.DataFileSuffix
metaFile := id + metadata.MetaFileSuffix
metaItem, err := downloadItemMeta[C, I](
ictx,
col.getAndAugment,
col.contains,
metaFile,
modTime,
el)
if err != nil {
return
}
col.stream <- metaItem
}
col.stream <- data.NewLazyItemWithInfo(
ictx,
&lazyItemGetter[C, I]{
@ -381,7 +471,7 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
contains: col.contains,
parentPath: col.LocationPath().String(),
},
id,
dataFile,
modTime,
col.Counter,
el)
@ -463,3 +553,36 @@ func (lig *lazyItemGetter[C, I]) GetData(
false,
nil
}
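// downloadItemMeta fetches the container's metadata payload from the handler
// and wraps it in a prefetched data.Item keyed by metaFile, stamped with the
// given modTime.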
func downloadItemMeta[C graph.GetIDer, I groupsItemer](
ctx context.Context,
gim getItemMetadataer[C, I],
c container[C],
metaFile string,
modTime time.Time,
errs *fault.Bus,
) (data.Item, error) {
itemMeta, _, err := gim.getItemMetadata(
ctx,
c.container)
if err != nil {
errs.AddRecoverable(ctx, clues.StackWC(ctx, err))
return nil, err
}
// Skip adding progress reader for metadata files. It doesn't add
// much value.
storeItem, err := data.NewPrefetchedItem(
itemMeta,
metaFile,
// Use the same last modified time as post's.
modTime)
if err != nil {
errs.AddRecoverable(ctx, clues.StackWC(ctx, err))
return nil, err
}
return storeItem, nil
}
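A compact restatement of the naming and ordering rules the two collections follow above; this is a stand-alone sketch with assumed suffix values matching the metadata package elsewhere in this diff, not production code.

package main

import "fmt"

const (
    dataSuffix = ".data" // assumed value of metadata.DataFileSuffix
    metaSuffix = ".meta" // assumed value of metadata.MetaFileSuffix
)

// streamNames mirrors the branching above: handlers that support item
// metadata (conversation posts) emit a .meta entry first, then the .data
// entry, so a metadata failure never leaves an orphaned data file; channel
// messages keep the bare item ID. Deletion markers reuse the same names.
func streamNames(itemID string, supportsItemMetadata bool) []string {
    if !supportsItemMetadata {
        return []string{itemID}
    }

    return []string{itemID + metaSuffix, itemID + dataSuffix}
}

func main() {
    fmt.Println(streamNames("post-1", true))      // [post-1.meta post-1.data]
    fmt.Println(streamNames("message-1", false))  // [message-1]
}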

View File

@ -5,6 +5,7 @@ import (
"context"
"io"
"slices"
"strings"
"testing"
"time"
@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/readers"
@ -165,11 +167,24 @@ func (m getAndAugmentChannelMessage) getItem(
return msg, &details.GroupsInfo{}, m.Err
}
//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) getItemMetadata(
_ context.Context,
_ models.Channelable,
) (io.ReadCloser, int, error) {
return nil, 0, errMetadataFilesNotSupported
}
//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.Channelable) {
// no-op
}
//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) supportsItemMetadata() bool {
return false
}
func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
var (
t = suite.T()
@ -279,6 +294,7 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
type getAndAugmentConversation struct {
GetItemErr error
GetMetaErr error
CallIDs []string
}
@ -297,12 +313,25 @@ func (m *getAndAugmentConversation) getItem(
return p, &details.GroupsInfo{}, m.GetItemErr
}
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) getItemMetadata(
_ context.Context,
_ models.Conversationable,
) (io.ReadCloser, int, error) {
return io.NopCloser(strings.NewReader("test")), 4, m.GetMetaErr
}
//
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) {
// no-op
}
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) supportsItemMetadata() bool {
return true
}
func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
// Sort before comparing. We could use a set, but that would prevent us from
// detecting duplicates.
@ -312,7 +341,7 @@ func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
assert.Equal(t, expected, m.CallIDs, "expected calls")
}
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Conversations() {
var (
t = suite.T()
start = time.Now().Add(-time.Second)
@ -343,13 +372,11 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
added: map[string]time.Time{
"fisher": start.Add(time.Minute),
"flannigan": start.Add(2 * time.Minute),
"fitzbog": start.Add(3 * time.Minute),
},
expectItemCount: 3,
expectItemCount: 4,
expectReads: []string{
"fisher",
"flannigan",
"fitzbog",
},
},
{
@ -359,7 +386,7 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
"poppy": {},
"petunia": {},
},
expectItemCount: 3,
expectItemCount: 6,
},
{
// TODO(pandeyabs): Overlaps between added and removed are deleted
@ -371,23 +398,23 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
// prefetch collections.
name: "added and removed items",
added: map[string]time.Time{
"goblin": {},
"goblin": start.Add(time.Minute),
},
removed: map[string]struct{}{
"general": {},
"goose": {},
"grumbles": {},
},
expectItemCount: 4,
expectItemCount: 8,
},
}
for _, test := range table {
suite.Run(test.name, func() {
var (
t = suite.T()
errs = fault.New(true)
itemCount int
t = suite.T()
errs = fault.New(true)
itemMap = map[string]data.Item{}
)
ctx, flush := tester.NewContext(t)
@ -413,9 +440,20 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
}
for item := range col.Items(ctx, errs) {
itemCount++
var trimmedID string
_, rok := test.removed[item.ID()]
switch {
case strings.HasSuffix(item.ID(), ".data"):
trimmedID = strings.TrimSuffix(item.ID(), ".data")
case strings.HasSuffix(item.ID(), ".meta"):
trimmedID = strings.TrimSuffix(item.ID(), ".meta")
default:
assert.Fail(t, "unexpected item suffix: %s", item.ID())
}
itemMap[item.ID()] = item
_, rok := test.removed[trimmedID]
if rok {
assert.True(t, item.Deleted(), "removals should be marked as deleted")
dimt, ok := item.(data.ItemModTime)
@ -423,7 +461,7 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
}
modTime, aok := test.added[item.ID()]
modTime, aok := test.added[trimmedID]
if !rok && aok {
// Item's mod time should be what's passed into the collection
// initializer.
@ -434,7 +472,10 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
// Check if the test wants us to read the item's data so the lazy
// data fetch is executed.
if slices.Contains(test.expectReads, item.ID()) {
//
// Inspect with trimmedID since getItem() operates on the item ID, not the file name.
if strings.HasSuffix(item.ID(), ".data") &&
slices.Contains(test.expectReads, trimmedID) {
r := item.ToReader()
_, err := io.ReadAll(r)
@ -458,9 +499,40 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
assert.NoError(t, errs.Failure())
assert.Equal(
t,
test.expectItemCount,
itemCount,
test.expectItemCount, // 2*(len(test.added)+len(test.removed)),
len(itemMap),
"should see all expected items")
addedAndRemoved := append(maps.Keys(test.added), maps.Keys(test.removed)...)
for _, id := range addedAndRemoved {
// Should have a .data and a .meta file
d, ok := itemMap[id+".data"]
assert.True(t, ok, "should have data file for %q", id)
m, ok := itemMap[id+".meta"]
assert.True(t, ok, "should have meta file for %q", id)
// Meta files should not have item info.
assert.Implements(t, (*data.Item)(nil), m)
if slices.Contains(maps.Keys(test.removed), id) {
continue
}
// Mod times should match. Not doing this check for removed items
// since mod time is set to now() for them.
assert.Equal(t, d.(data.ItemModTime).ModTime(), m.(data.ItemModTime).ModTime(), "item mod time")
// Read meta file data. The data is of no significance, we just want
// to make sure the file is readable.
r := m.ToReader()
_, err := io.ReadAll(r)
assert.NoError(t, err, clues.ToCore(err))
r.Close()
}
})
}
}

View File

@ -1,12 +1,16 @@
package groups
import (
"bytes"
"context"
"encoding/json"
"io"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -132,6 +136,24 @@ func (bh conversationsBackupHandler) getItem(
postID)
}
//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) getItemMetadata(
ctx context.Context,
c models.Conversationable,
) (io.ReadCloser, int, error) {
meta := metadata.ConversationPostMetadata{
Recipients: []string{bh.resourceEmail},
Topic: ptr.Val(c.GetTopic()),
}
metaJSON, err := json.Marshal(meta)
if err != nil {
return nil, 0, clues.WrapWC(ctx, err, "serializing post metadata")
}
return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
}
//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) augmentItemInfo(
dgi *details.GroupsInfo,
@ -148,6 +170,11 @@ func (bh conversationsBackupHandler) augmentItemInfo(
dgi.Post.Topic = ptr.Val(c.GetTopic())
}
//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) supportsItemMetadata() bool {
return true
}
func conversationThreadContainer(
c models.Conversationable,
t models.ConversationThreadable,

View File

@ -0,0 +1,62 @@
package groups
import (
"encoding/json"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/internal/tester"
)
type ConversationHandlerUnitSuite struct {
tester.Suite
}
func TestConversationHandlerUnitSuite(t *testing.T) {
suite.Run(t, &ConversationHandlerUnitSuite{Suite: tester.NewUnitSuite(t)})
}
// Basic test to ensure metadata is serialized and deserialized correctly.
func (suite *ConversationHandlerUnitSuite) TestGetItemMetadata() {
var (
t = suite.T()
bh = conversationsBackupHandler{
resourceEmail: "test@example.com",
}
topic = "test topic"
conv = models.NewConversation()
)
ctx, flush := tester.NewContext(t)
defer flush()
conv.SetTopic(&topic)
rc, size, err := bh.getItemMetadata(ctx, conv)
assert.NoError(t, err, clues.ToCore(err))
require.NotNil(t, rc, "nil read closer")
assert.Greater(t, size, 0, "incorrect size")
defer rc.Close()
m, err := io.ReadAll(rc)
assert.NoError(t, err, "reading metadata")
var meta metadata.ConversationPostMetadata
err = json.Unmarshal(m, &meta)
assert.NoError(t, err, "deserializing metadata")
assert.Equal(t, []string{"test@example.com"}, meta.Recipients, "incorrect recipients")
assert.Equal(t, ptr.Val(conv.GetTopic()), meta.Topic, "incorrect topic")
}

View File

@ -4,17 +4,21 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"time"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/converters/eml"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/metrics"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
@ -26,19 +30,25 @@ func NewExportCollection(
backupVersion int,
cec control.ExportConfig,
stats *metrics.ExportStats,
cat path.CategoryType,
) export.Collectioner {
s := streamChannelItems
if cat == path.ConversationPostsCategory {
s = streamConversationPosts
}
return export.BaseCollection{
BaseDir: baseDir,
BackingCollection: backingCollections,
BackupVersion: backupVersion,
Cfg: cec,
Stream: streamItems,
Stream: s,
Stats: stats,
}
}
// streamItems streams the items in the backingCollection into the export stream chan
func streamItems(
func streamChannelItems(
ctx context.Context,
drc []data.RestoreCollection,
backupVersion int,
@ -198,3 +208,130 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessag
Subject: ptr.Val(item.GetSubject()),
}
}
// streamConversationPosts streams the conversation post items in the backingCollection into the export stream chan
func streamConversationPosts(
ctx context.Context,
drc []data.RestoreCollection,
backupVersion int,
cec control.ExportConfig,
ch chan<- export.Item,
stats *metrics.ExportStats,
) {
defer close(ch)
errs := fault.New(false)
for _, rc := range drc {
ictx := clues.Add(ctx, "path_short_ref", rc.FullPath().ShortRef())
for item := range rc.Items(ctx, errs) {
// Trim .data suffix from itemID
trimmedName := strings.TrimSuffix(item.ID(), dataFileSuffix)
name := trimmedName + ".eml"
meta, err := getMetadataContents(ictx, item.ID(), rc)
if err != nil {
ch <- export.Item{ID: item.ID(), Error: clues.Wrap(err, "fetching post metadata")}
continue
}
itemCtx := clues.Add(ictx, "stream_item_id", item.ID())
reader := item.ToReader()
content, err := io.ReadAll(reader)
reader.Close()
if err != nil {
ch <- export.Item{
ID: item.ID(),
Error: err,
}
continue
}
topic := meta.Topic
email, err := eml.FromJSONPost(itemCtx, content, topic)
if err != nil {
err = clues.Wrap(err, "converting JSON to eml")
logger.CtxErr(ctx, err).Info("processing collection item")
ch <- export.Item{
ID: item.ID(),
Error: err,
}
continue
}
emlReader := io.NopCloser(bytes.NewReader([]byte(email)))
body := metrics.ReaderWithStats(emlReader, path.EmailCategory, stats)
ch <- export.Item{
ID: item.ID(),
Name: name,
Body: body,
}
}
items, recovered := errs.ItemsAndRecovered()
// Return all the items that we failed to source from the persistence layer
for _, item := range items {
ch <- export.Item{
ID: item.ID,
Error: &item,
}
}
for _, err := range recovered {
ch <- export.Item{
Error: err,
}
}
}
}
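// getMetadataContents fetches the .meta sibling of the given .data item and
// deserializes it into a ConversationPostMetadata.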
func getMetadataContents(
ctx context.Context,
itemID string,
fin data.FetchItemByNamer,
) (ConversationPostMetadata, error) {
// Trim .data suffix from itemID
trimmedName := strings.TrimSuffix(itemID, dataFileSuffix)
metaName := trimmedName + metaFileSuffix
meta, err := fin.FetchItemByName(ctx, metaName)
if err != nil {
return ConversationPostMetadata{}, clues.Wrap(err, "fetching metadata")
}
metaReader := meta.ToReader()
defer metaReader.Close()
metaFormatted, err := getMetadata(metaReader)
if err != nil {
return ConversationPostMetadata{}, clues.Wrap(err, "deserializing metadata")
}
return metaFormatted, nil
}
// getMetadata reads and parses the metadata info for an item
func getMetadata(metar io.ReadCloser) (ConversationPostMetadata, error) {
var meta ConversationPostMetadata
// `metar` will be nil for the top level container folder
if metar != nil {
metaraw, err := io.ReadAll(metar)
if err != nil {
return ConversationPostMetadata{}, err
}
err = json.Unmarshal(metaraw, &meta)
if err != nil {
return ConversationPostMetadata{}, err
}
}
return meta, nil
}

View File

@ -2,6 +2,7 @@ package groups
import (
"context"
"io"
"github.com/microsoft/kiota-abstractions-go/serialization"
@ -33,7 +34,9 @@ type backupHandler[C graph.GetIDer, I groupsItemer] interface {
type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface {
getItemer[I]
getItemMetadataer[C, I]
augmentItemInfoer[C]
supportsItemMetadataer[C, I]
}
type augmentItemInfoer[C graph.GetIDer] interface {
@ -51,6 +54,17 @@ type getItemer[I groupsItemer] interface {
) (I, *details.GroupsInfo, error)
}
type getItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
getItemMetadata(
ctx context.Context,
c C,
) (io.ReadCloser, int, error)
}
type supportsItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
supportsItemMetadata() bool
}
// gets all containers for the resource
type getContainerser[C graph.GetIDer] interface {
getContainers(

View File

@ -0,0 +1,8 @@
package metadata
// ConversationPostMetadata stores metadata for a given conversation post,
// stored as a .meta file in kopia.
type ConversationPostMetadata struct {
Recipients []string `json:"recipients,omitempty"`
Topic string `json:"topic,omitempty"`
}
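For illustration, a minimal stand-alone round trip of this struct showing the payload shape getItemMetadata writes into the .meta file; the recipient and topic values are placeholders.

package main

import (
    "encoding/json"
    "fmt"
)

// conversationPostMetadata mirrors the ConversationPostMetadata struct above.
type conversationPostMetadata struct {
    Recipients []string `json:"recipients,omitempty"`
    Topic      string   `json:"topic,omitempty"`
}

func main() {
    meta := conversationPostMetadata{
        Recipients: []string{"group@example.com"}, // placeholder group mailbox
        Topic:      "weekly sync",                 // placeholder topic
    }

    // This is the payload shape persisted in the post's .meta file.
    raw, err := json.Marshal(meta)
    if err != nil {
        panic(err)
    }

    fmt.Println(string(raw)) // {"recipients":["group@example.com"],"topic":"weekly sync"}

    var parsed conversationPostMetadata
    if err := json.Unmarshal(raw, &parsed); err != nil {
        panic(err)
    }

    fmt.Printf("%+v\n", parsed)
}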

View File

@ -82,7 +82,7 @@ func (h *baseGroupsHandler) ProduceExportCollections(
)
switch cat {
case path.ChannelMessagesCategory:
case path.ChannelMessagesCategory, path.ConversationPostsCategory:
folders = append(folders, fp.Folders()...)
coll = groups.NewExportCollection(
@ -90,7 +90,8 @@ func (h *baseGroupsHandler) ProduceExportCollections(
[]data.RestoreCollection{restoreColl},
backupVersion,
exportCfg,
stats)
stats,
cat)
case path.LibrariesCategory:
drivePath, err := path.ToDrivePath(restoreColl.FullPath())

View File

@ -143,7 +143,8 @@ func makeRestorePathsForEntry(
switch true {
case ent.Exchange != nil ||
(ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) ||
(ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList):
(ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList) ||
(ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost):
// TODO(ashmrtn): Eventually make Events have its own function to handle
// setting the restore destination properly.
res.RestorePath, err = basicLocationPath(repoRef, locRef)

View File

@ -1,6 +1,8 @@
package metadata
import "strings"
import (
"strings"
)
const (
MetaFileSuffix = ".meta"

View File

@ -9,9 +9,12 @@ func IsMetadataFile(p path.Path) bool {
case path.OneDriveService:
return HasMetaSuffix(p.Item())
case path.SharePointService, path.GroupsService:
case path.SharePointService:
return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item())
case path.GroupsService:
return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item()) ||
p.Category() == path.ConversationPostsCategory && HasMetaSuffix(p.Item())
default:
return false
}

View File

@ -152,3 +152,108 @@ func (suite *MetadataUnitSuite) TestIsMetadataFile_Directories() {
}
}
}
func (suite *MetadataUnitSuite) TestIsMetadataFile() {
table := []struct {
name string
service path.ServiceType
category path.CategoryType
isMetaFile bool
expected bool
}{
{
name: "onedrive .data file",
service: path.OneDriveService,
category: path.FilesCategory,
},
{
name: "sharepoint library .data file",
service: path.SharePointService,
category: path.LibrariesCategory,
},
{
name: "group library .data file",
service: path.GroupsService,
category: path.LibrariesCategory,
},
{
name: "group conversations .data file",
service: path.GroupsService,
category: path.ConversationPostsCategory,
},
{
name: "onedrive .meta file",
service: path.OneDriveService,
category: path.FilesCategory,
isMetaFile: true,
expected: true,
},
{
name: "sharepoint library .meta file",
service: path.SharePointService,
category: path.LibrariesCategory,
isMetaFile: true,
expected: true,
},
{
name: "group library .meta file",
service: path.GroupsService,
category: path.LibrariesCategory,
isMetaFile: true,
expected: true,
},
{
name: "group conversations .meta file",
service: path.GroupsService,
category: path.ConversationPostsCategory,
isMetaFile: true,
expected: true,
},
// For services which don't have metadata files, make sure the function
// returns false. We don't want .meta suffix (assuming it exists) in
// these cases to be interpreted as metadata files.
{
name: "exchange service",
service: path.ExchangeService,
category: path.EmailCategory,
isMetaFile: true,
},
{
name: "group channels",
service: path.GroupsService,
category: path.ChannelMessagesCategory,
isMetaFile: true,
},
{
name: "lists",
service: path.SharePointService,
category: path.ListsCategory,
isMetaFile: true,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
fileName := "file"
if test.isMetaFile {
fileName += metadata.MetaFileSuffix
} else {
fileName += metadata.DataFileSuffix
}
p, err := path.Build(
"t",
"u",
test.service,
test.category,
true,
"some", "path", "for", fileName)
require.NoError(t, err, clues.ToCore(err))
actual := metadata.IsMetadataFile(p)
assert.Equal(t, test.expected, actual)
})
}
}

View File

@ -629,6 +629,32 @@ func BytesToMessageable(body []byte) (models.Messageable, error) {
return v.(models.Messageable), nil
}
func bytesToPostable(body []byte) (serialization.Parsable, error) {
v, err := CreateFromBytes(body, models.CreatePostFromDiscriminatorValue)
if err != nil {
if !strings.Contains(err.Error(), invalidJSON) {
return nil, clues.Wrap(err, "deserializing bytes to postable")
}
// If the JSON was invalid try sanitizing and deserializing again.
// Sanitizing should transform characters < 0x20 according to the spec where
// possible. The resulting JSON may still be invalid though.
body = sanitize.JSONBytes(body)
v, err = CreateFromBytes(body, models.CreatePostFromDiscriminatorValue)
}
return v, clues.Stack(err).OrNil()
}
func BytesToPostable(body []byte) (models.Postable, error) {
v, err := bytesToPostable(body)
if err != nil {
return nil, clues.Stack(err)
}
return v.(models.Postable), nil
}
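The sanitize-and-retry comment above is easiest to see with a toy payload. The sketch below swaps in a naive control-character escaper for corso's sanitize.JSONBytes (whose exact behavior isn't shown here) purely to illustrate the retry flow.

package main

import (
    "encoding/json"
    "fmt"
)

// escapeControlChars stands in for sanitize.JSONBytes for illustration only:
// raw bytes below 0x20 are rewritten as \u00XX escapes so a second decode can
// succeed. It is deliberately naive (it assumes a compact payload with no
// structural whitespace); the real sanitizer is more careful.
func escapeControlChars(body []byte) []byte {
    out := make([]byte, 0, len(body))

    for _, b := range body {
        if b < 0x20 {
            out = append(out, []byte(fmt.Sprintf(`\u%04x`, b))...)
            continue
        }

        out = append(out, b)
    }

    return out
}

func main() {
    // A raw 0x01 byte inside a JSON string makes the payload invalid.
    body := []byte("{\"subject\":\"hello\x01world\"}")

    var v map[string]string
    if err := json.Unmarshal(body, &v); err != nil {
        // First decode fails; sanitize and retry, mirroring bytesToPostable.
        body = escapeControlChars(body)

        if err := json.Unmarshal(body, &v); err != nil {
            fmt.Println("still invalid:", err)
            return
        }
    }

    fmt.Printf("%q\n", v["subject"]) // "hello\x01world"
}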
func (c Mail) Serialize(
ctx context.Context,
item serialization.Parsable,