Compare commits
main...export_con (14 commits)
| Author | SHA1 | Date |
|---|---|---|
| | f8033e6b70 | |
| | 0b4ea2f213 | |
| | 5ec4e1d21d | |
| | 06a2028738 | |
| | d210ba475f | |
| | 152d78de21 | |
| | eb6376e641 | |
| | 9f68f379e7 | |
| | 5230e60b24 | |
| | 7477c58698 | |
| | deaf8dcd95 | |
| | 3e2268e524 | |
| | abca65daa9 | |
| | 41eb63686d | |
@@ -7,7 +7,6 @@ import (
	"github.com/alcionai/corso/src/cli/flags"
	"github.com/alcionai/corso/src/cli/utils"
	"github.com/alcionai/corso/src/pkg/control"
	"github.com/alcionai/corso/src/pkg/selectors"
)

// called by export.go to map subcommands to provider-specific handling.
@@ -95,7 +94,7 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {

	// TODO(pandeyabs): Exclude conversations from export since they are not
	// supported yet. https://github.com/alcionai/corso/issues/4822
	sel.Exclude(sel.Conversation(selectors.Any()))
	// sel.Exclude(sel.Conversation(selectors.Any()))

	acceptedGroupsFormatTypes := []string{
		string(control.DefaultFormat),
@@ -303,3 +303,138 @@ func FromJSON(ctx context.Context, body []byte) (string, error) {

	return email.GetMessage(), nil
}

// FromJSONPost converts a Postable (as json) to .eml format
func FromJSONPost(ctx context.Context, body []byte, topic string) (string, error) {
	data, err := api.BytesToPostable(body)
	if err != nil {
		return "", clues.WrapWC(ctx, err, "converting to postable")
	}

	ctx = clues.Add(ctx, "item_id", ptr.Val(data.GetId()))

	email := mail.NewMSG()
	email.AllowDuplicateAddress = true // More "correct" conversion
	email.AddBccToHeader = true        // Don't ignore Bcc
	email.AllowEmptyAttachments = true // Don't error on empty attachments
	email.UseProvidedAddress = true    // Don't try to parse the email address

	if data.GetFrom() != nil {
		email.SetFrom(formatAddress(data.GetFrom().GetEmailAddress()))
	}

	if data.GetReceivedDateTime() != nil {
		email.SetDate(ptr.Val(data.GetReceivedDateTime()).Format(dateFormat))
	}

	// if data.GetSender() != nil {
	// 	email.SetReplyTo(formatAddress(data.GetSender().GetEmailAddress()))
	// }

	// if data.GetCcRecipients() != nil {
	// 	for _, recipient := range data.GetCcRecipients() {
	// 		email.AddCc(formatAddress(recipient.GetEmailAddress()))
	// 	}
	// }

	// if data.GetBccRecipients() != nil {
	// 	for _, recipient := range data.GetBccRecipients() {
	// 		email.AddBcc(formatAddress(recipient.GetEmailAddress()))
	// 	}
	// }

	// if data.GetInReplyTo() != nil {
	// 	rts := data.GetInReplyTo()

	// 	email.SetReplyTo(formatAddress(rts.GetEmailAddress()))
	// }

	email.SetSubject(topic)
	email.AddTo("dc_test@10rqc2.onmicrosoft.com")

	// if data.GetSentDateTime() != nil {
	// 	email.SetDate(ptr.Val(data.GetSentDateTime()).Format(dateFormat))
	// }

	if data.GetBody() != nil {
		if data.GetBody().GetContentType() != nil {
			var contentType mail.ContentType

			switch data.GetBody().GetContentType().String() {
			case "html":
				contentType = mail.TextHTML
			case "text":
				contentType = mail.TextPlain
			default:
				// https://learn.microsoft.com/en-us/graph/api/resources/itembody?view=graph-rest-1.0#properties
				// This should not be possible according to the documentation
				logger.Ctx(ctx).
					With("body_type", data.GetBody().GetContentType().String()).
					Info("unknown body content type")

				contentType = mail.TextPlain
			}

			email.SetBody(contentType, ptr.Val(data.GetBody().GetContent()))
		}
	}

	if data.GetAttachments() != nil {
		for _, attachment := range data.GetAttachments() {
			kind := ptr.Val(attachment.GetContentType())

			bytes, err := attachment.GetBackingStore().Get("contentBytes")
			if err != nil {
				return "", clues.WrapWC(ctx, err, "failed to get attachment bytes")
			}

			if bytes == nil {
				// Some attachments have an "item" field instead of
				// "contentBytes". There are items like contacts, emails
				// or calendar events which will not be a normal format
				// and will have to be converted to a text format.
				// TODO(meain): Handle custom attachments
				// https://github.com/alcionai/corso/issues/4772
				logger.Ctx(ctx).
					With("attachment_id", ptr.Val(attachment.GetId())).
					Info("unhandled attachment type")

				continue
			}

			bts, ok := bytes.([]byte)
			if !ok {
				return "", clues.WrapWC(ctx, err, "invalid content bytes")
			}

			name := ptr.Val(attachment.GetName())

			contentID, err := attachment.GetBackingStore().Get("contentId")
			if err != nil {
				return "", clues.WrapWC(ctx, err, "getting content id for attachment")
			}

			if contentID != nil {
				cids, _ := str.AnyToString(contentID)
				if len(cids) > 0 {
					name = cids
				}
			}

			email.Attach(&mail.File{
				// cannot use filename as inline attachment will not get mapped properly
				Name:     name,
				MimeType: kind,
				Data:     bts,
				Inline:   ptr.Val(attachment.GetIsInline()),
			})
		}
	}

	if err = email.GetError(); err != nil {
		return "", clues.WrapWC(ctx, err, "converting to eml")
	}

	return email.GetMessage(), nil
}
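For orientation (not part of the diff): a minimal sketch of driving the new converter. `ctx` and `payload` are assumptions standing in for a context and a serialized Postable read from a backup item; the topic argument becomes the message subject, as `email.SetSubject(topic)` above shows.

```go
// Hypothetical caller; payload holds a post's serialized JSON.
emlText, err := eml.FromJSONPost(ctx, payload, "Weekly sync")
if err != nil {
	return clues.Wrap(err, "converting post to eml")
}
// emlText now holds the generated .eml content as a string.
```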
@@ -2,6 +2,7 @@ package groups

import (
	"context"
	"io"
	"testing"
	"time"
@@ -60,6 +61,11 @@ func (bh mockBackupHandler) augmentItemInfo(
	// no-op
}

//lint:ignore U1000 false linter issue due to generics
func (bh mockBackupHandler) supportsItemMetadata() bool {
	return false
}

func (bh mockBackupHandler) canMakeDeltaQueries() bool {
	return true
}
@@ -136,6 +142,13 @@ func (bh mockBackupHandler) getItem(
	return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID]
}

func (bh mockBackupHandler) getItemMetadata(
	_ context.Context,
	_ models.Channelable,
) (io.ReadCloser, int, error) {
	return nil, 0, errMetadataFilesNotSupported
}

// ---------------------------------------------------------------------------
// Unit Suite
// ---------------------------------------------------------------------------
@@ -2,6 +2,7 @@ package groups

import (
	"context"
	"io"

	"github.com/alcionai/clues"
	"github.com/microsoftgraph/msgraph-sdk-go/models"
@@ -105,6 +106,17 @@ func (bh channelsBackupHandler) getItem(
	return bh.ac.GetChannelMessage(ctx, groupID, containerIDs[0], messageID)
}

// Channel messages don't carry metadata files. Return unsupported error.
// Adding this method for interface compliance.
//
//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) getItemMetadata(
	_ context.Context,
	_ models.Channelable,
) (io.ReadCloser, int, error) {
	return nil, 0, errMetadataFilesNotSupported
}

//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) augmentItemInfo(
	dgi *details.GroupsInfo,
@@ -113,6 +125,12 @@ func (bh channelsBackupHandler) augmentItemInfo(
	// no-op
}

//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) supportsItemMetadata() bool {
	// No .data and .meta files for channel messages
	return false
}

func channelContainer(ch models.Channelable) container[models.Channelable] {
	return container[models.Channelable]{
		storageDirFolders: path.Elements{ptr.Val(ch.GetId())},
@@ -23,6 +23,7 @@ import (
	"github.com/alcionai/corso/src/pkg/logger"
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
	"github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
)

var (
@@ -30,6 +31,8 @@ var (
	_ data.BackupCollection = &lazyFetchCollection[graph.GetIDer, groupsItemer]{}
)

var errMetadataFilesNotSupported = clues.New("metadata files not supported")

const (
	collectionChannelBufferSize = 1000
	numberOfRetries             = 4
@@ -176,7 +179,16 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			col.stream <- data.NewDeletedItem(id)
			// This is a no-op for conversations, as there is no way to detect
			// deleted items in a conversation. It might be added in the future
			// if graph supports it, so make sure we put up both .data and .meta
			// files for deletions.
			if col.getAndAugment.supportsItemMetadata() {
				col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
				col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
			} else {
				col.stream <- data.NewDeletedItem(id)
			}

			atomic.AddInt64(&streamedItems, 1)
			col.Counter.Inc(count.StreamItemsRemoved)
@@ -200,6 +212,18 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			ictx := clues.Add(
				ctx,
				"item_id", id,
				"parent_path", path.LoggableDir(col.LocationPath().String()))

			// Conversation posts carry a .data suffix, while channel messages
			// don't have any suffix. Metadata files are only supported for conversations.
			dataFile := id
			if col.getAndAugment.supportsItemMetadata() {
				dataFile += metadata.DataFileSuffix
			}

			writer := kjson.NewJsonSerializationWriter()
			defer writer.Close()
@@ -234,24 +258,48 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul

			info.ParentPath = col.LocationPath().String()

			storeItem, err := data.NewPrefetchedItemWithInfo(
			dataItem, err := data.NewPrefetchedItemWithInfo(
				io.NopCloser(bytes.NewReader(itemData)),
				id,
				dataFile,
				details.ItemInfo{Groups: info})
			if err != nil {
				err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ctx, err)
				err := clues.StackWC(ictx, err).Label(fault.LabelForceNoBackupCreation)
				el.AddRecoverable(ictx, err)

				return
			}

			col.stream <- storeItem
			// Handle metadata before data so that if metadata file fails,
			// we are not left with an orphaned data file. No op for channel
			// messages.
			if col.getAndAugment.supportsItemMetadata() {
				metaFile := id + metadata.MetaFileSuffix

				// Use modTime from item info as it's the latest.
				metaItem, err := downloadItemMeta[C, I](
					ictx,
					col.getAndAugment,
					col.contains,
					metaFile,
					info.Modified,
					el)
				if err != nil {
					return
				}

				col.stream <- metaItem
			}

			// Add data file to collection only if the metadata file (if any) was
			// successfully added. This also helps to maintain consistency between
			// prefetch & lazy fetch collection behaviors.
			col.stream <- dataItem

			atomic.AddInt64(&streamedItems, 1)
			atomic.AddInt64(&totalBytes, info.Size)

			if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
				logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values())
				logger.Ctx(ictx).Infow("item stream progress", "stats", col.Counter.Values())
			}

			col.Counter.Add(count.StreamBytesAdded, info.Size)
@@ -341,7 +389,16 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
			defer wg.Done()
			defer func() { <-semaphoreCh }()

			col.stream <- data.NewDeletedItem(id)
			// This is a no-op for conversations, as there is no way to detect
			// deleted items in a conversation. It might be added in the future
			// if graph supports it, so make sure we put up both .data and .meta
			// files for deletions.
			if col.getAndAugment.supportsItemMetadata() {
				col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
				col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
			} else {
				col.stream <- data.NewDeletedItem(id)
			}

			atomic.AddInt64(&streamedItems, 1)
			col.Counter.Inc(count.StreamItemsRemoved)
@@ -370,6 +427,39 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
				"item_id", id,
				"parent_path", path.LoggableDir(col.LocationPath().String()))

			// Channel message files don't carry .data suffix, post files do.
			dataFile := id

			// Handle metadata before data so that if metadata file fails,
			// we are not left with an orphaned data file.
			//
			// If the data download fails for some reason other than deleted in
			// flight, we will still end up persisting a .meta file. This is
			// fine since the next backup will overwrite it.
			//
			// If item is deleted in flight, we will end up with an orphaned
			// .meta file. The only impact here is storage bloat, which
			// is minimal. Other impact could be if we do an in-order restore
			// using a tree built from .meta files. We may have some .meta
			// files without corresponding .data files.
			if col.getAndAugment.supportsItemMetadata() {
				dataFile += metadata.DataFileSuffix
				metaFile := id + metadata.MetaFileSuffix

				metaItem, err := downloadItemMeta[C, I](
					ictx,
					col.getAndAugment,
					col.contains,
					metaFile,
					modTime,
					el)
				if err != nil {
					return
				}

				col.stream <- metaItem
			}

			col.stream <- data.NewLazyItemWithInfo(
				ictx,
				&lazyItemGetter[C, I]{
@@ -381,7 +471,7 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
					contains:   col.contains,
					parentPath: col.LocationPath().String(),
				},
				id,
				dataFile,
				modTime,
				col.Counter,
				el)
@@ -463,3 +553,36 @@ func (lig *lazyItemGetter[C, I]) GetData(
		false,
		nil
}

func downloadItemMeta[C graph.GetIDer, I groupsItemer](
	ctx context.Context,
	gim getItemMetadataer[C, I],
	c container[C],
	metaFile string,
	modTime time.Time,
	errs *fault.Bus,
) (data.Item, error) {
	itemMeta, _, err := gim.getItemMetadata(
		ctx,
		c.container)
	if err != nil {
		errs.AddRecoverable(ctx, clues.StackWC(ctx, err))

		return nil, err
	}

	// Skip adding progress reader for metadata files. It doesn't add
	// much value.
	storeItem, err := data.NewPrefetchedItem(
		itemMeta,
		metaFile,
		// Use the same last modified time as post's.
		modTime)
	if err != nil {
		errs.AddRecoverable(ctx, clues.StackWC(ctx, err))

		return nil, err
	}

	return storeItem, nil
}
@@ -5,6 +5,7 @@ import (
	"context"
	"io"
	"slices"
	"strings"
	"testing"
	"time"
@@ -13,6 +14,7 @@ import (
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"golang.org/x/exp/maps"

	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/common/readers"
@@ -165,11 +167,24 @@ func (m getAndAugmentChannelMessage) getItem(
	return msg, &details.GroupsInfo{}, m.Err
}

//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) getItemMetadata(
	_ context.Context,
	_ models.Channelable,
) (io.ReadCloser, int, error) {
	return nil, 0, errMetadataFilesNotSupported
}

//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.Channelable) {
	// no-op
}

//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) supportsItemMetadata() bool {
	return false
}

func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
	var (
		t = suite.T()
@@ -279,6 +294,7 @@

type getAndAugmentConversation struct {
	GetItemErr error
	GetMetaErr error
	CallIDs    []string
}
@@ -297,12 +313,25 @@ func (m *getAndAugmentConversation) getItem(
	return p, &details.GroupsInfo{}, m.GetItemErr
}

//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) getItemMetadata(
	_ context.Context,
	_ models.Conversationable,
) (io.ReadCloser, int, error) {
	return io.NopCloser(strings.NewReader("test")), 4, m.GetMetaErr
}

//
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) {
	// no-op
}

//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) supportsItemMetadata() bool {
	return true
}

func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
	// Sort before comparing. We could use a set, but that would prevent us from
	// detecting duplicates.
@@ -312,7 +341,7 @@
	assert.Equal(t, expected, m.CallIDs, "expected calls")
}

func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Conversations() {
	var (
		t     = suite.T()
		start = time.Now().Add(-time.Second)
@@ -343,13 +372,11 @@
			added: map[string]time.Time{
				"fisher":    start.Add(time.Minute),
				"flannigan": start.Add(2 * time.Minute),
				"fitzbog":   start.Add(3 * time.Minute),
			},
			expectItemCount: 3,
			expectItemCount: 4,
			expectReads: []string{
				"fisher",
				"flannigan",
				"fitzbog",
			},
		},
		{
@@ -359,7 +386,7 @@
				"poppy":   {},
				"petunia": {},
			},
			expectItemCount: 3,
			expectItemCount: 6,
		},
		{
			// TODO(pandeyabs): Overlaps between added and removed are deleted
@@ -371,23 +398,23 @@
			// prefetch collections.
			name: "added and removed items",
			added: map[string]time.Time{
				"goblin": {},
				"goblin": start.Add(time.Minute),
			},
			removed: map[string]struct{}{
				"general":  {},
				"goose":    {},
				"grumbles": {},
			},
			expectItemCount: 4,
			expectItemCount: 8,
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			var (
				t         = suite.T()
				errs      = fault.New(true)
				itemCount int
				t       = suite.T()
				errs    = fault.New(true)
				itemMap = map[string]data.Item{}
			)

			ctx, flush := tester.NewContext(t)
@@ -413,9 +440,20 @@
			}

			for item := range col.Items(ctx, errs) {
				itemCount++
				var trimmedID string

				_, rok := test.removed[item.ID()]
				switch {
				case strings.HasSuffix(item.ID(), ".data"):
					trimmedID = strings.TrimSuffix(item.ID(), ".data")
				case strings.HasSuffix(item.ID(), ".meta"):
					trimmedID = strings.TrimSuffix(item.ID(), ".meta")
				default:
					assert.Fail(t, "unexpected item suffix: %s", item.ID())
				}

				itemMap[item.ID()] = item

				_, rok := test.removed[trimmedID]
				if rok {
					assert.True(t, item.Deleted(), "removals should be marked as deleted")
					dimt, ok := item.(data.ItemModTime)
@@ -423,7 +461,7 @@
					assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
				}

				modTime, aok := test.added[item.ID()]
				modTime, aok := test.added[trimmedID]
				if !rok && aok {
					// Item's mod time should be what's passed into the collection
					// initializer.
@@ -434,7 +472,10 @@

				// Check if the test wants us to read the item's data so the lazy
				// data fetch is executed.
				if slices.Contains(test.expectReads, item.ID()) {
				//
				// Inspect with trimmedID since getItem() operates on ID, not the file
				if strings.HasSuffix(item.ID(), ".data") &&
					slices.Contains(test.expectReads, trimmedID) {
					r := item.ToReader()

					_, err := io.ReadAll(r)
@@ -458,9 +499,40 @@
			assert.NoError(t, errs.Failure())
			assert.Equal(
				t,
				test.expectItemCount,
				itemCount,
				test.expectItemCount, // 2*(len(test.added)+len(test.removed)),
				len(itemMap),
				"should see all expected items")

			addedAndRemoved := append(maps.Keys(test.added), maps.Keys(test.removed)...)
			for _, id := range addedAndRemoved {
				// Should have a .data and a .meta file
				d, ok := itemMap[id+".data"]
				assert.True(t, ok, "should have data file for %q", id)

				m, ok := itemMap[id+".meta"]
				assert.True(t, ok, "should have meta file for %q", id)

				// Meta files should not have item info.
				assert.Implements(t, (*data.Item)(nil), m)

				if slices.Contains(maps.Keys(test.removed), id) {
					continue
				}

				// Mod times should match. Not doing this check for removed items
				// since mod time is set to now() for them.
				assert.Equal(t, d.(data.ItemModTime).ModTime(), m.(data.ItemModTime).ModTime(), "item mod time")

				// Read meta file data. The data is of no significance, we just want
				// to make sure the file is readable.
				r := m.ToReader()

				_, err := io.ReadAll(r)
				assert.NoError(t, err, clues.ToCore(err))

				r.Close()
			}
		})
	}
}
@@ -1,12 +1,16 @@
package groups

import (
	"bytes"
	"context"
	"encoding/json"
	"io"

	"github.com/alcionai/clues"
	"github.com/microsoftgraph/msgraph-sdk-go/models"

	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
	"github.com/alcionai/corso/src/pkg/backup/details"
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/selectors"
@@ -132,6 +136,24 @@ func (bh conversationsBackupHandler) getItem(
		postID)
}

//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) getItemMetadata(
	ctx context.Context,
	c models.Conversationable,
) (io.ReadCloser, int, error) {
	meta := metadata.ConversationPostMetadata{
		Recipients: []string{bh.resourceEmail},
		Topic:      ptr.Val(c.GetTopic()),
	}

	metaJSON, err := json.Marshal(meta)
	if err != nil {
		return nil, 0, clues.WrapWC(ctx, err, "serializing post metadata")
	}

	return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
}

//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) augmentItemInfo(
	dgi *details.GroupsInfo,
@@ -148,6 +170,11 @@ func (bh conversationsBackupHandler) augmentItemInfo(
	dgi.Post.Topic = ptr.Val(c.GetTopic())
}

//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) supportsItemMetadata() bool {
	return true
}

func conversationThreadContainer(
	c models.Conversationable,
	t models.ConversationThreadable,
@@ -0,0 +1,62 @@
package groups

import (
	"encoding/json"
	"io"
	"testing"

	"github.com/alcionai/clues"
	"github.com/microsoftgraph/msgraph-sdk-go/models"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"

	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
	"github.com/alcionai/corso/src/internal/tester"
)

type ConversationHandlerUnitSuite struct {
	tester.Suite
}

func TestConversationHandlerUnitSuite(t *testing.T) {
	suite.Run(t, &ConversationHandlerUnitSuite{Suite: tester.NewUnitSuite(t)})
}

// Basic test to ensure metadata is serialized and deserialized correctly.
func (suite *ConversationHandlerUnitSuite) TestGetItemMetadata() {
	var (
		t  = suite.T()
		bh = conversationsBackupHandler{
			resourceEmail: "test@example.com",
		}

		topic = "test topic"
		conv  = models.NewConversation()
	)

	ctx, flush := tester.NewContext(t)
	defer flush()

	conv.SetTopic(&topic)

	rc, size, err := bh.getItemMetadata(ctx, conv)
	assert.NoError(t, err, clues.ToCore(err))

	require.NotNil(t, rc, "nil read closer")
	assert.Greater(t, size, 0, "incorrect size")

	defer rc.Close()

	m, err := io.ReadAll(rc)
	assert.NoError(t, err, "reading metadata")

	var meta metadata.ConversationPostMetadata

	err = json.Unmarshal(m, &meta)
	assert.NoError(t, err, "deserializing metadata")

	assert.Equal(t, []string{"test@example.com"}, meta.Recipients, "incorrect recipients")
	assert.Equal(t, ptr.Val(conv.GetTopic()), meta.Topic, "incorrect topic")
}
@@ -4,17 +4,21 @@ import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/alcionai/clues"
	"github.com/microsoftgraph/msgraph-sdk-go/models"

	"github.com/alcionai/corso/src/internal/common/ptr"
	"github.com/alcionai/corso/src/internal/converters/eml"
	"github.com/alcionai/corso/src/internal/data"
	"github.com/alcionai/corso/src/pkg/control"
	"github.com/alcionai/corso/src/pkg/export"
	"github.com/alcionai/corso/src/pkg/fault"
	"github.com/alcionai/corso/src/pkg/logger"
	"github.com/alcionai/corso/src/pkg/metrics"
	"github.com/alcionai/corso/src/pkg/path"
	"github.com/alcionai/corso/src/pkg/services/m365/api"
@@ -26,19 +30,25 @@ func NewExportCollection(
	backupVersion int,
	cec control.ExportConfig,
	stats *metrics.ExportStats,
	cat path.CategoryType,
) export.Collectioner {
	s := streamChannelItems
	if cat == path.ConversationPostsCategory {
		s = streamConversationPosts
	}

	return export.BaseCollection{
		BaseDir:           baseDir,
		BackingCollection: backingCollections,
		BackupVersion:     backupVersion,
		Cfg:               cec,
		Stream:            streamItems,
		Stream:            s,
		Stats:             stats,
	}
}

// streamItems streams the items in the backingCollection into the export stream chan
func streamItems(
func streamChannelItems(
	ctx context.Context,
	drc []data.RestoreCollection,
	backupVersion int,
@@ -198,3 +208,130 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessag
		Subject: ptr.Val(item.GetSubject()),
	}
}

// streamConversationPosts streams the items in the backingCollection into the export stream chan
func streamConversationPosts(
	ctx context.Context,
	drc []data.RestoreCollection,
	backupVersion int,
	cec control.ExportConfig,
	ch chan<- export.Item,
	stats *metrics.ExportStats,
) {
	defer close(ch)

	errs := fault.New(false)

	for _, rc := range drc {
		ictx := clues.Add(ctx, "path_short_ref", rc.FullPath().ShortRef())

		for item := range rc.Items(ctx, errs) {
			// Trim .data suffix from itemID
			trimmedName := strings.TrimSuffix(item.ID(), dataFileSuffix)
			name := trimmedName + ".eml"
			meta, _ := getMetadataContents(ictx, item.ID(), rc)

			fmt.Println(meta)

			itemCtx := clues.Add(ictx, "stream_item_id", item.ID())

			reader := item.ToReader()
			content, err := io.ReadAll(reader)

			reader.Close()

			if err != nil {
				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			topic := rc.FullPath().Folders()[0]
			email, err := eml.FromJSONPost(itemCtx, content, topic)
			if err != nil {
				err = clues.Wrap(err, "converting JSON to eml")

				logger.CtxErr(ctx, err).Info("processing collection item")

				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			emlReader := io.NopCloser(bytes.NewReader([]byte(email)))
			body := metrics.ReaderWithStats(emlReader, path.EmailCategory, stats)

			ch <- export.Item{
				ID:   item.ID(),
				Name: name,
				Body: body,
			}
		}

		items, recovered := errs.ItemsAndRecovered()

		// Return all the items that we failed to source from the persistence layer
		for _, item := range items {
			ch <- export.Item{
				ID:    item.ID,
				Error: &item,
			}
		}

		for _, err := range recovered {
			ch <- export.Item{
				Error: err,
			}
		}
	}
}

func getMetadataContents(
	ctx context.Context,
	itemID string,
	fin data.FetchItemByNamer,
) (ConversationPostMetadata, error) {
	// Trim .data suffix from itemID
	trimmedName := strings.TrimSuffix(itemID, dataFileSuffix)
	metaName := trimmedName + metaFileSuffix

	meta, err := fin.FetchItemByName(ctx, metaName)
	if err != nil {
		return ConversationPostMetadata{}, clues.Wrap(err, "fetching metadata")
	}

	metaReader := meta.ToReader()
	defer metaReader.Close()

	metaFormatted, err := getMetadata(metaReader)
	if err != nil {
		return ConversationPostMetadata{}, clues.Wrap(err, "deserializing metadata")
	}

	return metaFormatted, nil
}

// getMetadata reads and parses the metadata info for an item
func getMetadata(metar io.ReadCloser) (ConversationPostMetadata, error) {
	var meta ConversationPostMetadata
	// `metar` will be nil for the top level container folder
	if metar != nil {
		metaraw, err := io.ReadAll(metar)
		if err != nil {
			return ConversationPostMetadata{}, err
		}

		err = json.Unmarshal(metaraw, &meta)
		if err != nil {
			return ConversationPostMetadata{}, err
		}
	}

	return meta, nil
}
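To summarize the naming flow the export path above implements (IDs illustrative, not from the diff): a conversation post is persisted as a `.data`/`.meta` pair and exported under an `.eml` name.

```go
// Illustrative only; the suffix literals mirror the metadata constants.
itemID := "post123" + ".data"                              // persisted payload streamed by the collection
metaName := strings.TrimSuffix(itemID, ".data") + ".meta"  // paired metadata, fetched via FetchItemByName
exportName := strings.TrimSuffix(itemID, ".data") + ".eml" // file name emitted on the export channel
```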
@@ -2,6 +2,7 @@ package groups

import (
	"context"
	"io"

	"github.com/microsoft/kiota-abstractions-go/serialization"
@@ -33,7 +34,9 @@ type backupHandler[C graph.GetIDer, I groupsItemer] interface {

type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface {
	getItemer[I]
	getItemMetadataer[C, I]
	augmentItemInfoer[C]
	supportsItemMetadataer[C, I]
}

type augmentItemInfoer[C graph.GetIDer] interface {
@@ -51,6 +54,17 @@ type getItemer[I groupsItemer] interface {
	) (I, *details.GroupsInfo, error)
}

type getItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
	getItemMetadata(
		ctx context.Context,
		c C,
	) (io.ReadCloser, int, error)
}

type supportsItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
	supportsItemMetadata() bool
}

// gets all containers for the resource
type getContainerser[C graph.GetIDer] interface {
	getContainers(
@@ -0,0 +1,8 @@
package metadata

// ConversationPostMetadata stores metadata for a given conversation post,
// stored as a .meta file in kopia.
type ConversationPostMetadata struct {
	Recipients []string `json:"recipients,omitempty"`
	Topic      string   `json:"topic,omitempty"`
}
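For reference, a sketch of the serialized form this struct produces (values illustrative, not from the diff):

```go
meta := metadata.ConversationPostMetadata{
	Recipients: []string{"someone@example.com"}, // illustrative value
	Topic:      "Weekly sync",                   // illustrative value
}
// json.Marshal(meta) yields:
//   {"recipients":["someone@example.com"],"topic":"Weekly sync"}
```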
@@ -82,7 +82,7 @@ func (h *baseGroupsHandler) ProduceExportCollections(
	)

	switch cat {
	case path.ChannelMessagesCategory:
	case path.ChannelMessagesCategory, path.ConversationPostsCategory:
		folders = append(folders, fp.Folders()...)

		coll = groups.NewExportCollection(
@@ -90,7 +90,8 @@ func (h *baseGroupsHandler) ProduceExportCollections(
			[]data.RestoreCollection{restoreColl},
			backupVersion,
			exportCfg,
			stats)
			stats,
			cat)

	case path.LibrariesCategory:
		drivePath, err := path.ToDrivePath(restoreColl.FullPath())
@@ -143,7 +143,8 @@ func makeRestorePathsForEntry(
	switch true {
	case ent.Exchange != nil ||
		(ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) ||
		(ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList):
		(ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList) ||
		(ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost):
		// TODO(ashmrtn): Eventually make Events have it's own function to handle
		// setting the restore destination properly.
		res.RestorePath, err = basicLocationPath(repoRef, locRef)
@@ -1,6 +1,8 @@
package metadata

import "strings"
import (
	"strings"
)

const (
	MetaFileSuffix = ".meta"
@@ -9,9 +9,12 @@ func IsMetadataFile(p path.Path) bool {
	case path.OneDriveService:
		return HasMetaSuffix(p.Item())

	case path.SharePointService, path.GroupsService:
	case path.SharePointService:
		return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item())

	case path.GroupsService:
		return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item()) ||
			p.Category() == path.ConversationPostsCategory && HasMetaSuffix(p.Item())

	default:
		return false
	}
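A sketch of the behavior after this change, building paths the same way as the table test below does:

```go
// Hypothetical paths; errors elided for brevity.
lib, _ := path.Build("t", "u", path.GroupsService, path.LibrariesCategory, true, "f", "file.meta")
conv, _ := path.Build("t", "u", path.GroupsService, path.ConversationPostsCategory, true, "f", "file.meta")
chans, _ := path.Build("t", "u", path.GroupsService, path.ChannelMessagesCategory, true, "f", "file.meta")

metadata.IsMetadataFile(lib)   // true
metadata.IsMetadataFile(conv)  // true, new in this change
metadata.IsMetadataFile(chans) // false: channel messages carry no .meta files
```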
@@ -152,3 +152,108 @@ func (suite *MetadataUnitSuite) TestIsMetadataFile_Directories() {
		}
	}
}

func (suite *MetadataUnitSuite) TestIsMetadataFile() {
	table := []struct {
		name       string
		service    path.ServiceType
		category   path.CategoryType
		isMetaFile bool
		expected   bool
	}{
		{
			name:     "onedrive .data file",
			service:  path.OneDriveService,
			category: path.FilesCategory,
		},
		{
			name:     "sharepoint library .data file",
			service:  path.SharePointService,
			category: path.LibrariesCategory,
		},
		{
			name:     "group library .data file",
			service:  path.GroupsService,
			category: path.LibrariesCategory,
		},
		{
			name:     "group conversations .data file",
			service:  path.GroupsService,
			category: path.ConversationPostsCategory,
		},
		{
			name:       "onedrive .meta file",
			service:    path.OneDriveService,
			category:   path.FilesCategory,
			isMetaFile: true,
			expected:   true,
		},
		{
			name:       "sharepoint library .meta file",
			service:    path.SharePointService,
			category:   path.LibrariesCategory,
			isMetaFile: true,
			expected:   true,
		},
		{
			name:       "group library .meta file",
			service:    path.GroupsService,
			category:   path.LibrariesCategory,
			isMetaFile: true,
			expected:   true,
		},
		{
			name:       "group conversations .meta file",
			service:    path.GroupsService,
			category:   path.ConversationPostsCategory,
			isMetaFile: true,
			expected:   true,
		},
		// For services which don't have metadata files, make sure the function
		// returns false. We don't want .meta suffix (assuming it exists) in
		// these cases to be interpreted as metadata files.
		{
			name:       "exchange service",
			service:    path.ExchangeService,
			category:   path.EmailCategory,
			isMetaFile: true,
		},
		{
			name:       "group channels",
			service:    path.GroupsService,
			category:   path.ChannelMessagesCategory,
			isMetaFile: true,
		},
		{
			name:       "lists",
			service:    path.SharePointService,
			category:   path.ListsCategory,
			isMetaFile: true,
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			fileName := "file"
			if test.isMetaFile {
				fileName += metadata.MetaFileSuffix
			} else {
				fileName += metadata.DataFileSuffix
			}

			p, err := path.Build(
				"t",
				"u",
				test.service,
				test.category,
				true,
				"some", "path", "for", fileName)
			require.NoError(t, err, clues.ToCore(err))

			actual := metadata.IsMetadataFile(p)
			assert.Equal(t, test.expected, actual)
		})
	}
}
@@ -629,6 +629,32 @@ func BytesToMessageable(body []byte) (models.Messageable, error) {
	return v.(models.Messageable), nil
}

func bytesToPostable(body []byte) (models.Postable, error) {
	v, err := CreateFromBytes(body, models.CreatePostFromDiscriminatorValue)
	if err != nil {
		if !strings.Contains(err.Error(), invalidJSON) {
			return nil, clues.Wrap(err, "deserializing bytes to post")
		}

		// If the JSON was invalid try sanitizing and deserializing again.
		// Sanitizing should transform characters < 0x20 according to the spec where
		// possible. The resulting JSON may still be invalid though.
		body = sanitize.JSONBytes(body)
		v, err = CreateFromBytes(body, models.CreatePostFromDiscriminatorValue)
	}

	return v.(models.Postable), clues.Stack(err).OrNil()
}

func BytesToPostable(body []byte) (models.Postable, error) {
	v, err := bytesToPostable(body)
	if err != nil {
		return nil, clues.Stack(err)
	}

	return v, nil
}

func (c Mail) Serialize(
	ctx context.Context,
	item serialization.Parsable,