Compare commits

...

20 Commits

Author SHA1 Message Date
Abhishek Pandey
c4984e9309 With conv export 2024-01-30 23:44:13 -08:00
Abhishek Pandey
c8adfc8f72 conv export sanity 2024-01-30 22:17:07 -08:00
Abhishek Pandey
fb420b11e7 Reduce tests 2024-01-30 21:25:14 -08:00
Abhishek Pandey
989b3c0bb8 Remove teams notif 2024-01-30 21:11:19 -08:00
Abhishek Pandey
3fab788ce0 conv sanity test 2024-01-30 21:08:46 -08:00
Abhishek Pandey
3a41cc1056 Add outline for exports 2024-01-29 09:19:11 -08:00
Abhishek Pandey
08cff81c76 Add more tests 2024-01-29 00:13:02 -08:00
Abhishek Pandey
0199c9ad68 Add group mailbox exports 2024-01-29 00:13:02 -08:00
Abhishek Pandey
ec729fb509 Add newline 2024-01-29 00:04:04 -08:00
Abhishek Pandey
8a74bc893c Convert serialized posts to eml 2024-01-29 00:04:04 -08:00
Abhishek Pandey
b0a8b310d3 Add tests for prefetch streamItems 2024-01-29 00:03:06 -08:00
Abhishek Pandey
152d78de21 Refactor metadata read code 2024-01-26 19:27:28 -08:00
Abhishek Pandey
eb6376e641 Rebase changes 2024-01-26 19:27:28 -08:00
Abhishek Pandey
9f68f379e7 Fix rebase failures 2024-01-26 19:27:28 -08:00
Abhishek Pandey
5230e60b24 Differentiate file suffixes between channel messages and conversations 2024-01-26 19:27:28 -08:00
Abhishek Pandey
7477c58698 Fix failing unit test 2024-01-26 19:25:21 -08:00
Abhishek Pandey
deaf8dcd95 Fix file name 2024-01-26 18:27:41 -08:00
Abhishek Pandey
3e2268e524 Move conv metadata definiton 2024-01-26 14:20:11 -08:00
Abhishek Pandey
abca65daa9 Add tests 2024-01-26 14:20:11 -08:00
Abhishek Pandey
41eb63686d Persist metadata files for group mailboxes 2024-01-26 14:20:11 -08:00
26 changed files with 1626 additions and 116 deletions

View File

@ -3,6 +3,7 @@ on:
push: push:
branches: branches:
- main - main
- conv_sanity
workflow_dispatch: workflow_dispatch:
inputs: inputs:
user: user:
@ -533,4 +534,4 @@ jobs:
uses: ./.github/actions/teams-message uses: ./.github/actions/teams-message
with: with:
msg: "[FAILED] Sanity Tests" msg: "[FAILED] Sanity Tests"
teams_url: ${{ secrets.TEAMS_CORSO_CI_WEBHOOK_URL }} #teams_url: ${{ secrets.TEAMS_CORSO_CI_WEBHOOK_URL }}

View File

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update repo init configuration to reduce the total number of GET requests sent - Update repo init configuration to reduce the total number of GET requests sent
to the object store when using corso. This affects repos that have many to the object store when using corso. This affects repos that have many
backups created in them per day the most. backups created in them per day the most.
- Group mailbox emails can now be exported as `.eml` files.
### Fixed ### Fixed
- Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup. - Retry transient 400 "invalidRequest" errors during onedrive & sharepoint backup.

View File

