add export support in teamschats collections processor
This commit is contained in:
parent
3912a64963
commit
7ab1276d61
@ -137,7 +137,7 @@ func formatChannelMessage(
|
|||||||
return nil, clues.New("expected deserialized item to implement models.ChatMessageable")
|
return nil, clues.New("expected deserialized item to implement models.ChatMessageable")
|
||||||
}
|
}
|
||||||
|
|
||||||
mItem := makeMinimumChannelMesasge(msg)
|
mItem := makeMinimumChannelMessage(msg)
|
||||||
replies := msg.GetReplies()
|
replies := msg.GetReplies()
|
||||||
|
|
||||||
mcmar := minimumChannelMessageAndReplies{
|
mcmar := minimumChannelMessageAndReplies{
|
||||||
@ -146,7 +146,7 @@ func formatChannelMessage(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, r := range replies {
|
for _, r := range replies {
|
||||||
mcmar.Replies = append(mcmar.Replies, makeMinimumChannelMesasge(r))
|
mcmar.Replies = append(mcmar.Replies, makeMinimumChannelMessage(r))
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err = marshalJSONContainingHTML(mcmar)
|
bs, err = marshalJSONContainingHTML(mcmar)
|
||||||
@ -172,7 +172,7 @@ func marshalJSONContainingHTML(a any) ([]byte, error) {
|
|||||||
return buffer.Bytes(), clues.Stack(err).OrNil()
|
return buffer.Bytes(), clues.Stack(err).OrNil()
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeMinimumChannelMesasge(item models.ChatMessageable) minimumChannelMessage {
|
func makeMinimumChannelMessage(item models.ChatMessageable) minimumChannelMessage {
|
||||||
var content string
|
var content string
|
||||||
|
|
||||||
if item.GetBody() != nil {
|
if item.GetBody() != nil {
|
||||||
|
|||||||
229
src/internal/m365/collection/teamsChats/export.go
Normal file
229
src/internal/m365/collection/teamsChats/export.go
Normal file
@ -0,0 +1,229 @@
|
|||||||
|
package teamschats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alcionai/clues"
|
||||||
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
|
"github.com/alcionai/corso/src/pkg/export"
|
||||||
|
"github.com/alcionai/corso/src/pkg/fault"
|
||||||
|
"github.com/alcionai/corso/src/pkg/metrics"
|
||||||
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
|
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewExportCollection(
|
||||||
|
baseDir string,
|
||||||
|
backingCollections []data.RestoreCollection,
|
||||||
|
backupVersion int,
|
||||||
|
cec control.ExportConfig,
|
||||||
|
stats *metrics.ExportStats,
|
||||||
|
) export.Collectioner {
|
||||||
|
return export.BaseCollection{
|
||||||
|
BaseDir: baseDir,
|
||||||
|
BackingCollection: backingCollections,
|
||||||
|
BackupVersion: backupVersion,
|
||||||
|
Cfg: cec,
|
||||||
|
Stream: streamItems,
|
||||||
|
Stats: stats,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// streamItems streams the items in the backingCollection into the export stream chan
|
||||||
|
func streamItems(
|
||||||
|
ctx context.Context,
|
||||||
|
drc []data.RestoreCollection,
|
||||||
|
backupVersion int,
|
||||||
|
cec control.ExportConfig,
|
||||||
|
ch chan<- export.Item,
|
||||||
|
stats *metrics.ExportStats,
|
||||||
|
) {
|
||||||
|
defer close(ch)
|
||||||
|
|
||||||
|
errs := fault.New(false)
|
||||||
|
|
||||||
|
for _, rc := range drc {
|
||||||
|
for item := range rc.Items(ctx, errs) {
|
||||||
|
body, err := formatChat(cec, item.ToReader())
|
||||||
|
if err != nil {
|
||||||
|
ch <- export.Item{
|
||||||
|
ID: item.ID(),
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stats.UpdateResourceCount(path.ChatsCategory)
|
||||||
|
body = metrics.ReaderWithStats(body, path.ChatsCategory, stats)
|
||||||
|
|
||||||
|
// messages are exported as json and should be named as such
|
||||||
|
name := item.ID() + ".json"
|
||||||
|
|
||||||
|
ch <- export.Item{
|
||||||
|
ID: item.ID(),
|
||||||
|
Name: name,
|
||||||
|
Body: body,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
items, recovered := errs.ItemsAndRecovered()
|
||||||
|
|
||||||
|
// Return all the items that we failed to source from the persistence layer
|
||||||
|
for _, item := range items {
|
||||||
|
ch <- export.Item{
|
||||||
|
ID: item.ID,
|
||||||
|
Error: &item,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, err := range recovered {
|
||||||
|
ch <- export.Item{
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type (
|
||||||
|
minimumChat struct {
|
||||||
|
CreatedDateTime time.Time `json:"createdDateTime"`
|
||||||
|
LastUpdatedDateTime time.Time `json:"lastUpdatedDateTime"`
|
||||||
|
Topic string `json:"topic"`
|
||||||
|
Messages []minimumChatMessage `json:"replies,omitempty"`
|
||||||
|
Members []minimumChatMember `json:"members"`
|
||||||
|
}
|
||||||
|
|
||||||
|
minimumChatMember struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
VisibleHistoryStartedAt time.Time `json:"visibleHistoryStartedAt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
minimumChatMessage struct {
|
||||||
|
Attachments []minimumAttachment `json:"attachments"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
CreatedDateTime time.Time `json:"createdDateTime"`
|
||||||
|
From string `json:"from"`
|
||||||
|
LastModifiedDateTime time.Time `json:"lastModifiedDateTime"`
|
||||||
|
IsDeleted bool `json:"isDeleted"`
|
||||||
|
}
|
||||||
|
|
||||||
|
minimumAttachment struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func formatChat(
|
||||||
|
cec control.ExportConfig,
|
||||||
|
rc io.ReadCloser,
|
||||||
|
) (io.ReadCloser, error) {
|
||||||
|
if cec.Format == control.JSONFormat {
|
||||||
|
return rc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
bs, err := io.ReadAll(rc)
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Wrap(err, "reading item bytes")
|
||||||
|
}
|
||||||
|
|
||||||
|
defer rc.Close()
|
||||||
|
|
||||||
|
cfb, err := api.CreateFromBytes(bs, models.CreateChatFromDiscriminatorValue)
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Wrap(err, "deserializing bytes to message")
|
||||||
|
}
|
||||||
|
|
||||||
|
chat, ok := cfb.(models.Chatable)
|
||||||
|
if !ok {
|
||||||
|
return nil, clues.New("expected deserialized item to implement models.Chatable")
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
members = chat.GetMembers()
|
||||||
|
messages = chat.GetMessages()
|
||||||
|
)
|
||||||
|
|
||||||
|
result := minimumChat{
|
||||||
|
CreatedDateTime: ptr.Val(chat.GetCreatedDateTime()),
|
||||||
|
LastUpdatedDateTime: ptr.Val(chat.GetLastUpdatedDateTime()),
|
||||||
|
Topic: ptr.Val(chat.GetTopic()),
|
||||||
|
Members: make([]minimumChatMember, 0, len(members)),
|
||||||
|
Messages: make([]minimumChatMessage, 0, len(messages)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range messages {
|
||||||
|
result.Messages = append(result.Messages, makeMinimumChatMessage(r))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range members {
|
||||||
|
result.Members = append(result.Members, makeMinimumChatMember(r))
|
||||||
|
}
|
||||||
|
|
||||||
|
bs, err = marshalJSONContainingHTML(result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, clues.Wrap(err, "serializing minimized chat")
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.NopCloser(bytes.NewReader(bs)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeMinimumChatMessage(item models.ChatMessageable) minimumChatMessage {
|
||||||
|
var content string
|
||||||
|
|
||||||
|
if item.GetBody() != nil {
|
||||||
|
content = ptr.Val(item.GetBody().GetContent())
|
||||||
|
}
|
||||||
|
|
||||||
|
attachments := item.GetAttachments()
|
||||||
|
minAttachments := make([]minimumAttachment, 0, len(attachments))
|
||||||
|
|
||||||
|
for _, a := range attachments {
|
||||||
|
minAttachments = append(minAttachments, minimumAttachment{
|
||||||
|
ID: ptr.Val(a.GetId()),
|
||||||
|
Name: ptr.Val(a.GetName()),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
var isDeleted bool
|
||||||
|
|
||||||
|
deletedAt, ok := ptr.ValOK(item.GetDeletedDateTime())
|
||||||
|
isDeleted = ok && deletedAt.After(time.Time{})
|
||||||
|
|
||||||
|
return minimumChatMessage{
|
||||||
|
Attachments: minAttachments,
|
||||||
|
Content: content,
|
||||||
|
CreatedDateTime: ptr.Val(item.GetCreatedDateTime()),
|
||||||
|
From: api.GetChatMessageFrom(item),
|
||||||
|
LastModifiedDateTime: ptr.Val(item.GetLastModifiedDateTime()),
|
||||||
|
IsDeleted: isDeleted,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeMinimumChatMember(item models.ConversationMemberable) minimumChatMember {
|
||||||
|
return minimumChatMember{
|
||||||
|
Name: ptr.Val(item.GetDisplayName()),
|
||||||
|
VisibleHistoryStartedAt: ptr.Val(item.GetVisibleHistoryStartDateTime()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// json.Marshal will replace many markup tags (ex: "<" and ">") with their unicode
|
||||||
|
// equivalent. In order to maintain parity with original content that contains html,
|
||||||
|
// we have to use this alternative encoding behavior.
|
||||||
|
// https://stackoverflow.com/questions/28595664/how-to-stop-json-marshal-from-escaping-and
|
||||||
|
func marshalJSONContainingHTML(a any) ([]byte, error) {
|
||||||
|
buffer := &bytes.Buffer{}
|
||||||
|
|
||||||
|
encoder := json.NewEncoder(buffer)
|
||||||
|
encoder.SetEscapeHTML(false)
|
||||||
|
|
||||||
|
err := encoder.Encode(a)
|
||||||
|
|
||||||
|
return buffer.Bytes(), clues.Stack(err).OrNil()
|
||||||
|
}
|
||||||
115
src/internal/m365/collection/teamsChats/export_test.go
Normal file
115
src/internal/m365/collection/teamsChats/export_test.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
package teamschats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/alcionai/clues"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
dataMock "github.com/alcionai/corso/src/internal/data/mock"
|
||||||
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
|
"github.com/alcionai/corso/src/internal/version"
|
||||||
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
|
"github.com/alcionai/corso/src/pkg/export"
|
||||||
|
"github.com/alcionai/corso/src/pkg/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ExportUnitSuite struct {
|
||||||
|
tester.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExportUnitSuite(t *testing.T) {
|
||||||
|
suite.Run(t, &ExportUnitSuite{Suite: tester.NewUnitSuite(t)})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ExportUnitSuite) TestStreamItems() {
|
||||||
|
makeBody := func() io.ReadCloser {
|
||||||
|
return io.NopCloser(bytes.NewReader([]byte("{}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
backingColl dataMock.Collection
|
||||||
|
expectName string
|
||||||
|
expectErr assert.ErrorAssertionFunc
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "no errors",
|
||||||
|
backingColl: dataMock.Collection{
|
||||||
|
ItemData: []data.Item{
|
||||||
|
&dataMock.Item{
|
||||||
|
ItemID: "zim",
|
||||||
|
Reader: makeBody(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectName: "zim.json",
|
||||||
|
expectErr: assert.NoError,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "only recoverable errors",
|
||||||
|
backingColl: dataMock.Collection{
|
||||||
|
ItemsRecoverableErrs: []error{
|
||||||
|
clues.New("The knowledge... it fills me! It is neat!"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectErr: assert.Error,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "items and recoverable errors",
|
||||||
|
backingColl: dataMock.Collection{
|
||||||
|
ItemData: []data.Item{
|
||||||
|
&dataMock.Item{
|
||||||
|
ItemID: "gir",
|
||||||
|
Reader: makeBody(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ItemsRecoverableErrs: []error{
|
||||||
|
clues.New("I miss my cupcake."),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectName: "gir.json",
|
||||||
|
expectErr: assert.Error,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.Run(test.name, func() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
ctx, flush := tester.NewContext(t)
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
ch := make(chan export.Item)
|
||||||
|
|
||||||
|
go streamItems(
|
||||||
|
ctx,
|
||||||
|
[]data.RestoreCollection{test.backingColl},
|
||||||
|
version.NoBackup,
|
||||||
|
control.DefaultExportConfig(),
|
||||||
|
ch,
|
||||||
|
&metrics.ExportStats{})
|
||||||
|
|
||||||
|
var (
|
||||||
|
itm export.Item
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := range ch {
|
||||||
|
if i.Error == nil {
|
||||||
|
itm = i
|
||||||
|
} else {
|
||||||
|
err = i.Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test.expectErr(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectName, itm.Name, "item name")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user