@ -7,7 +7,6 @@ import (
"github.com/alcionai/corso/src/cli/flags" "github.com/alcionai/corso/src/cli/flags"
"github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/selectors"
) )
// called by export.go to map subcommands to provider-specific handling. // called by export.go to map subcommands to provider-specific handling.
@ -93,10 +92,6 @@ func exportGroupsCmd(cmd *cobra.Command, args []string) error {
sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts) sel := utils.IncludeGroupsRestoreDataSelectors(ctx, opts)
utils.FilterGroupsRestoreInfoSelectors(sel, opts) utils.FilterGroupsRestoreInfoSelectors(sel, opts)
// TODO(pandeyabs): Exclude conversations from export since they are not
// supported yet. https://github.com/alcionai/corso/issues/4822
sel.Exclude(sel.Conversation(selectors.Any()))
acceptedGroupsFormatTypes := []string{ acceptedGroupsFormatTypes := []string{
string(control.DefaultFormat), string(control.DefaultFormat),
string(control.JSONFormat), string(control.JSONFormat),

View File

@ -3,13 +3,11 @@ package export
import ( import (
"context" "context"
"io/fs" "io/fs"
"path/filepath"
"strings" "strings"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/cmd/sanity_test/common" "github.com/alcionai/corso/src/cmd/sanity_test/common"
"github.com/alcionai/corso/src/cmd/sanity_test/driveish"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
@ -23,53 +21,174 @@ func CheckGroupsExport(
// assumes we only need to sanity check the default site. // assumes we only need to sanity check the default site.
// should we expand this to check all sites in the group? // should we expand this to check all sites in the group?
// are we backing up / restoring more than the default site? // are we backing up / restoring more than the default site?
site, err := ac.Sites().GetByID(ctx, envs.TeamSiteID, api.CallConfig{}) // site, err := ac.Sites().GetByID(ctx, envs.TeamSiteID, api.CallConfig{})
if err != nil { // if err != nil {
common.Fatal(ctx, "getting the drive:", err) // common.Fatal(ctx, "getting the drive:", err)
} // }
drive, err := ac.Sites().GetDefaultDrive(ctx, envs.TeamSiteID) // drive, err := ac.Sites().GetDefaultDrive(ctx, envs.TeamSiteID)
if err != nil { // if err != nil {
common.Fatal(ctx, "getting the drive:", err) // common.Fatal(ctx, "getting the drive:", err)
} // }
checkChannelMessagesExport( // checkChannelMessagesExport(
// ctx,
// ac,
// envs)
checkConversationPostExports(
ctx, ctx,
ac, ac,
envs) envs)
envs.RestoreContainer = filepath.Join( // envs.RestoreContainer = filepath.Join(
envs.RestoreContainer, // envs.RestoreContainer,
"Libraries", // "Libraries",
ptr.Val(site.GetName()), // ptr.Val(site.GetName()),
"Documents") // check in default loc // "Documents") // check in default loc
driveish.CheckExport( // driveish.CheckExport(
ctx, // ctx,
ac, // ac,
drive, // drive,
envs) // envs)
} }
func removeDeletedMessages( // func removeDeletedMessages(
ctx context.Context, // ctx context.Context,
tree *common.Sanitree[models.Channelable, models.ChatMessageable], // tree *common.Sanitree[models.Channelable, models.ChatMessageable],
) { // ) {
for _, child := range tree.Children { // for _, child := range tree.Children {
for _, leaf := range child.Leaves { // for _, leaf := range child.Leaves {
if leaf.Self.GetDeletedDateTime() != nil { // if leaf.Self.GetDeletedDateTime() != nil {
delete(child.Leaves, leaf.ID) // delete(child.Leaves, leaf.ID)
child.CountLeaves-- // child.CountLeaves--
} // }
} // }
} // }
} // }
func checkChannelMessagesExport( // func checkChannelMessagesExport(
// ctx context.Context,
// ac api.Client,
// envs common.Envs,
// ) {
// sourceTree := populateMessagesSanitree(
// ctx,
// ac,
// envs.GroupID)
// fpTree := common.BuildFilepathSanitree(ctx, envs.RestoreContainer)
// comparator := func(
// ctx context.Context,
// expect *common.Sanitree[models.Channelable, models.ChatMessageable],
// result *common.Sanitree[fs.FileInfo, fs.FileInfo],
// ) {
// for key := range expect.Leaves {
// expect.Leaves[key].Size = 0 // msg sizes cannot be compared
// }
// updatedResultLeaves := map[string]*common.Sanileaf[fs.FileInfo, fs.FileInfo]{}
// for key, leaf := range result.Leaves {
// key = strings.TrimSuffix(key, ".json")
// leaf.Size = 0 // we cannot compare sizes
// updatedResultLeaves[key] = leaf
// }
// common.CompareLeaves(ctx, expect.Leaves, updatedResultLeaves, nil)
// }
// removeDeletedMessages(ctx, sourceTree)
// common.CompareDiffTrees(
// ctx,
// sourceTree,
// fpTree.Children["Messages"],
// comparator)
// common.Infof(ctx, "Success")
// }
// func populateMessagesSanitree(
// ctx context.Context,
// ac api.Client,
// groupID string,
// ) *common.Sanitree[models.Channelable, models.ChatMessageable] {
// root := &common.Sanitree[models.Channelable, models.ChatMessageable]{
// ID: groupID,
// Name: path.ChannelMessagesCategory.HumanString(),
// // group should not have leaves
// Children: map[string]*common.Sanitree[models.Channelable, models.ChatMessageable]{},
// }
// channels, err := ac.Channels().GetChannels(ctx, groupID)
// if err != nil {
// common.Fatal(ctx, "getting channels", err)
// }
// for _, ch := range channels {
// child := &common.Sanitree[
// models.Channelable, models.ChatMessageable,
// ]{
// Parent: root,
// ID: ptr.Val(ch.GetId()),
// Name: ptr.Val(ch.GetDisplayName()),
// Leaves: map[string]*common.Sanileaf[models.Channelable, models.ChatMessageable]{},
// // no children in channels
// }
// msgs, err := ac.Channels().GetChannelMessages(
// ctx,
// groupID,
// ptr.Val(ch.GetId()),
// api.CallConfig{
// // include all nessage replies in each message
// Expand: []string{"replies"},
// })
// if err != nil {
// common.Fatal(ctx, "getting channel messages", err)
// }
// filteredMsgs := []models.ChatMessageable{}
// for _, msg := range msgs {
// // filter out system messages (we don't really work with them)
// if api.IsNotSystemMessage(msg) {
// filteredMsgs = append(filteredMsgs, msg)
// }
// }
// if len(filteredMsgs) == 0 {
// common.Infof(ctx, "skipped empty channel: %s", ptr.Val(ch.GetDisplayName()))
// continue
// }
// for _, msg := range filteredMsgs {
// child.CountLeaves++
// child.Leaves[ptr.Val(msg.GetId())] = &common.Sanileaf[
// models.Channelable,
// models.ChatMessageable,
// ]{
// Self: msg,
// ID: ptr.Val(msg.GetId()),
// Name: ptr.Val(msg.GetId()), // channel messages have no display name
// Size: int64(len(msg.GetReplies())), // size is the count of replies
// }
// }
// root.Children[ptr.Val(ch.GetDisplayName())] = child
// }
// return root
// }
func checkConversationPostExports(
ctx context.Context, ctx context.Context,
ac api.Client, ac api.Client,
envs common.Envs, envs common.Envs,
) { ) {
sourceTree := populateMessagesSanitree( sourceTree := populatePostsSanitree(
ctx, ctx,
ac, ac,
envs.GroupID) envs.GroupID)
@ -78,7 +197,7 @@ func checkChannelMessagesExport(
comparator := func( comparator := func(
ctx context.Context, ctx context.Context,
expect *common.Sanitree[models.Channelable, models.ChatMessageable], expect *common.Sanitree[models.Conversationable, models.Postable],
result *common.Sanitree[fs.FileInfo, fs.FileInfo], result *common.Sanitree[fs.FileInfo, fs.FileInfo],
) { ) {
for key := range expect.Leaves { for key := range expect.Leaves {
@ -88,7 +207,7 @@ func checkChannelMessagesExport(
updatedResultLeaves := map[string]*common.Sanileaf[fs.FileInfo, fs.FileInfo]{} updatedResultLeaves := map[string]*common.Sanileaf[fs.FileInfo, fs.FileInfo]{}
for key, leaf := range result.Leaves { for key, leaf := range result.Leaves {
key = strings.TrimSuffix(key, ".json") key = strings.TrimSuffix(key, ".eml")
leaf.Size = 0 // we cannot compare sizes leaf.Size = 0 // we cannot compare sizes
updatedResultLeaves[key] = leaf updatedResultLeaves[key] = leaf
} }
@ -96,85 +215,74 @@ func checkChannelMessagesExport(
common.CompareLeaves(ctx, expect.Leaves, updatedResultLeaves, nil) common.CompareLeaves(ctx, expect.Leaves, updatedResultLeaves, nil)
} }
removeDeletedMessages(ctx, sourceTree)
common.CompareDiffTrees( common.CompareDiffTrees(
ctx, ctx,
sourceTree, sourceTree,
fpTree.Children["Messages"], fpTree.Children[path.ConversationPostsCategory.HumanString()],
comparator) comparator)
common.Infof(ctx, "Success") common.Infof(ctx, "Success")
} }
func populateMessagesSanitree( func populatePostsSanitree(
ctx context.Context, ctx context.Context,
ac api.Client, ac api.Client,
groupID string, groupID string,
) *common.Sanitree[models.Channelable, models.ChatMessageable] { ) *common.Sanitree[models.Conversationable, models.Postable] {
root := &common.Sanitree[models.Channelable, models.ChatMessageable]{ root := &common.Sanitree[models.Conversationable, models.Postable]{
ID: groupID, ID: groupID,
Name: path.ChannelMessagesCategory.HumanString(), Name: path.ConversationPostsCategory.HumanString(),
// group should not have leaves // group should not have leaves
Children: map[string]*common.Sanitree[models.Channelable, models.ChatMessageable]{}, Children: map[string]*common.Sanitree[models.Conversationable, models.Postable]{},
} }
channels, err := ac.Channels().GetChannels(ctx, groupID) convs, err := ac.Conversations().GetConversations(ctx, groupID, api.CallConfig{})
if err != nil { if err != nil {
common.Fatal(ctx, "getting channels", err) common.Fatal(ctx, "getting conversations", err)
} }
for _, ch := range channels { for _, ch := range convs {
child := &common.Sanitree[ child := &common.Sanitree[
models.Channelable, models.ChatMessageable, models.Conversationable, models.Postable,
]{ ]{
Parent: root, Parent: root,
ID: ptr.Val(ch.GetId()), ID: ptr.Val(ch.GetId()),
Name: ptr.Val(ch.GetDisplayName()), Name: ptr.Val(ch.GetTopic()),
Leaves: map[string]*common.Sanileaf[models.Channelable, models.ChatMessageable]{}, Leaves: map[string]*common.Sanileaf[models.Conversationable, models.Postable]{},
// no children in channels // no children in channels
} }
msgs, err := ac.Channels().GetChannelMessages( threads, err := ac.Conversations().GetConversationThreads(
ctx, ctx,
groupID, groupID,
ptr.Val(ch.GetId()), ptr.Val(ch.GetId()),
api.CallConfig{ api.CallConfig{})
// include all nessage replies in each message
Expand: []string{"replies"},
})
if err != nil { if err != nil {
common.Fatal(ctx, "getting channel messages", err) common.Fatal(ctx, "getting conversation threads", err)
} }
filteredMsgs := []models.ChatMessageable{} posts, err := ac.Conversations().GetConversationThreadPosts(
ctx,
for _, msg := range msgs { groupID,
// filter out system messages (we don't really work with them) ptr.Val(ch.GetId()),
if api.IsNotSystemMessage(msg) { ptr.Val(threads[0].GetId()),
filteredMsgs = append(filteredMsgs, msg) api.CallConfig{})
} if err != nil {
common.Fatal(ctx, "getting conversation posts", err)
} }
if len(filteredMsgs) == 0 { for _, msg := range posts {
common.Infof(ctx, "skipped empty channel: %s", ptr.Val(ch.GetDisplayName()))
continue
}
for _, msg := range filteredMsgs {
child.CountLeaves++ child.CountLeaves++
child.Leaves[ptr.Val(msg.GetId())] = &common.Sanileaf[ child.Leaves[ptr.Val(msg.GetId())] = &common.Sanileaf[
models.Channelable, models.Conversationable,
models.ChatMessageable, models.Postable,
]{ ]{
Self: msg, Self: msg,
ID: ptr.Val(msg.GetId()), ID: ptr.Val(msg.GetId()),
Name: ptr.Val(msg.GetId()), // channel messages have no display name
Size: int64(len(msg.GetReplies())), // size is the count of replies
} }
} }
root.Children[ptr.Val(ch.GetDisplayName())] = child root.Children[ptr.Val(ch.GetTopic())] = child
} }
return root return root

View File

@ -23,6 +23,7 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/internal/converters/ics" "github.com/alcionai/corso/src/internal/converters/ics"
"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
) )
@ -303,3 +304,137 @@ func FromJSON(ctx context.Context, body []byte) (string, error) {
return email.GetMessage(), nil return email.GetMessage(), nil
} }
//-------------------------------------------------------------
// Postable -> EML
//-------------------------------------------------------------
// FromJSONPostToEML converts a postable (as json) to .eml format.
// TODO(pandeyabs): This is a stripped down copy of messageable to
// eml conversion, it can be folded into one function by having a post
// to messageable converter.
func FromJSONPostToEML(
ctx context.Context,
body []byte,
postMetadata metadata.ConversationPostMetadata,
) (string, error) {
ctx = clues.Add(ctx, "body_len", len(body))
data, err := api.BytesToPostable(body)
if err != nil {
return "", clues.WrapWC(ctx, err, "converting to postable")
}
ctx = clues.Add(ctx, "item_id", ptr.Val(data.GetId()))
email := mail.NewMSG()
email.Encoding = mail.EncodingBase64 // Doing it to be safe for when we have eventMessage (newline issues)
email.AllowDuplicateAddress = true // More "correct" conversion
email.AddBccToHeader = true // Don't ignore Bcc
email.AllowEmptyAttachments = true // Don't error on empty attachments
email.UseProvidedAddress = true // Don't try to parse the email address
if data.GetFrom() != nil {
email.SetFrom(formatAddress(data.GetFrom().GetEmailAddress()))
}
// We don't have the To, Cc, Bcc recipient information for posts due to a graph
// limitation. All posts carry the group email address as the only recipient
// for now.
email.AddTo(postMetadata.Recipients...)
email.SetSubject(postMetadata.Topic)
// Reply-To email address is not available for posts. Note that this is different
// from inReplyTo field.
if data.GetReceivedDateTime() != nil {
email.SetDate(ptr.Val(data.GetReceivedDateTime()).Format(dateFormat))
}
if data.GetBody() != nil {
if data.GetBody().GetContentType() != nil {
var contentType mail.ContentType
switch data.GetBody().GetContentType().String() {
case "html":
contentType = mail.TextHTML
case "text":
contentType = mail.TextPlain
default:
// https://learn.microsoft.com/en-us/graph/api/resources/itembody?view=graph-rest-1.0#properties
// This should not be possible according to the documentation
logger.Ctx(ctx).
With("body_type", data.GetBody().GetContentType().String()).
Info("unknown body content type")
contentType = mail.TextPlain
}
email.SetBody(contentType, ptr.Val(data.GetBody().GetContent()))
}
}
if data.GetAttachments() != nil {
for _, attachment := range data.GetAttachments() {
kind := ptr.Val(attachment.GetContentType())
bytes, err := attachment.GetBackingStore().Get("contentBytes")
if err != nil {
return "", clues.WrapWC(ctx, err, "failed to get attachment bytes").
With("kind", kind)
}
if bytes == nil {
// TODO(meain): Handle non file attachments
// https://github.com/alcionai/corso/issues/4772
//
// TODO(pandeyabs): Above issue is for messages.
// This is not a problem for posts but leaving it here for safety.
logger.Ctx(ctx).
With("attachment_id", ptr.Val(attachment.GetId()),
"attachment_type", ptr.Val(attachment.GetOdataType())).
Info("no contentBytes for attachment")
continue
}
bts, ok := bytes.([]byte)
if !ok {
return "", clues.WrapWC(ctx, err, "invalid content bytes").
With("kind", kind).
With("interface_type", fmt.Sprintf("%T", bytes))
}
name := ptr.Val(attachment.GetName())
contentID, err := attachment.GetBackingStore().Get("contentId")
if err != nil {
return "", clues.WrapWC(ctx, err, "getting content id for attachment").
With("kind", kind)
}
if contentID != nil {
cids, _ := str.AnyToString(contentID)
if len(cids) > 0 {
name = cids
}
}
email.Attach(&mail.File{
// cannot use filename as inline attachment will not get mapped properly
Name: name,
MimeType: kind,
Data: bts,
Inline: ptr.Val(attachment.GetIsInline()),
})
}
}
// Note: Posts cannot be of type EventMessageResponse, EventMessage or
// CalendarSharingMessage. So we don't need to handle those cases here.
if err = email.GetError(); err != nil {
return "", clues.WrapWC(ctx, err, "converting to eml")
}
return email.GetMessage(), nil
}

View File

@ -18,6 +18,8 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/converters/eml/testdata" "github.com/alcionai/corso/src/internal/converters/eml/testdata"
"github.com/alcionai/corso/src/internal/converters/ics" "github.com/alcionai/corso/src/internal/converters/ics"
"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
stub "github.com/alcionai/corso/src/internal/m365/service/groups/mock"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
) )
@ -325,3 +327,74 @@ func (suite *EMLUnitSuite) TestConvert_eml_ics_from_event_obj() {
assert.NotEqual(t, ptr.Val(msg.GetSubject()), event.GetProperty(ical.ComponentPropertySummary).Value) assert.NotEqual(t, ptr.Val(msg.GetSubject()), event.GetProperty(ical.ComponentPropertySummary).Value)
assert.Equal(t, ptr.Val(evt.GetSubject()), event.GetProperty(ical.ComponentPropertySummary).Value) assert.Equal(t, ptr.Val(evt.GetSubject()), event.GetProperty(ical.ComponentPropertySummary).Value)
} }
//-------------------------------------------------------------
// Postable -> EML tests
//-------------------------------------------------------------
func (suite *EMLUnitSuite) TestConvert_postable_to_eml() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
body := []byte(stub.PostWithAttachments)
postMetadata := metadata.ConversationPostMetadata{
Recipients: []string{"group@example.com"},
Topic: "test subject",
}
out, err := FromJSONPostToEML(ctx, body, postMetadata)
assert.NoError(t, err, "converting to eml")
post, err := api.BytesToPostable(body)
require.NoError(t, err, "creating post")
eml, err := enmime.ReadEnvelope(strings.NewReader(out))
require.NoError(t, err, "reading created eml")
assert.Equal(t, postMetadata.Topic, eml.GetHeader("Subject"))
assert.Equal(t, post.GetReceivedDateTime().Format(time.RFC1123Z), eml.GetHeader("Date"))
assert.Equal(t, formatAddress(post.GetFrom().GetEmailAddress()), eml.GetHeader("From"))
// Test recipients. The post metadata should contain the group email address.
tos := strings.Split(eml.GetHeader("To"), ", ")
for _, sourceTo := range postMetadata.Recipients {
assert.Contains(t, tos, sourceTo)
}
// Assert cc, bcc to be empty since they are not supported for posts right now.
assert.Equal(t, "", eml.GetHeader("Cc"))
assert.Equal(t, "", eml.GetHeader("Bcc"))
// Test attachments using PostWithAttachments data as a reference.
// This data has 1 direct attachment and 1 inline attachment.
assert.Equal(t, 1, len(eml.Attachments), "direct attachment count")
assert.Equal(t, 1, len(eml.Inlines), "inline attachment count")
for _, sourceAttachment := range post.GetAttachments() {
targetContent := eml.Attachments[0].Content
if ptr.Val(sourceAttachment.GetIsInline()) {
targetContent = eml.Inlines[0].Content
}
sourceContent, err := sourceAttachment.GetBackingStore().Get("contentBytes")
assert.NoError(t, err, "getting source attachment content")
assert.Equal(t, sourceContent, targetContent)
}
// Test body
source := strings.ReplaceAll(eml.HTML, "\n", "")
target := strings.ReplaceAll(ptr.Val(post.GetBody().GetContent()), "\n", "")
// replace the cid with a constant value to make the comparison
re := regexp.MustCompile(`(?:src|originalSrc)="cid:[^"]*"`)
source = re.ReplaceAllString(source, `src="cid:replaced"`)
target = re.ReplaceAllString(target, `src="cid:replaced"`)
assert.Equal(t, source, target)
}

View File

@ -2,6 +2,7 @@ package groups
import ( import (
"context" "context"
"io"
"testing" "testing"
"time" "time"
@ -60,6 +61,11 @@ func (bh mockBackupHandler) augmentItemInfo(
// no-op // no-op
} }
//lint:ignore U1000 false linter issue due to generics
func (bh mockBackupHandler) supportsItemMetadata() bool {
return false
}
func (bh mockBackupHandler) canMakeDeltaQueries() bool { func (bh mockBackupHandler) canMakeDeltaQueries() bool {
return true return true
} }
@ -136,6 +142,13 @@ func (bh mockBackupHandler) getItem(
return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID] return bh.messages[itemID], bh.info[itemID], bh.getMessageErr[itemID]
} }
func (bh mockBackupHandler) getItemMetadata(
_ context.Context,
_ models.Channelable,
) (io.ReadCloser, int, error) {
return nil, 0, errMetadataFilesNotSupported
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Unit Suite // Unit Suite
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -2,6 +2,7 @@ package groups
import ( import (
"context" "context"
"io"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
@ -105,6 +106,17 @@ func (bh channelsBackupHandler) getItem(
return bh.ac.GetChannelMessage(ctx, groupID, containerIDs[0], messageID) return bh.ac.GetChannelMessage(ctx, groupID, containerIDs[0], messageID)
} }
// Channel messages don't carry metadata files. Return unsupported error.
// Adding this method for interface compliance.
//
//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) getItemMetadata(
_ context.Context,
_ models.Channelable,
) (io.ReadCloser, int, error) {
return nil, 0, errMetadataFilesNotSupported
}
//lint:ignore U1000 false linter issue due to generics //lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) augmentItemInfo( func (bh channelsBackupHandler) augmentItemInfo(
dgi *details.GroupsInfo, dgi *details.GroupsInfo,
@ -113,6 +125,12 @@ func (bh channelsBackupHandler) augmentItemInfo(
// no-op // no-op
} }
//lint:ignore U1000 false linter issue due to generics
func (bh channelsBackupHandler) supportsItemMetadata() bool {
// No .data and .meta files for channel messages
return false
}
func channelContainer(ch models.Channelable) container[models.Channelable] { func channelContainer(ch models.Channelable) container[models.Channelable] {
return container[models.Channelable]{ return container[models.Channelable]{
storageDirFolders: path.Elements{ptr.Val(ch.GetId())}, storageDirFolders: path.Elements{ptr.Val(ch.GetId())},

View File

@ -23,6 +23,7 @@ import (
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph" "github.com/alcionai/corso/src/pkg/services/m365/api/graph"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
) )
var ( var (
@ -30,6 +31,8 @@ var (
_ data.BackupCollection = &lazyFetchCollection[graph.GetIDer, groupsItemer]{} _ data.BackupCollection = &lazyFetchCollection[graph.GetIDer, groupsItemer]{}
) )
var errMetadataFilesNotSupported = clues.New("metadata files not supported")
const ( const (
collectionChannelBufferSize = 1000 collectionChannelBufferSize = 1000
numberOfRetries = 4 numberOfRetries = 4
@ -176,7 +179,15 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
defer wg.Done() defer wg.Done()
defer func() { <-semaphoreCh }() defer func() { <-semaphoreCh }()
col.stream <- data.NewDeletedItem(id) // This is a no-op for conversations, as there is no way to detect
// deleted items. It might be added in future if graph supports it,
// so make sure we put up both .data and .meta files for deletions.
if col.getAndAugment.supportsItemMetadata() {
col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
} else {
col.stream <- data.NewDeletedItem(id)
}
atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&streamedItems, 1)
col.Counter.Inc(count.StreamItemsRemoved) col.Counter.Inc(count.StreamItemsRemoved)
@ -200,6 +211,18 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
defer wg.Done() defer wg.Done()
defer func() { <-semaphoreCh }() defer func() { <-semaphoreCh }()
ictx := clues.Add(
ctx,
"item_id", id,
"parent_path", path.LoggableDir(col.LocationPath().String()))
// Conversation posts carry a .data suffix, while channel messages
// don't have any suffix. Metadata files are only supported for conversations.
dataFile := id
if col.getAndAugment.supportsItemMetadata() {
dataFile += metadata.DataFileSuffix
}
writer := kjson.NewJsonSerializationWriter() writer := kjson.NewJsonSerializationWriter()
defer writer.Close() defer writer.Close()
@ -234,24 +257,48 @@ func (col *prefetchCollection[C, I]) streamItems(ctx context.Context, errs *faul
info.ParentPath = col.LocationPath().String() info.ParentPath = col.LocationPath().String()
storeItem, err := data.NewPrefetchedItemWithInfo( dataItem, err := data.NewPrefetchedItemWithInfo(
io.NopCloser(bytes.NewReader(itemData)), io.NopCloser(bytes.NewReader(itemData)),
id, dataFile,
details.ItemInfo{Groups: info}) details.ItemInfo{Groups: info})
if err != nil { if err != nil {
err := clues.StackWC(ctx, err).Label(fault.LabelForceNoBackupCreation) err := clues.StackWC(ictx, err).Label(fault.LabelForceNoBackupCreation)
el.AddRecoverable(ctx, err) el.AddRecoverable(ictx, err)
return return
} }
col.stream <- storeItem // Handle metadata before data so that if metadata file fails,
// we are not left with an orphaned data file. No op for channel
// messages.
if col.getAndAugment.supportsItemMetadata() {
metaFile := id + metadata.MetaFileSuffix
// Use modTime from item info as it's the latest.
metaItem, err := downloadItemMeta[C, I](
ictx,
col.getAndAugment,
col.contains,
metaFile,
info.Modified,
el)
if err != nil {
return
}
col.stream <- metaItem
}
// Add data file to collection only if the metadata file(if any) was
// successfully added. This also helps to maintain consistency between
// prefetch & lazy fetch collection behaviors.
col.stream <- dataItem
atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&streamedItems, 1)
atomic.AddInt64(&totalBytes, info.Size) atomic.AddInt64(&totalBytes, info.Size)
if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 { if col.Counter.Inc(count.StreamItemsAdded)%1000 == 0 {
logger.Ctx(ctx).Infow("item stream progress", "stats", col.Counter.Values()) logger.Ctx(ictx).Infow("item stream progress", "stats", col.Counter.Values())
} }
col.Counter.Add(count.StreamBytesAdded, info.Size) col.Counter.Add(count.StreamBytesAdded, info.Size)
@ -341,7 +388,16 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
defer wg.Done() defer wg.Done()
defer func() { <-semaphoreCh }() defer func() { <-semaphoreCh }()
col.stream <- data.NewDeletedItem(id) // This is a no-op for conversations, as there is no way to detect
// deleted items in a conversation. It might be added in the future
// if graph supports it, so make sure we put up both .data and .meta
// files for deletions.
if col.getAndAugment.supportsItemMetadata() {
col.stream <- data.NewDeletedItem(id + metadata.DataFileSuffix)
col.stream <- data.NewDeletedItem(id + metadata.MetaFileSuffix)
} else {
col.stream <- data.NewDeletedItem(id)
}
atomic.AddInt64(&streamedItems, 1) atomic.AddInt64(&streamedItems, 1)
col.Counter.Inc(count.StreamItemsRemoved) col.Counter.Inc(count.StreamItemsRemoved)
@ -370,6 +426,39 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
"item_id", id, "item_id", id,
"parent_path", path.LoggableDir(col.LocationPath().String())) "parent_path", path.LoggableDir(col.LocationPath().String()))
// Channel message files don't carry .data suffix, post files do.
dataFile := id
// Handle metadata before data so that if metadata file fails,
// we are not left with an orphaned data file.
//
// If the data download fails for some reason other than deleted in
// flight, we will still end up persisting a .meta file. This is
// fine since the next backup will overwrite it.
//
// If item is deleted in flight, we will end up with an orphaned
// .meta file. The only impact here is storage bloat, which
// is minimal. Other impact could be if we do an in-order restore
// using a tree built from .meta files. We may have some .meta
// files without corresponding .data files.
if col.getAndAugment.supportsItemMetadata() {
dataFile += metadata.DataFileSuffix
metaFile := id + metadata.MetaFileSuffix
metaItem, err := downloadItemMeta[C, I](
ictx,
col.getAndAugment,
col.contains,
metaFile,
modTime,
el)
if err != nil {
return
}
col.stream <- metaItem
}
col.stream <- data.NewLazyItemWithInfo( col.stream <- data.NewLazyItemWithInfo(
ictx, ictx,
&lazyItemGetter[C, I]{ &lazyItemGetter[C, I]{
@ -381,7 +470,7 @@ func (col *lazyFetchCollection[C, I]) streamItems(ctx context.Context, errs *fau
contains: col.contains, contains: col.contains,
parentPath: col.LocationPath().String(), parentPath: col.LocationPath().String(),
}, },
id, dataFile,
modTime, modTime,
col.Counter, col.Counter,
el) el)
@ -463,3 +552,36 @@ func (lig *lazyItemGetter[C, I]) GetData(
false, false,
nil nil
} }
// downloadItemMeta fetches the serialized metadata for items in container c
// and wraps it in a prefetched data.Item stored under the metaFile name.
// Failures are recorded on the fault bus as recoverable and also returned so
// the caller can skip persisting the corresponding .data file.
func downloadItemMeta[C graph.GetIDer, I groupsItemer](
	ctx context.Context,
	gim getItemMetadataer[C, I],
	c container[C],
	metaFile string,
	modTime time.Time,
	errs *fault.Bus,
) (data.Item, error) {
	// NOTE(review): the metadata payload is derived from the container only;
	// metaFile is used purely as the storage name — confirm intentional.
	itemMeta, _, err := gim.getItemMetadata(
		ctx,
		c.container)
	if err != nil {
		errs.AddRecoverable(ctx, clues.StackWC(ctx, err))
		return nil, err
	}

	// Skip adding progress reader for metadata files. It doesn't add
	// much value.
	storeItem, err := data.NewPrefetchedItem(
		itemMeta,
		metaFile,
		// Use the same last modified time as post's.
		modTime)
	if err != nil {
		errs.AddRecoverable(ctx, clues.StackWC(ctx, err))
		return nil, err
	}

	return storeItem, nil
}

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"io" "io"
"slices" "slices"
"strings"
"testing" "testing"
"time" "time"
@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/readers" "github.com/alcionai/corso/src/internal/common/readers"
@ -165,12 +167,26 @@ func (m getAndAugmentChannelMessage) getItem(
return msg, &details.GroupsInfo{}, m.Err return msg, &details.GroupsInfo{}, m.Err
} }
// getItemMetadata is the channel-message mock's metadata hook. Channel
// messages do not support .meta sidecar files, so it always fails with
// errMetadataFilesNotSupported.
//
//lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) getItemMetadata(
	_ context.Context,
	_ models.Channelable,
) (io.ReadCloser, int, error) {
	return nil, 0, errMetadataFilesNotSupported
}
//lint:ignore U1000 false linter issue due to generics //lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.Channelable) { func (getAndAugmentChannelMessage) augmentItemInfo(*details.GroupsInfo, models.Channelable) {
// no-op // no-op
} }
func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() { //lint:ignore U1000 false linter issue due to generics
func (getAndAugmentChannelMessage) supportsItemMetadata() bool {
	// Channel messages do not persist .meta sidecar files; only
	// conversation posts do.
	return false
}
// Test prefetch streamItems() for channel messages.
func (suite *CollectionUnitSuite) TestPrefetchCollection_ChannelMessages() {
var ( var (
t = suite.T() t = suite.T()
start = time.Now().Add(-1 * time.Second) start = time.Now().Add(-1 * time.Second)
@ -249,6 +265,11 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
go col.streamItems(ctx, errs) go col.streamItems(ctx, errs)
for item := range col.stream { for item := range col.stream {
// Make sure item IDs don't have .data and .meta suffixes.
// Those are only meant for conversations.
assert.False(t, strings.HasSuffix(item.ID(), ".data"))
assert.False(t, strings.HasSuffix(item.ID(), ".meta"))
itemCount++ itemCount++
_, aok := test.added[item.ID()] _, aok := test.added[item.ID()]
@ -277,8 +298,164 @@ func (suite *CollectionUnitSuite) TestPrefetchCollection_streamItems() {
} }
} }
// Test prefetch streamItems() for conversations.
func (suite *CollectionUnitSuite) TestPrefetchCollection_Conversations() {
	var (
		t             = suite.T()
		start         = time.Now().Add(-1 * time.Second)
		statusUpdater = func(*support.ControllerOperationStatus) {}
	)

	fullPath, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "f", "s")
	require.NoError(t, err, clues.ToCore(err))

	locPath, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, "f", "s")
	require.NoError(t, err, clues.ToCore(err))

	table := []struct {
		name    string
		added   map[string]time.Time
		removed map[string]struct{}
	}{
		{
			name: "no items",
		},
		{
			name: "only added items",
			added: map[string]time.Time{
				"fisher":    {},
				"flannigan": {},
				"fitzbog":   {},
			},
		},
		{
			name: "only removed items",
			removed: map[string]struct{}{
				"princess": {},
				"poppy":    {},
				"petunia":  {},
			},
		},
		{
			name: "added and removed items",
			added: map[string]time.Time{
				"goblin": {},
			},
			removed: map[string]struct{}{
				"general":  {},
				"goose":    {},
				"grumbles": {},
			},
		},
	}

	for _, test := range table {
		suite.Run(test.name, func() {
			var (
				t         = suite.T()
				errs      = fault.New(true)
				itemCount int
				itemMap   = map[string]data.Item{}
			)

			ctx, flush := tester.NewContext(t)
			defer flush()

			col := &prefetchCollection[models.Conversationable, models.Postable]{
				BaseCollection: data.NewBaseCollection(
					fullPath,
					nil,
					locPath.ToBuilder(),
					control.DefaultOptions(),
					false,
					count.New()),
				added:         test.added,
				contains:      container[models.Conversationable]{},
				removed:       test.removed,
				getAndAugment: &getAndAugmentConversation{},
				stream:        make(chan data.Item),
				statusUpdater: statusUpdater,
			}

			go col.streamItems(ctx, errs)

			for item := range col.stream {
				// Every streamed conversation item must be either a .data or a
				// .meta file; recover the underlying item ID for map lookups.
				var trimmedID string

				switch {
				case strings.HasSuffix(item.ID(), ".data"):
					trimmedID = strings.TrimSuffix(item.ID(), ".data")
				case strings.HasSuffix(item.ID(), ".meta"):
					trimmedID = strings.TrimSuffix(item.ID(), ".meta")
				default:
					// NOTE(review): assert.Fail treats its second arg as the
					// failure message, not a format string; assert.Failf may
					// be intended here.
					assert.Fail(t, "unexpected item suffix: %s", item.ID())
				}

				itemCount++
				itemMap[item.ID()] = item

				_, aok := test.added[trimmedID]
				if aok {
					assert.False(t, item.Deleted(), "additions should not be marked as deleted")
				}

				_, rok := test.removed[trimmedID]
				if rok {
					assert.True(t, item.Deleted(), "removals should be marked as deleted")
					dimt, ok := item.(data.ItemModTime)
					require.True(t, ok, "item implements data.ItemModTime")
					assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
				}

				assert.True(t, aok || rok, "item must be either added or removed: %q", item.ID())
			}

			assert.NoError(t, errs.Failure())
			// Each added/removed entry yields two stream items: .data + .meta.
			assert.Equal(
				t,
				2*(len(test.added)+len(test.removed)),
				itemCount,
				"should see all expected items")

			addedAndRemoved := append(maps.Keys(test.added), maps.Keys(test.removed)...)

			for _, id := range addedAndRemoved {
				// Should have a .data and a .meta file
				d, ok := itemMap[id+".data"]
				assert.True(t, ok, "should have data file for %q", id)

				m, ok := itemMap[id+".meta"]
				assert.True(t, ok, "should have meta file for %q", id)

				// Meta files should not have item info.
				// NOTE(review): this asserts m implements data.Item, which is
				// true of every streamed item; asserting that m does NOT
				// implement the item-info interface may be what was intended.
				assert.Implements(t, (*data.Item)(nil), m)

				if slices.Contains(maps.Keys(test.removed), id) {
					continue
				}

				// Mod times should match. Not doing this check for removed items
				// since mod time is set to now() for them.
				assert.Equal(
					t,
					d.(data.ItemModTime).ModTime(),
					m.(data.ItemModTime).ModTime(),
					"item mod time")

				// Read meta file data. The data is of no significance, we just want
				// to make sure the file is readable.
				r := m.ToReader()
				_, err := io.ReadAll(r)
				assert.NoError(t, err, clues.ToCore(err))

				r.Close()
			}
		})
	}
}
type getAndAugmentConversation struct { type getAndAugmentConversation struct {
GetItemErr error GetItemErr error
GetMetaErr error
CallIDs []string CallIDs []string
} }
@ -297,12 +474,25 @@ func (m *getAndAugmentConversation) getItem(
return p, &details.GroupsInfo{}, m.GetItemErr return p, &details.GroupsInfo{}, m.GetItemErr
} }
// getItemMetadata is the conversation mock's metadata hook. It returns a
// fixed 4-byte payload ("test"); GetMetaErr injects a failure when set.
//
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) getItemMetadata(
	_ context.Context,
	_ models.Conversationable,
) (io.ReadCloser, int, error) {
	return io.NopCloser(strings.NewReader("test")), 4, m.GetMetaErr
}
// //
//lint:ignore U1000 false linter issue due to generics //lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) { func (m *getAndAugmentConversation) augmentItemInfo(*details.GroupsInfo, models.Conversationable) {
// no-op // no-op
} }
// supportsItemMetadata marks the conversation mock as producing .meta files,
// matching the real conversations backup handler.
//
//lint:ignore U1000 false linter issue due to generics
func (m *getAndAugmentConversation) supportsItemMetadata() bool {
	return true
}
func (m *getAndAugmentConversation) check(t *testing.T, expected []string) { func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
// Sort before comparing. We could use a set, but that would prevent us from // Sort before comparing. We could use a set, but that would prevent us from
// detecting duplicates. // detecting duplicates.
@ -312,7 +502,8 @@ func (m *getAndAugmentConversation) check(t *testing.T, expected []string) {
assert.Equal(t, expected, m.CallIDs, "expected calls") assert.Equal(t, expected, m.CallIDs, "expected calls")
} }
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() { // Test lazy streamItems() for conversations.
func (suite *CollectionUnitSuite) TestLazyFetchCollection_Conversations() {
var ( var (
t = suite.T() t = suite.T()
start = time.Now().Add(-time.Second) start = time.Now().Add(-time.Second)
@ -343,13 +534,11 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
added: map[string]time.Time{ added: map[string]time.Time{
"fisher": start.Add(time.Minute), "fisher": start.Add(time.Minute),
"flannigan": start.Add(2 * time.Minute), "flannigan": start.Add(2 * time.Minute),
"fitzbog": start.Add(3 * time.Minute),
}, },
expectItemCount: 3, expectItemCount: 4,
expectReads: []string{ expectReads: []string{
"fisher", "fisher",
"flannigan", "flannigan",
"fitzbog",
}, },
}, },
{ {
@ -359,7 +548,7 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
"poppy": {}, "poppy": {},
"petunia": {}, "petunia": {},
}, },
expectItemCount: 3, expectItemCount: 6,
}, },
{ {
// TODO(pandeyabs): Overlaps between added and removed are deleted // TODO(pandeyabs): Overlaps between added and removed are deleted
@ -371,23 +560,23 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
// prefetch collections. // prefetch collections.
name: "added and removed items", name: "added and removed items",
added: map[string]time.Time{ added: map[string]time.Time{
"goblin": {}, "goblin": start.Add(time.Minute),
}, },
removed: map[string]struct{}{ removed: map[string]struct{}{
"general": {}, "general": {},
"goose": {}, "goose": {},
"grumbles": {}, "grumbles": {},
}, },
expectItemCount: 4, expectItemCount: 8,
}, },
} }
for _, test := range table { for _, test := range table {
suite.Run(test.name, func() { suite.Run(test.name, func() {
var ( var (
t = suite.T() t = suite.T()
errs = fault.New(true) errs = fault.New(true)
itemCount int itemMap = map[string]data.Item{}
) )
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
@ -413,9 +602,20 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
} }
for item := range col.Items(ctx, errs) { for item := range col.Items(ctx, errs) {
itemCount++ var trimmedID string
_, rok := test.removed[item.ID()] switch {
case strings.HasSuffix(item.ID(), ".data"):
trimmedID = strings.TrimSuffix(item.ID(), ".data")
case strings.HasSuffix(item.ID(), ".meta"):
trimmedID = strings.TrimSuffix(item.ID(), ".meta")
default:
assert.Fail(t, "unexpected item suffix: %s", item.ID())
}
itemMap[item.ID()] = item
_, rok := test.removed[trimmedID]
if rok { if rok {
assert.True(t, item.Deleted(), "removals should be marked as deleted") assert.True(t, item.Deleted(), "removals should be marked as deleted")
dimt, ok := item.(data.ItemModTime) dimt, ok := item.(data.ItemModTime)
@ -423,7 +623,7 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()") assert.True(t, dimt.ModTime().After(start), "deleted items should set mod time to now()")
} }
modTime, aok := test.added[item.ID()] modTime, aok := test.added[trimmedID]
if !rok && aok { if !rok && aok {
// Item's mod time should be what's passed into the collection // Item's mod time should be what's passed into the collection
// initializer. // initializer.
@ -434,7 +634,10 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
// Check if the test wants us to read the item's data so the lazy // Check if the test wants us to read the item's data so the lazy
// data fetch is executed. // data fetch is executed.
if slices.Contains(test.expectReads, item.ID()) { //
// Inspect with trimmedID since getItem() operates on ID, not the file
if strings.HasSuffix(item.ID(), ".data") &&
slices.Contains(test.expectReads, trimmedID) {
r := item.ToReader() r := item.ToReader()
_, err := io.ReadAll(r) _, err := io.ReadAll(r)
@ -458,9 +661,40 @@ func (suite *CollectionUnitSuite) TestLazyFetchCollection_Items_LazyFetch() {
assert.NoError(t, errs.Failure()) assert.NoError(t, errs.Failure())
assert.Equal( assert.Equal(
t, t,
test.expectItemCount, test.expectItemCount, // 2*(len(test.added)+len(test.removed)),
itemCount, len(itemMap),
"should see all expected items") "should see all expected items")
addedAndRemoved := append(maps.Keys(test.added), maps.Keys(test.removed)...)
for _, id := range addedAndRemoved {
// Should have a .data and a .meta file
d, ok := itemMap[id+".data"]
assert.True(t, ok, "should have data file for %q", id)
m, ok := itemMap[id+".meta"]
assert.True(t, ok, "should have meta file for %q", id)
// Meta files should not have item info.
assert.Implements(t, (*data.Item)(nil), m)
if slices.Contains(maps.Keys(test.removed), id) {
continue
}
// Mod times should match. Not doing this check for removed items
// since mod time is set to now() for them.
assert.Equal(t, d.(data.ItemModTime).ModTime(), m.(data.ItemModTime).ModTime(), "item mod time")
// Read meta file data. The data is of no significance, we just want
// to make sure the file is readable.
r := m.ToReader()
_, err := io.ReadAll(r)
assert.NoError(t, err, clues.ToCore(err))
r.Close()
}
}) })
} }
} }

View File

@ -1,12 +1,16 @@
package groups package groups
import ( import (
"bytes"
"context" "context"
"encoding/json"
"io"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
@ -132,6 +136,24 @@ func (bh conversationsBackupHandler) getItem(
postID) postID)
} }
// getItemMetadata serializes conversation-level metadata — the group mailbox
// address as recipient plus the conversation topic — for persistence as the
// post's .meta sidecar file. It returns the payload reader and its length
// in bytes.
//
//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) getItemMetadata(
	ctx context.Context,
	c models.Conversationable,
) (io.ReadCloser, int, error) {
	postMeta := metadata.ConversationPostMetadata{
		Recipients: []string{bh.resourceEmail},
		Topic:      ptr.Val(c.GetTopic()),
	}

	bs, err := json.Marshal(postMeta)
	if err != nil {
		return nil, 0, clues.WrapWC(ctx, err, "serializing post metadata")
	}

	return io.NopCloser(bytes.NewReader(bs)), len(bs), nil
}
//lint:ignore U1000 false linter issue due to generics //lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) augmentItemInfo( func (bh conversationsBackupHandler) augmentItemInfo(
dgi *details.GroupsInfo, dgi *details.GroupsInfo,
@ -148,6 +170,11 @@ func (bh conversationsBackupHandler) augmentItemInfo(
dgi.Post.Topic = ptr.Val(c.GetTopic()) dgi.Post.Topic = ptr.Val(c.GetTopic())
} }
// supportsItemMetadata reports that conversation posts persist a .meta
// sidecar alongside each .data file.
//
//lint:ignore U1000 false linter issue due to generics
func (bh conversationsBackupHandler) supportsItemMetadata() bool {
	return true
}
func conversationThreadContainer( func conversationThreadContainer(
c models.Conversationable, c models.Conversationable,
t models.ConversationThreadable, t models.ConversationThreadable,

View File

@ -0,0 +1,62 @@
package groups
import (
"encoding/json"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/internal/tester"
)
// ConversationHandlerUnitSuite exercises conversationsBackupHandler helpers.
type ConversationHandlerUnitSuite struct {
	tester.Suite
}

func TestConversationHandlerUnitSuite(t *testing.T) {
	suite.Run(t, &ConversationHandlerUnitSuite{Suite: tester.NewUnitSuite(t)})
}
// Basic test to ensure metadata is serialized and deserialized correctly.
func (suite *ConversationHandlerUnitSuite) TestGetItemMetadata() {
	var (
		t  = suite.T()
		bh = conversationsBackupHandler{
			resourceEmail: "test@example.com",
		}
		topic = "test topic"
		conv  = models.NewConversation()
	)

	ctx, flush := tester.NewContext(t)
	defer flush()

	conv.SetTopic(&topic)

	// Serialize, then round-trip through JSON and verify both fields survive.
	rc, size, err := bh.getItemMetadata(ctx, conv)
	assert.NoError(t, err, clues.ToCore(err))
	require.NotNil(t, rc, "nil read closer")
	assert.Greater(t, size, 0, "incorrect size")

	defer rc.Close()

	m, err := io.ReadAll(rc)
	assert.NoError(t, err, "reading metadata")

	var meta metadata.ConversationPostMetadata

	err = json.Unmarshal(m, &meta)
	assert.NoError(t, err, "deserializing metadata")

	assert.Equal(t, []string{"test@example.com"}, meta.Recipients, "incorrect recipients")
	assert.Equal(t, ptr.Val(conv.GetTopic()), meta.Topic, "incorrect topic")
}

View File

@ -5,19 +5,24 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"strings"
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/converters/eml"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
groupMeta "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/metrics"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph/metadata"
) )
func NewExportCollection( func NewExportCollection(
@ -26,7 +31,13 @@ func NewExportCollection(
backupVersion int, backupVersion int,
cec control.ExportConfig, cec control.ExportConfig,
stats *metrics.ExportStats, stats *metrics.ExportStats,
cat path.CategoryType,
) export.Collectioner { ) export.Collectioner {
streamItems := streamChannelMessages
if cat == path.ConversationPostsCategory {
streamItems = streamConversationPosts
}
return export.BaseCollection{ return export.BaseCollection{
BaseDir: baseDir, BaseDir: baseDir,
BackingCollection: backingCollections, BackingCollection: backingCollections,
@ -37,8 +48,12 @@ func NewExportCollection(
} }
} }
// streamItems streams the items in the backingCollection into the export stream chan //-------------------------------------------------------------
func streamItems( // Channel Messages
//-------------------------------------------------------------
// streamChannelMessages streams the items in the backingCollection into the export stream chan
func streamChannelMessages(
ctx context.Context, ctx context.Context,
drc []data.RestoreCollection, drc []data.RestoreCollection,
backupVersion int, backupVersion int,
@ -198,3 +213,146 @@ func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessag
Subject: ptr.Val(item.GetSubject()), Subject: ptr.Val(item.GetSubject()),
} }
} }
//-------------------------------------------------------------
// Conversation Posts
//-------------------------------------------------------------
// streamConversationPosts adds the post items into the export stream channel.
func streamConversationPosts(
	ctx context.Context,
	drc []data.RestoreCollection,
	backupVersion int,
	cec control.ExportConfig,
	ch chan<- export.Item,
	stats *metrics.ExportStats,
) {
	// The channel is owned by this producer; close it when all collections
	// have been streamed so consumers' range loops terminate.
	defer close(ch)

	// NOTE(review): backupVersion and cec are not referenced in this body —
	// confirm they are reserved for future use.
	errs := fault.New(false)

	for _, rc := range drc {
		ictx := clues.Add(ctx, "path_short_ref", rc.FullPath().ShortRef())

		// NOTE(review): Items is called with the bare ctx; ictx (annotated
		// with path_short_ref) is available — confirm which is intended.
		for item := range rc.Items(ctx, errs) {
			itemCtx := clues.Add(ictx, "stream_item_id", item.ID())

			// Trim .data suffix from itemID. Also, we don't expect .meta files
			// here since details are not persisted for metadata files.
			trimmedID := strings.TrimSuffix(item.ID(), metadata.DataFileSuffix)
			exportName := trimmedID + ".eml"

			// Pull the post's .meta sidecar; without it we can't build the
			// eml headers, so surface the failure and move on.
			postMetadata, err := fetchAndReadMetadata(itemCtx, trimmedID, rc)
			if err != nil {
				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			reader := item.ToReader()
			content, err := io.ReadAll(reader)

			reader.Close()

			if err != nil {
				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			// Convert JSON to eml.
			email, err := eml.FromJSONPostToEML(itemCtx, content, postMetadata)
			if err != nil {
				err = clues.Wrap(err, "converting JSON to eml")
				logger.CtxErr(itemCtx, err).Info("processing collection item")

				ch <- export.Item{
					ID:    item.ID(),
					Error: err,
				}

				continue
			}

			emlReader := io.NopCloser(bytes.NewReader([]byte(email)))

			stats.UpdateResourceCount(path.ConversationPostsCategory)
			body := metrics.ReaderWithStats(emlReader, path.ConversationPostsCategory, stats)

			ch <- export.Item{
				ID:   item.ID(),
				Name: exportName,
				Body: body,
			}
		}

		// NOTE(review): this drain runs once per collection on a bus shared
		// across the whole loop; if ItemsAndRecovered does not reset the bus,
		// errors from earlier collections are re-emitted — verify.
		items, recovered := errs.ItemsAndRecovered()

		// Return all the items that we failed to source from the persistence layer
		for _, item := range items {
			ch <- export.Item{
				ID:    item.ID,
				Error: &item,
			}
		}

		for _, err := range recovered {
			ch <- export.Item{
				Error: err,
			}
		}
	}
}
// fetchAndReadMetadata fetches the .meta sidecar file for the given item from
// the backing store and deserializes it into ConversationPostMetadata.
func fetchAndReadMetadata(
	ctx context.Context,
	itemID string,
	fin data.FetchItemByNamer,
) (groupMeta.ConversationPostMetadata, error) {
	metaName := itemID + metadata.MetaFileSuffix
	ictx := clues.Add(ctx, "meta_file_name", metaName)

	meta, err := fin.FetchItemByName(ictx, metaName)
	if err != nil {
		// Wrap with ictx (not ctx) so the meta_file_name annotation added
		// above is carried on the returned error.
		return groupMeta.ConversationPostMetadata{},
			clues.WrapWC(ictx, err, "fetching metadata")
	}

	metaReader := meta.ToReader()
	defer metaReader.Close()

	metaFormatted, err := readMetadata(metaReader)
	if err != nil {
		return groupMeta.ConversationPostMetadata{},
			clues.WrapWC(ictx, err, "deserializing metadata")
	}

	return metaFormatted, nil
}
// readMetadata deserializes a ConversationPostMetadata value from metaRC.
// A nil reader (seen for the top level container folder) yields the zero
// value without error. The reader is not closed here; the caller owns it.
func readMetadata(metaRC io.ReadCloser) (groupMeta.ConversationPostMetadata, error) {
	var meta groupMeta.ConversationPostMetadata

	// `metaRC` will be nil for the top level container folder.
	if metaRC == nil {
		return meta, nil
	}

	metaraw, err := io.ReadAll(metaRC)
	if err != nil {
		return groupMeta.ConversationPostMetadata{}, err
	}

	if err := json.Unmarshal(metaraw, &meta); err != nil {
		return groupMeta.ConversationPostMetadata{}, err
	}

	return meta, nil
}

View File

@ -7,6 +7,7 @@ import (
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
@ -16,6 +17,7 @@ import (
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/export" "github.com/alcionai/corso/src/pkg/export"
"github.com/alcionai/corso/src/pkg/metrics" "github.com/alcionai/corso/src/pkg/metrics"
"github.com/alcionai/corso/src/pkg/path"
) )
type ExportUnitSuite struct { type ExportUnitSuite struct {
@ -26,7 +28,7 @@ func TestExportUnitSuite(t *testing.T) {
suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)}) suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
} }
func (suite *ExportUnitSuite) TestStreamItems() { func (suite *ExportUnitSuite) TestStreamChannelMessages() {
makeBody := func() io.ReadCloser { makeBody := func() io.ReadCloser {
return io.NopCloser(bytes.NewReader([]byte("{}"))) return io.NopCloser(bytes.NewReader([]byte("{}")))
} }
@ -86,7 +88,7 @@ func (suite *ExportUnitSuite) TestStreamItems() {
ch := make(chan export.Item) ch := make(chan export.Item)
go streamItems( go streamChannelMessages(
ctx, ctx,
[]data.RestoreCollection{test.backingColl}, []data.RestoreCollection{test.backingColl},
version.NoBackup, version.NoBackup,
@ -113,3 +115,163 @@ func (suite *ExportUnitSuite) TestStreamItems() {
}) })
} }
} }
// TestStreamConversationPosts verifies that streamConversationPosts turns
// serialized posts plus their .meta sidecars into .eml export items, and that
// metadata-fetch and recoverable failures surface as errored items.
func (suite *ExportUnitSuite) TestStreamConversationPosts() {
	testPath, err := path.Build(
		"t",
		"g",
		path.GroupsService,
		path.ConversationPostsCategory,
		true,
		"convID",
		"threadID")
	require.NoError(suite.T(), err, clues.ToCore(err))

	makeBody := func() io.ReadCloser {
		rc := io.NopCloser(bytes.NewReader([]byte("{}")))
		return metrics.ReaderWithStats(
			rc,
			path.ConversationPostsCategory,
			&metrics.ExportStats{})
	}

	makeMeta := func() io.ReadCloser {
		return io.NopCloser(
			bytes.NewReader([]byte(`{"topic":"t", "recipients":["em@il"]}`)))
	}

	table := []struct {
		name        string
		backingColl dataMock.Collection
		expectItem  export.Item
		expectErr   assert.ErrorAssertionFunc
	}{
		{
			name: "no errors",
			backingColl: dataMock.Collection{
				ItemData: []data.Item{
					&dataMock.Item{
						ItemID: "zim.data",
						Reader: makeBody(),
					},
				},
				Path: testPath,
				AuxItems: map[string]data.Item{
					"zim.meta": &dataMock.Item{
						ItemID: "zim.meta",
						Reader: makeMeta(),
					},
				},
			},
			expectItem: export.Item{
				ID:   "zim.data",
				Name: "zim.eml",
				Body: makeBody(),
			},
			expectErr: assert.NoError,
		},
		{
			name: "only recoverable errors",
			backingColl: dataMock.Collection{
				ItemsRecoverableErrs: []error{
					clues.New("The knowledge... it fills me! It is neat!"),
				},
				Path: testPath,
			},
			expectErr: assert.Error,
		},
		{
			name: "items and recoverable errors",
			backingColl: dataMock.Collection{
				ItemData: []data.Item{
					&dataMock.Item{
						ItemID: "gir.data",
						Reader: makeBody(),
					},
				},
				ItemsRecoverableErrs: []error{
					clues.New("I miss my cupcake."),
				},
				Path: testPath,
				AuxItems: map[string]data.Item{
					"gir.meta": &dataMock.Item{
						ItemID: "gir.meta",
						Reader: makeMeta(),
					},
				},
			},
			expectItem: export.Item{
				ID:   "gir.data",
				Name: "gir.eml",
				Body: makeBody(),
			},
			expectErr: assert.Error,
		},
		{
			name: "missing metadata",
			backingColl: dataMock.Collection{
				ItemData: []data.Item{
					&dataMock.Item{
						ItemID: "mir.data",
						Reader: makeBody(),
					},
				},
				Path: testPath,
			},
			expectItem: export.Item{
				ID:    "mir.data",
				Error: assert.AnError,
			},
			expectErr: assert.Error,
		},
	}
	for _, test := range table {
		suite.Run(test.name, func() {
			t := suite.T()

			ctx, flush := tester.NewContext(t)
			defer flush()

			ch := make(chan export.Item)

			go streamConversationPosts(
				ctx,
				[]data.RestoreCollection{test.backingColl},
				version.NoBackup,
				control.DefaultExportConfig(),
				ch,
				&metrics.ExportStats{})

			var (
				itm export.Item
				err error
			)

			for i := range ch {
				if i.Error == nil {
					itm = i
					continue
				}

				err = i.Error

				// Errored exports may still carry the source item's ID (e.g.
				// the "missing metadata" case). Record it so the ID assertion
				// below can see it, but never clobber a successfully exported
				// item with an errored one.
				if len(i.ID) > 0 && len(itm.ID) == 0 {
					itm = i
				}
			}

			test.expectErr(t, err, clues.ToCore(err))

			assert.Equal(t, test.expectItem.ID, itm.ID, "item ID")
			assert.Equal(t, test.expectItem.Name, itm.Name, "item name")

			if err != nil {
				return
			}

			assert.NotNil(t, itm.Body, "body")

			_, err = io.ReadAll(itm.Body)
			require.NoError(t, err, clues.ToCore(err))

			itm.Body.Close()
		})
	}
}

View File

@ -2,6 +2,7 @@ package groups
import ( import (
"context" "context"
"io"
"github.com/microsoft/kiota-abstractions-go/serialization" "github.com/microsoft/kiota-abstractions-go/serialization"
@ -33,7 +34,9 @@ type backupHandler[C graph.GetIDer, I groupsItemer] interface {
type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface { type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface {
getItemer[I] getItemer[I]
getItemMetadataer[C, I]
augmentItemInfoer[C] augmentItemInfoer[C]
supportsItemMetadataer[C, I]
} }
type augmentItemInfoer[C graph.GetIDer] interface { type augmentItemInfoer[C graph.GetIDer] interface {
@ -51,6 +54,17 @@ type getItemer[I groupsItemer] interface {
) (I, *details.GroupsInfo, error) ) (I, *details.GroupsInfo, error)
} }
// getItemMetadataer produces the serialized .meta payload for items belonging
// to container c, returning the payload reader and its length in bytes.
type getItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
	getItemMetadata(
		ctx context.Context,
		c C,
	) (io.ReadCloser, int, error)
}

// supportsItemMetadataer reports whether the handler persists .meta files
// alongside item .data files (true for conversation posts, false for channel
// messages). The type parameters mirror the sibling interfaces composed into
// getItemAndAugmentInfoer, though this method uses neither.
type supportsItemMetadataer[C graph.GetIDer, I groupsItemer] interface {
	supportsItemMetadata() bool
}
// gets all containers for the resource // gets all containers for the resource
type getContainerser[C graph.GetIDer] interface { type getContainerser[C graph.GetIDer] interface {
getContainers( getContainers(

View File

@ -0,0 +1,8 @@
package metadata
// ConversationPostMetadata stores metadata for a given conversation post,
// stored as a .meta file in kopia.
type ConversationPostMetadata struct {
	// Recipients is populated by the backup handler with the group's
	// resource email address.
	Recipients []string `json:"recipients,omitempty"`
	// Topic is the conversation's topic/subject line.
	Topic string `json:"topic,omitempty"`
}

View File

@ -82,7 +82,7 @@ func (h *baseGroupsHandler) ProduceExportCollections(
) )
switch cat { switch cat {
case path.ChannelMessagesCategory: case path.ChannelMessagesCategory, path.ConversationPostsCategory:
folders = append(folders, fp.Folders()...) folders = append(folders, fp.Folders()...)
coll = groups.NewExportCollection( coll = groups.NewExportCollection(
@ -90,7 +90,8 @@ func (h *baseGroupsHandler) ProduceExportCollections(
[]data.RestoreCollection{restoreColl}, []data.RestoreCollection{restoreColl},
backupVersion, backupVersion,
exportCfg, exportCfg,
stats) stats,
cat)
case path.LibrariesCategory: case path.LibrariesCategory:
drivePath, err := path.ToDrivePath(restoreColl.FullPath()) drivePath, err := path.ToDrivePath(restoreColl.FullPath())

View File

@ -238,3 +238,86 @@ func (suite *ExportUnitSuite) TestExportRestoreCollections_libraries() {
expectedStats.UpdateResourceCount(path.FilesCategory) expectedStats.UpdateResourceCount(path.FilesCategory)
assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats") assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
} }
// End-to-end check that ProduceExportCollections converts a conversation
// post's .data item (with its .meta sidecar fetched by name) into a single
// .eml export item and tracks byte/resource stats for the category.
func (suite *ExportUnitSuite) TestExportRestoreCollections_ConversationPosts() {
	t := suite.T()

	ctx, flush := tester.NewContext(t)
	defer flush()

	var (
		itemID        = "itemID"
		containerName = "convID"
		content       = groupMock.PostWithAttachments
		body          = io.NopCloser(bytes.NewBufferString(content))
		exportCfg     = control.ExportConfig{}
		expectedPath  = path.ConversationPostsCategory.HumanString() + "/" + containerName
		expectedItems = []export.Item{
			{
				ID:   itemID + ".data",
				Name: itemID + ".eml",
				// Body: body, not checked
			},
		}
	)

	p, err := path.Build("t", "pr", path.GroupsService, path.ConversationPostsCategory, false, containerName)
	assert.NoError(t, err, "build path")

	dcs := []data.RestoreCollection{
		data.FetchRestoreCollection{
			Collection: dataMock.Collection{
				Path: p,
				ItemData: []data.Item{
					&dataMock.Item{
						ItemID: itemID + ".data",
						Reader: body,
					},
				},
			},
			// finD serves the .meta sidecar when fetched by name.
			FetchItemByNamer: finD{
				id:  itemID + ".meta",
				key: "topic", name: itemID + ".meta",
			},
		},
	}

	stats := metrics.NewExportStats()

	ecs, err := NewGroupsHandler(api.Client{}, nil).
		ProduceExportCollections(
			ctx,
			int(version.Backup),
			exportCfg,
			dcs,
			stats,
			fault.New(true))
	assert.NoError(t, err, "export collections error")
	assert.Len(t, ecs, 1, "num of collections")

	assert.Equal(t, expectedPath, ecs[0].BasePath(), "base dir")

	fitems := []export.Item{}
	size := 0

	for item := range ecs[0].Items(ctx) {
		b, err := io.ReadAll(item.Body)
		assert.NoError(t, err, clues.ToCore(err))

		// count up size for tests
		size += len(b)

		// have to nil out body, otherwise assert fails due to
		// pointer memory location differences
		item.Body = nil
		fitems = append(fitems, item)
	}

	assert.Equal(t, expectedItems, fitems, "items")

	expectedStats := metrics.NewExportStats()
	expectedStats.UpdateBytes(path.ConversationPostsCategory, int64(size))
	expectedStats.UpdateResourceCount(path.ConversationPostsCategory)
	assert.Equal(t, expectedStats.GetStats(), stats.GetStats(), "stats")
}

View File

@ -0,0 +1,85 @@
{
"@odata.context": "https://graph.microsoft.com/v1.0/$metadata#groups('1623c35a-b67a-473a-9b21-5e4891f22e70')/conversations('AAQkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQAQAHUiDz4vCHZNqyz90GJoN54%3D')/threads('AAQkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQMkABAAdSIPPi8Idk2rLP3QYmg3nhAAdSIPPi8Idk2rLP3QYmg3ng%3D%3D')/posts(*,attachments())/$entity",
"@odata.etag": "W/\"CQAAABYAAADxkJD2bSaUS7TYwOHY6vKrAAAiegI9\"",
"id": "AAMkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQBGAAAAAADdwE6qobHzQo5d_R1eoqPKBwDxkJD2bSaUS7TYwOHY6vKrAAAAAAEMAADxkJD2bSaUS7TYwOHY6vKrAAAidRn9AAA=",
"createdDateTime": "2024-01-29T02:22:18Z",
"lastModifiedDateTime": "2024-01-29T02:22:19Z",
"changeKey": "CQAAABYAAADxkJD2bSaUS7TYwOHY6vKrAAAiegI9",
"categories": [],
"receivedDateTime": "2024-01-29T02:22:19Z",
"hasAttachments": true,
"conversationThreadId": "AAQkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQMkABAAdSIPPi8Idk2rLP3QYmg3nhAAdSIPPi8Idk2rLP3QYmg3ng==",
"conversationId": null,
"body": {
"contentType": "html",
"content": "<html><body><div>\r\n<div dir=\"ltr\"><table id=\"x_x_x_x_x_x_content\" style=\"text-align:left;background-color:white;border-spacing:0;border-collapse:collapse;margin:0;box-sizing:border-box;\">\r\n<tr>\r\n<td style=\"text-align:left;vertical-align:top;box-sizing:border-box;\">\r\n<div style=\"text-align:left;margin:0;\"><span style=\"color:black;font-size:15px;font-family:Segoe UI,Tahoma,Microsoft Sans Serif,Verdana,sans-serif;background-color:white;\"><a href=\"mailto:dc_test@10rqc2.onmicrosoft.com\" id=\"OWAf1f38008-d513-0f95-4824-1d46e7a2841b\" data-linkindex=\"0\" style=\"text-align:left;background-color:white;text-decoration:none;margin:0;\"><img data-imagetype=\"AttachmentByCid\" originalSrc=\"cid:7fa9ea6b-8e03-473c-8b34-cae13eaa33aa\" explicitlogon=\"dc_test@10rqc2.onmicrosoft.com\" src=\"cid:7fa9ea6b-8e03-473c-8b34-cae13eaa33aa\" id=\"x_x_image_0\" data-outlook-trace=\"F:5|T:5\" size=\"447\" style=\"vertical-align:top;display:block;width:64px;height:64px;max-width:1001px;margin:0;min-height:auto;min-width:auto;\"></a></span></div></td></tr></table>\r\n<div style=\"color:black;font-size:12pt;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;text-align:left;background-color:white;margin:0;\"><span style=\"color:black;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;\"><br>\r\n</span></div>\r\n<div style=\"color:black;font-size:12pt;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;text-align:left;background-color:white;margin:0;\"><span style=\"color:black;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;\">Embedded + direct attachments.</span></div></div></div>\r\n</body></html>"
},
"from": {
"emailAddress": {
"name": "Dustin Corners",
"address": "Dustin.Corners@10rqc2.onmicrosoft.com"
}
},
"sender": {
"emailAddress": {
"name": "Dustin Corners",
"address": "Dustin.Corners@10rqc2.onmicrosoft.com"
}
},
"newParticipants": [],
"attachments": [
{
"@odata.type": "#microsoft.graph.fileAttachment",
"@odata.mediaContentType": "image/png",
"id": "AAMkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQBGAAAAAADdwE6qobHzQo5d_R1eoqPKBwDxkJD2bSaUS7TYwOHY6vKrAAAAAAEMAADxkJD2bSaUS7TYwOHY6vKrAAAidRn9AAABEgAQAJALn6ReFnlAuFpgf3BBdwM=",
"lastModifiedDateTime": "2024-01-29T02:22:18Z",
"name": "image.png",
"contentType": "image/png",
"size": 690,
"isInline": true,
"contentId": "7fa9ea6b-8e03-473c-8b34-cae13eaa33aa",
"contentLocation": null,
"contentBytes": "iVBORw0KGgoAAAANSUhEUgAAAEAAAABACAYAAACqaXHeAAAAAXNSR0IArs4c6QAAAXlJREFUeF7tmcFtwkAQRWcjhIKogQLSQ5pIIbnkEkqAC23QQ670EArIKeckQuICWsAGrayFsdhRtPO4YuOdN//NYjuI809wXr8AgAQ4J4ACzgPAEEQBFHBOAAWcB4BdAAVQwDkBFHAeAHYBFEAB5wRQwHkA2AVQQKbr3b/WYPZUtEkBAE0CCpNWp8xoXecEAKCsayRASwAFTrtTYTWZAe02WJi01gCrdZEAK9IkQEuAXYBd4HiTVng4MwQZgkbDRjsDrRqDAlakSYCWgJGaKIACRlHTGmDVGBRQk158jeRtslF3VHuCUTLz7wXS/+Gx+O+fDwnhWVtP7+PN7gW6Vnh58UPxv3MJ8tq7mD4nFgfQtag0fmnn46KsVOgDTXFO93u3FMD756qNfUXFR055AH+DRxlvlyIPL+29eSWdb0KSB7CRrYxkaPFgQpHaux6aB9BcqvAgumtFyh+7DqAy51M+14dgZc7fDqDyzueHYPy28s7nATgpPkLYA1p04EEgvdAkAAAAAElFTkSuQmCC"
},
{
"@odata.type": "#microsoft.graph.fileAttachment",
"@odata.mediaContentType": "application/octet-stream",
"id": "AAMkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQBGAAAAAADdwE6qobHzQo5d_R1eoqPKBwDxkJD2bSaUS7TYwOHY6vKrAAAAAAEMAADxkJD2bSaUS7TYwOHY6vKrAAAidRn9AAABEgAQAO6vI6h5OXZDlVIaM2DTB_I=",
"lastModifiedDateTime": "2024-01-29T02:22:18Z",
"name": "file_100bytes",
"contentType": "application/octet-stream",
"size": 250,
"isInline": false,
"contentId": null,
"contentLocation": null,
"contentBytes": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=="
}
],
"inReplyTo@odata.associationLink": "https://graph.microsoft.com/v1.0/groups('1623c35a-b67a-473a-9b21-5e4891f22e70')/threads('AAQkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQMkABAAdSIPPi8Idk2rLP3QYmg3nhAAdSIPPi8Idk2rLP3QYmg3ng==')/posts('AAMkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQBGAAAAAADdwE6qobHzQo5d_R1eoqPKBwDxkJD2bSaUS7TYwOHY6vKrAAAAAAEMAADxkJD2bSaUS7TYwOHY6vKrAAAidMn9AAA=')/$ref",
"inReplyTo@odata.navigationLink": "https://graph.microsoft.com/v1.0/groups('1623c35a-b67a-473a-9b21-5e4891f22e70')/threads('AAQkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQMkABAAdSIPPi8Idk2rLP3QYmg3nhAAdSIPPi8Idk2rLP3QYmg3ng==')/posts('AAMkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQBGAAAAAADdwE6qobHzQo5d_R1eoqPKBwDxkJD2bSaUS7TYwOHY6vKrAAAAAAEMAADxkJD2bSaUS7TYwOHY6vKrAAAidMn9AAA=')",
"inReplyTo": {
"@odata.etag": "W/\"CQAAABYAAADxkJD2bSaUS7TYwOHY6vKrAAAiegI9\"",
"id": "AAMkAGNhMGQwY2ZmLTEzZDctNDNhZC05Y2I4LWIyOTgzNjk4YWExZQBGAAAAAADdwE6qobHzQo5d_R1eoqPKBwDxkJD2bSaUS7TYwOHY6vKrAAAAAAEMAADxkJD2bSaUS7TYwOHY6vKrAAAidMn9AAA=",
"createdDateTime": "2024-01-29T02:21:18Z",
"lastModifiedDateTime": "2024-01-29T02:21:19Z",
"changeKey": "CQAAABYAAADxkJD2bSaUS7TYwOHY6vKrAAAiegI9",
"categories": [],
"receivedDateTime": "2024-01-29T02:21:19Z",
"hasAttachments": true,
"body": {
"contentType": "html",
"content": "<html><body><div>\r\n<div dir=\"ltr\">\r\n<div style=\"color:black;font-size:12pt;font-family:Aptos,Aptos_EmbeddedFont,Aptos_MSFontService,Calibri,Helvetica,sans-serif;\">Test Reply</div></div></div>\r\n</body></html>" },
"from": {
"emailAddress": {
"name": "Dustin Corners",
"address": "Dustin.Corners@10rqc2.onmicrosoft.com"
}
},
"sender": {
"emailAddress": {
"name": "Dustin Corners",
"address": "Dustin.Corners@10rqc2.onmicrosoft.com"
}
}
}
}

View File

@ -0,0 +1,6 @@
// Package stub provides canned Graph API payloads used as fixtures by
// groups-service tests.
package stub

import _ "embed"

// PostWithAttachments is the raw JSON of a serialized conversation post
// carrying both an inline (embedded image) attachment and a regular file
// attachment, including inReplyTo data for future use.
//
//go:embed post-with-attachments.json
var PostWithAttachments string

View File

@ -143,6 +143,7 @@ func makeRestorePathsForEntry(
switch true { switch true {
case ent.Exchange != nil || case ent.Exchange != nil ||
(ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) || (ent.Groups != nil && ent.Groups.ItemType == details.GroupsChannelMessage) ||
(ent.Groups != nil && ent.Groups.ItemType == details.GroupsConversationPost) ||
(ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList): (ent.SharePoint != nil && ent.SharePoint.ItemType == details.SharePointList):
// TODO(ashmrtn): Eventually make Events have it's own function to handle // TODO(ashmrtn): Eventually make Events have it's own function to handle
// setting the restore destination properly. // setting the restore destination properly.

View File

@ -2,13 +2,16 @@ package api
import ( import (
"context" "context"
"strings"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/jaytaylor/html2text" "github.com/jaytaylor/html2text"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/microsoftgraph/msgraph-sdk-go/groups" "github.com/microsoftgraph/msgraph-sdk-go/groups"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/sanitize"
"github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
@ -190,3 +193,29 @@ func (c Conversations) getAttachments(
return result, totalSize, nil return result, totalSize, nil
} }
// bytesToPostable deserializes raw JSON bytes into a kiota Parsable using
// the Post discriminator. If the first attempt fails specifically on
// invalid JSON, the bytes are sanitized (control characters below 0x20
// transformed where possible) and parsing is retried once; the sanitized
// JSON may still be invalid, in which case the retry's error is returned.
func bytesToPostable(body []byte) (serialization.Parsable, error) {
	parsed, err := CreateFromBytes(body, models.CreatePostFromDiscriminatorValue)
	if err == nil {
		return parsed, nil
	}

	// Any failure other than invalid JSON is terminal.
	if !strings.Contains(err.Error(), invalidJSON) {
		return nil, clues.Wrap(err, "deserializing bytes to message")
	}

	// Retry after sanitizing the payload.
	parsed, err = CreateFromBytes(
		sanitize.JSONBytes(body),
		models.CreatePostFromDiscriminatorValue)

	return parsed, clues.Stack(err).OrNil()
}
// BytesToPostable deserializes raw JSON bytes into a models.Postable,
// sanitizing and retrying once if the payload contains invalid JSON.
// Returns an error if deserialization fails or if the deserialized value
// is not a Postable.
func BytesToPostable(body []byte) (models.Postable, error) {
	v, err := bytesToPostable(body)
	if err != nil {
		return nil, clues.Stack(err)
	}

	// Guard the type assertion: a bare v.(models.Postable) would panic if
	// the parser handed back a nil or unexpected Parsable.
	p, ok := v.(models.Postable)
	if !ok {
		return nil, clues.New("deserialized item is not a post")
	}

	return p, nil
}

View File

@ -12,6 +12,8 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
stub "github.com/alcionai/corso/src/internal/m365/service/groups/mock"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/tester/tconfig" "github.com/alcionai/corso/src/internal/tester/tconfig"
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
@ -115,6 +117,68 @@ func (suite *ConversationsAPIUnitSuite) TestConversationPostInfo() {
} }
} }
// TestBytesToPostable_InvalidError tests that the error message kiota returns
// for invalid JSON matches what we check for. This helps keep things in sync
// when kiota is updated.
func (suite *MailAPIUnitSuite) TestBytesToPostable_InvalidError() {
t := suite.T()
input := exchMock.MessageWithSpecialCharacters("m365 mail support test")
_, err := CreateFromBytes(input, models.CreatePostFromDiscriminatorValue)
require.Error(t, err, clues.ToCore(err))
assert.Contains(t, err.Error(), invalidJSON)
}
// TestBytesToPostable exercises BytesToPostable across empty input, a
// well-formed post payload, a payload that only parses after sanitization,
// and irreparably invalid JSON.
func (suite *ConversationsAPIUnitSuite) TestBytesToPostable() {
	cases := []struct {
		name      string
		input     []byte
		wantErr   assert.ErrorAssertionFunc
		wantValue assert.ValueAssertionFunc
	}{
		{
			name:      "Empty Bytes",
			input:     []byte{},
			wantErr:   assert.Error,
			wantValue: assert.Nil,
		},
		{
			name: "post bytes",
			// Note: inReplyTo is not serialized or deserialized by kiota so we can't
			// test that aspect. The payload does contain inReplyTo data for future use.
			input:     []byte(stub.PostWithAttachments),
			wantErr:   assert.NoError,
			wantValue: assert.NotNil,
		},
		// The exchMock payloads double as post fixtures here, since posts are
		// essentially email messages.
		{
			name:      "malformed JSON bytes passes sanitization",
			input:     exchMock.MessageWithSpecialCharacters("m365 mail support test"),
			wantErr:   assert.NoError,
			wantValue: assert.NotNil,
		},
		{
			name: "invalid JSON bytes",
			input: append(
				exchMock.MessageWithSpecialCharacters("m365 mail support test"),
				[]byte("}")...),
			wantErr:   assert.Error,
			wantValue: assert.Nil,
		},
	}

	for _, tc := range cases {
		suite.Run(tc.name, func() {
			t := suite.T()

			result, err := BytesToPostable(tc.input)
			tc.wantErr(t, err, clues.ToCore(err))
			tc.wantValue(t, result)
		})
	}
}
type ConversationAPIIntgSuite struct { type ConversationAPIIntgSuite struct {
tester.Suite tester.Suite
its intgTesterSetup its intgTesterSetup

View File

@ -1,6 +1,8 @@
package metadata package metadata
import "strings" import (
"strings"
)
const ( const (
MetaFileSuffix = ".meta" MetaFileSuffix = ".meta"

View File

@ -9,9 +9,12 @@ func IsMetadataFile(p path.Path) bool {
case path.OneDriveService: case path.OneDriveService:
return HasMetaSuffix(p.Item()) return HasMetaSuffix(p.Item())
case path.SharePointService, path.GroupsService: case path.SharePointService:
return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item()) return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item())
case path.GroupsService:
return p.Category() == path.LibrariesCategory && HasMetaSuffix(p.Item()) ||
p.Category() == path.ConversationPostsCategory && HasMetaSuffix(p.Item())
default: default:
return false return false
} }

View File

@ -152,3 +152,108 @@ func (suite *MetadataUnitSuite) TestIsMetadataFile_Directories() {
} }
} }
} }
// TestIsMetadataFile checks that .meta-suffixed items are recognized as
// metadata files only for the service/category pairs that actually produce
// metadata files (onedrive files, sharepoint/groups libraries, and group
// conversation posts), and never for .data items or unrelated categories.
func (suite *MetadataUnitSuite) TestIsMetadataFile() {
	cases := []struct {
		name     string
		service  path.ServiceType
		category path.CategoryType
		meta     bool
		want     bool
	}{
		{
			name:     "onedrive .data file",
			service:  path.OneDriveService,
			category: path.FilesCategory,
		},
		{
			name:     "sharepoint library .data file",
			service:  path.SharePointService,
			category: path.LibrariesCategory,
		},
		{
			name:     "group library .data file",
			service:  path.GroupsService,
			category: path.LibrariesCategory,
		},
		{
			name:     "group conversations .data file",
			service:  path.GroupsService,
			category: path.ConversationPostsCategory,
		},
		{
			name:     "onedrive .meta file",
			service:  path.OneDriveService,
			category: path.FilesCategory,
			meta:     true,
			want:     true,
		},
		{
			name:     "sharepoint library .meta file",
			service:  path.SharePointService,
			category: path.LibrariesCategory,
			meta:     true,
			want:     true,
		},
		{
			name:     "group library .meta file",
			service:  path.GroupsService,
			category: path.LibrariesCategory,
			meta:     true,
			want:     true,
		},
		{
			name:     "group conversations .meta file",
			service:  path.GroupsService,
			category: path.ConversationPostsCategory,
			meta:     true,
			want:     true,
		},
		// Services without metadata files must report false even when the
		// item happens to carry a .meta suffix.
		{
			name:     "exchange service",
			service:  path.ExchangeService,
			category: path.EmailCategory,
			meta:     true,
		},
		{
			name:     "group channels",
			service:  path.GroupsService,
			category: path.ChannelMessagesCategory,
			meta:     true,
		},
		{
			name:     "lists",
			service:  path.SharePointService,
			category: path.ListsCategory,
			meta:     true,
		},
	}

	for _, tc := range cases {
		suite.Run(tc.name, func() {
			t := suite.T()

			suffix := metadata.DataFileSuffix
			if tc.meta {
				suffix = metadata.MetaFileSuffix
			}

			p, err := path.Build(
				"t",
				"u",
				tc.service,
				tc.category,
				true,
				"some", "path", "for", "file"+suffix)
			require.NoError(t, err, clues.ToCore(err))

			assert.Equal(t, tc.want, metadata.IsMetadataFile(p))
		})
	}
}