add item fetching and serialization to api (#2150)

## Description

Adds the per-item collection streaming calls to
the api interface.  Primarily migrates a "getItem"
and a "serializeItem" action into the api pkg, out
from exchange_data_collection.  Building an
ExchangeInfo is now also housed in api.

## Does this PR need a docs update or release note?

- [x]  No 

## Type of change

- [x] 🧹 Tech Debt/Cleanup

## Issue(s)

* #1996

## Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-01-17 13:27:37 -07:00 committed by GitHub
parent 77b2cd604d
commit 1ef300d6c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 492 additions and 529 deletions

View File

@ -5,11 +5,11 @@
package main
import (
"bytes"
"context"
"fmt"
"os"
"github.com/microsoft/kiota-abstractions-go/serialization"
kw "github.com/microsoft/kiota-serialization-json-go"
"github.com/pkg/errors"
"github.com/spf13/cobra"
@ -18,12 +18,10 @@ import (
"github.com/alcionai/corso/src/cli/utils"
"github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/exchange/api"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/credentials"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
@ -77,12 +75,12 @@ func handleGetCommand(cmd *cobra.Command, args []string) error {
return nil
}
gc, creds, err := getGC(ctx)
_, creds, err := getGC(ctx)
if err != nil {
return err
}
err = runDisplayM365JSON(ctx, gc.Service, creds)
err = runDisplayM365JSON(ctx, creds, user, m365ID)
if err != nil {
return Only(ctx, errors.Wrapf(err, "unable to create mock from M365: %s", m365ID))
}
@ -92,13 +90,14 @@ func handleGetCommand(cmd *cobra.Command, args []string) error {
func runDisplayM365JSON(
ctx context.Context,
gs graph.Servicer,
creds account.M365Config,
user, itemID string,
) error {
var (
get api.GraphRetrievalFunc
serializeFunc exchange.GraphSerializeFunc
cat = graph.StringToPathCategory(category)
bs []byte
err error
cat = graph.StringToPathCategory(category)
sw = kw.NewJsonSerializationWriter()
)
ac, err := api.NewClient(creds)
@ -107,58 +106,60 @@ func runDisplayM365JSON(
}
switch cat {
case path.EmailCategory, path.EventsCategory, path.ContactsCategory:
get, serializeFunc = exchange.GetQueryAndSerializeFunc(ac, cat)
case path.EmailCategory:
bs, err = getItem(ctx, ac.Mail(), user, itemID)
case path.EventsCategory:
bs, err = getItem(ctx, ac.Events(), user, itemID)
case path.ContactsCategory:
bs, err = getItem(ctx, ac.Contacts(), user, itemID)
default:
return fmt.Errorf("unable to process category: %s", cat)
}
channel := make(chan data.Stream, 1)
sw := kw.NewJsonSerializationWriter()
response, err := get(ctx, user, m365ID)
if err != nil {
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
}
// First return is the number of bytes that were serialized. Ignored
_, err = serializeFunc(ctx, gs.Client(), sw, channel, response, user)
close(channel)
if err != nil {
return err
}
for item := range channel {
buf := &bytes.Buffer{}
str := string(bs)
_, err := buf.ReadFrom(item.ToReader())
if err != nil {
return errors.Wrapf(err, "unable to parse given data: %s", m365ID)
}
byteArray := buf.Bytes()
newValue := string(byteArray)
err = sw.WriteStringValue("", &newValue)
if err != nil {
return errors.Wrapf(err, "unable to %s to string value", m365ID)
}
array, err := sw.GetSerializedContent()
if err != nil {
return errors.Wrapf(err, "unable to serialize new value from M365:%s", m365ID)
}
fmt.Println(string(array))
//lint:ignore SA4004 only expecting one item
return nil
err = sw.WriteStringValue("", &str)
if err != nil {
return errors.Wrapf(err, "unable to %s to string value", itemID)
}
// This should never happen
return errors.New("m365 object not serialized")
array, err := sw.GetSerializedContent()
if err != nil {
return errors.Wrapf(err, "unable to serialize new value from M365:%s", itemID)
}
fmt.Println(string(array))
return nil
}
// itemer is the pair of calls getItem needs to turn an item ID into bytes:
// one to fetch the item and one to serialize it.  In this file it is
// satisfied by the values returned from ac.Mail(), ac.Events() and
// ac.Contacts().
type itemer interface {
// GetItem retrieves the item identified by user and itemID, returning the
// parsable item together with its ExchangeInfo.
GetItem(
ctx context.Context,
user, itemID string,
) (serialization.Parsable, *details.ExchangeInfo, error)
// Serialize converts a previously fetched item into a byte slice.  user and
// itemID are passed through to the implementation alongside the item.
Serialize(
ctx context.Context,
item serialization.Parsable,
user, itemID string,
) ([]byte, error)
}
// getItem fetches the item identified by user and itemID through itm and
// returns its serialized form.  The ExchangeInfo produced by the fetch is
// discarded.  A fetch failure is wrapped with "getting item"; a
// serialization failure is returned as-is.
func getItem(
	ctx context.Context,
	itm itemer,
	user, itemID string,
) ([]byte, error) {
	parsable, _, getErr := itm.GetItem(ctx, user, itemID)
	if getErr != nil {
		return nil, errors.Wrap(getErr, "getting item")
	}

	serialized, serErr := itm.Serialize(ctx, parsable, user, itemID)

	return serialized, serErr
}
//-------------------------------------------------------------------------------

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"time"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/pkg/errors"
@ -11,9 +12,11 @@ import (
)
// ---------------------------------------------------------------------------
// common types
// common types and consts
// ---------------------------------------------------------------------------
const numberOfRetries = 3
// DeltaUpdate holds the results of a current delta token. It normally
// gets produced when aggregating the addition and removal of items in
// a delta-queriable folder.
@ -106,3 +109,11 @@ func checkIDAndName(c graph.Container) error {
return nil
}
// orNow returns the time t points to, or the current UTC time when t is nil.
func orNow(t *time.Time) time.Time {
	if t != nil {
		return *t
	}

	return time.Now().UTC()
}

View File

@ -2,15 +2,19 @@ package api
import (
"context"
"fmt"
"time"
"github.com/hashicorp/go-multierror"
"github.com/microsoft/kiota-abstractions-go/serialization"
kioser "github.com/microsoft/kiota-serialization-json-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/pkg/backup/details"
)
// ---------------------------------------------------------------------------
@ -52,12 +56,17 @@ func (c Contacts) DeleteContactFolder(
return c.stable.Client().UsersById(user).ContactFoldersById(folderID).Delete(ctx, nil)
}
// RetrieveContactDataForUser is a GraphRetrievalFun that returns all associated fields.
func (c Contacts) RetrieveContactDataForUser(
// GetItem retrieves a Contactable item.
func (c Contacts) GetItem(
ctx context.Context,
user, m365ID string,
) (serialization.Parsable, error) {
return c.stable.Client().UsersById(user).ContactsById(m365ID).Get(ctx, nil)
user, itemID string,
) (serialization.Parsable, *details.ExchangeInfo, error) {
cont, err := c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil)
if err != nil {
return nil, nil, err
}
return cont, ContactInfo(cont), nil
}
// GetAllContactFolderNamesForUser is a GraphQuery function for getting
@ -224,3 +233,61 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
}
// ---------------------------------------------------------------------------
// Serialization
// ---------------------------------------------------------------------------
// Serialize writes the contact item out as a byte slice.
//
// item must be a models.Contactable; any other type produces an error.
// itemID is only used to label a failure while writing the object; user is
// not read by this implementation.
func (c Contacts) Serialize(
	ctx context.Context,
	item serialization.Parsable,
	user, itemID string,
) ([]byte, error) {
	contact, isContact := item.(models.Contactable)
	if !isContact {
		return nil, fmt.Errorf("expected Contactable, got %T", item)
	}

	writer := kioser.NewJsonSerializationWriter()
	defer writer.Close()

	// a failed write is flagged as non-recoverable for the caller
	writeErr := writer.WriteObjectValue("", contact)
	if writeErr != nil {
		return nil, support.SetNonRecoverableError(errors.Wrap(writeErr, itemID))
	}

	content, contentErr := writer.GetSerializedContent()
	if contentErr != nil {
		return nil, errors.Wrap(contentErr, "serializing contact")
	}

	return content, nil
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// ContactInfo builds the ExchangeInfo details entry for a contact.
//
// A missing display name or creation time is left at its zero value.  The
// modified time falls back to orNow's result when the contact carries no
// last-modified timestamp.  Size is not populated here.
func ContactInfo(contact models.Contactable) *details.ExchangeInfo {
	var (
		displayName string
		createdAt   time.Time
	)

	if dn := contact.GetDisplayName(); dn != nil {
		displayName = *dn
	}

	if ct := contact.GetCreatedDateTime(); ct != nil {
		createdAt = *ct
	}

	return &details.ExchangeInfo{
		ItemType:    details.ExchangeContact,
		ContactName: displayName,
		Created:     createdAt,
		Modified:    orNow(contact.GetLastModifiedDateTime()),
	}
}

View File

@ -1,4 +1,4 @@
package exchange
package api
import (
"testing"
@ -11,15 +11,15 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
)
type ContactSuite struct {
type ContactsAPIUnitSuite struct {
suite.Suite
}
func TestContactSuite(t *testing.T) {
suite.Run(t, &ContactSuite{})
func TestContactsAPIUnitSuite(t *testing.T) {
suite.Run(t, new(ContactsAPIUnitSuite))
}
func (suite *ContactSuite) TestContactInfo() {
func (suite *ContactsAPIUnitSuite) TestContactInfo() {
initial := time.Now()
tests := []struct {
@ -37,7 +37,6 @@ func (suite *ContactSuite) TestContactInfo() {
ItemType: details.ExchangeContact,
Created: initial,
Modified: initial,
Size: 10,
}
return contact, i
},
@ -54,7 +53,6 @@ func (suite *ContactSuite) TestContactInfo() {
ContactName: aPerson,
Created: initial,
Modified: initial,
Size: 10,
}
return contact, i
},
@ -63,7 +61,7 @@ func (suite *ContactSuite) TestContactInfo() {
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
contact, expected := test.contactAndRP()
assert.Equal(t, expected, ContactInfo(contact, 10))
assert.Equal(t, expected, ContactInfo(contact))
})
}
}

View File

@ -2,15 +2,21 @@ package api
import (
"context"
"fmt"
"time"
"github.com/hashicorp/go-multierror"
"github.com/microsoft/kiota-abstractions-go/serialization"
kioser "github.com/microsoft/kiota-serialization-json-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
@ -52,12 +58,17 @@ func (c Events) DeleteCalendar(
return c.stable.Client().UsersById(user).CalendarsById(calendarID).Delete(ctx, nil)
}
// RetrieveEventDataForUser is a GraphRetrievalFunc that returns event data.
func (c Events) RetrieveEventDataForUser(
// GetItem retrieves an Eventable item.
func (c Events) GetItem(
ctx context.Context,
user, m365ID string,
) (serialization.Parsable, error) {
return c.stable.Client().UsersById(user).EventsById(m365ID).Get(ctx, nil)
user, itemID string,
) (serialization.Parsable, *details.ExchangeInfo, error) {
evt, err := c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil)
if err != nil {
return nil, nil, err
}
return evt, EventInfo(evt), nil
}
func (c Client) GetAllCalendarNamesForUser(
@ -190,6 +201,66 @@ func (c Events) GetAddedAndRemovedItemIDs(
return added, nil, DeltaUpdate{}, errs.ErrorOrNil()
}
// ---------------------------------------------------------------------------
// Serialization
// ---------------------------------------------------------------------------
// Serialize retrieves attachment data identified by the event item, and then
// serializes the event, attachments included, into a byte slice.
//
// item must be a models.Eventable; any other type produces an error.  user
// and itemID identify the event when querying its attachments.  The
// attachment query is attempted up to numberOfRetries times; if the last
// attempt still fails, that error is returned and nothing is serialized.
func (c Events) Serialize(
	ctx context.Context,
	item serialization.Parsable,
	user, itemID string,
) ([]byte, error) {
	event, ok := item.(models.Eventable)
	if !ok {
		return nil, fmt.Errorf("expected Eventable, got %T", item)
	}

	writer := kioser.NewJsonSerializationWriter()
	defer writer.Close()

	// HasAttachments is a pointer and may be unset; a nil value is handled as
	// "no attachments" instead of being dereferenced.
	if hasAttachments := event.GetHasAttachments(); hasAttachments != nil && *hasAttachments {
		// getting all the attachments might take a couple attempts due to filesize
		var retriesErr error

		for count := 0; count < numberOfRetries; count++ {
			attached, err := c.stable.
				Client().
				UsersById(user).
				EventsById(itemID).
				Attachments().
				Get(ctx, nil)
			retriesErr = err

			if err != nil {
				continue
			}

			// a successful call can still hand back an empty response
			if attached != nil {
				event.SetAttachments(attached.GetValue())
			}

			break
		}

		if retriesErr != nil {
			logger.Ctx(ctx).Debug("exceeded maximum retries")
			return nil, support.WrapAndAppend(itemID, errors.Wrap(retriesErr, "attachment failed"), nil)
		}
	}

	// a failed write is flagged as non-recoverable for the caller
	if err := writer.WriteObjectValue("", event); err != nil {
		return nil, support.SetNonRecoverableError(errors.Wrap(err, itemID))
	}

	bs, err := writer.GetSerializedContent()
	if err != nil {
		return nil, errors.Wrap(err, "serializing calendar event")
	}

	return bs, nil
}
// ---------------------------------------------------------------------------
// helper funcs
// ---------------------------------------------------------------------------
@ -216,3 +287,68 @@ func (c CalendarDisplayable) GetDisplayName() *string {
func (c CalendarDisplayable) GetParentFolderId() *string {
return nil
}
// EventInfo builds the ExchangeInfo details entry for a calendar event.
//
// Organizer, subject, start, end and creation time are copied only when the
// event provides them; otherwise they stay at their zero values.  Start and
// end strings that fail to parse are ignored.  The modified time falls back
// to orNow's result when the event has no last-modified timestamp.  Size is
// not populated here.
func EventInfo(evt models.Eventable) *details.ExchangeInfo {
	info := &details.ExchangeInfo{
		ItemType: details.ExchangeEvent,
	}

	if org := evt.GetOrganizer(); org != nil {
		if addr := org.GetEmailAddress(); addr != nil {
			if email := addr.GetAddress(); email != nil {
				info.Organizer = *email
			}
		}
	}

	if subj := evt.GetSubject(); subj != nil {
		info.Subject = *subj
	}

	// any recurrence value at all marks the event as recurring
	info.EventRecurs = evt.GetRecurrence() != nil

	if begin := evt.GetStart(); begin != nil {
		if raw := begin.GetDateTime(); raw != nil {
			// A 'Z' literal is appended before parsing so that the stored
			// DateTime does not end up as
			// time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC).
			if parsed, err := common.ParseTime(*raw + "Z"); err == nil {
				info.EventStart = parsed
			}
		}
	}

	if finish := evt.GetEnd(); finish != nil {
		if raw := finish.GetDateTime(); raw != nil {
			// Same 'Z' literal handling as for the start time.
			if parsed, err := common.ParseTime(*raw + "Z"); err == nil {
				info.EventEnd = parsed
			}
		}
	}

	if createdAt := evt.GetCreatedDateTime(); createdAt != nil {
		info.Created = *createdAt
	}

	info.Modified = orNow(evt.GetLastModifiedDateTime())

	return info
}

View File

@ -1,4 +1,4 @@
package exchange
package api
import (
"testing"
@ -15,17 +15,17 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
)
type EventSuite struct {
type EventsAPIUnitSuite struct {
suite.Suite
}
func TestEventSuite(t *testing.T) {
suite.Run(t, &EventSuite{})
func TestEventsAPIUnitSuite(t *testing.T) {
suite.Run(t, new(EventsAPIUnitSuite))
}
// TestEventInfo verifies that searchable event metadata
// can be properly retrieved from a models.Eventable object
func (suite *EventSuite) TestEventInfo() {
func (suite *EventsAPIUnitSuite) TestEventInfo() {
// Exchange stores start/end times in UTC and the below compares hours
// directly so we need to "normalize" the timezone here.
initial := time.Now().UTC()
@ -136,7 +136,6 @@ func (suite *EventSuite) TestEventInfo() {
Organizer: organizer,
EventStart: eventTime,
EventEnd: eventEndTime,
Size: 10,
}
},
},
@ -144,7 +143,7 @@ func (suite *EventSuite) TestEventInfo() {
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
event, expected := test.evtAndRP()
result := EventInfo(event, 10)
result := EventInfo(event)
assert.Equal(t, expected.Subject, result.Subject, "subject")
assert.Equal(t, expected.Sender, result.Sender, "sender")

View File

@ -2,15 +2,20 @@ package api
import (
"context"
"fmt"
"time"
"github.com/hashicorp/go-multierror"
"github.com/microsoft/kiota-abstractions-go/serialization"
kioser "github.com/microsoft/kiota-serialization-json-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/users"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
)
// ---------------------------------------------------------------------------
@ -92,12 +97,17 @@ func (c Mail) GetContainerByID(
return service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf)
}
// RetrieveMessageDataForUser is a GraphRetrievalFunc that returns message data.
func (c Mail) RetrieveMessageDataForUser(
// GetItem retrieves a Messageable item.
func (c Mail) GetItem(
ctx context.Context,
user, m365ID string,
) (serialization.Parsable, error) {
return c.stable.Client().UsersById(user).MessagesById(m365ID).Get(ctx, nil)
user, itemID string,
) (serialization.Parsable, *details.ExchangeInfo, error) {
mail, err := c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil)
if err != nil {
return nil, nil, err
}
return mail, MailInfo(mail), nil
}
// EnumerateContainers iterates through all of the users current
@ -223,3 +233,101 @@ func (c Mail) GetAddedAndRemovedItemIDs(
return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
}
// ---------------------------------------------------------------------------
// Serialization
// ---------------------------------------------------------------------------
// Serialize retrieves attachment data identified by the mail item, and then
// serializes the message, attachments included, into a byte slice.
//
// item must be a models.Messageable; any other type produces an error.  user
// and itemID identify the message when querying its attachments.  The
// attachment query is attempted up to numberOfRetries times; if the last
// attempt still fails, that error is returned and nothing is serialized.
func (c Mail) Serialize(
	ctx context.Context,
	item serialization.Parsable,
	user, itemID string,
) ([]byte, error) {
	msg, ok := item.(models.Messageable)
	if !ok {
		return nil, fmt.Errorf("expected Messageable, got %T", item)
	}

	writer := kioser.NewJsonSerializationWriter()
	defer writer.Close()

	// HasAttachments is a pointer and may be unset; a nil value is handled as
	// "no attachments" instead of being dereferenced.
	if hasAttachments := msg.GetHasAttachments(); hasAttachments != nil && *hasAttachments {
		// getting all the attachments might take a couple attempts due to filesize
		var retriesErr error

		for count := 0; count < numberOfRetries; count++ {
			attached, err := c.stable.
				Client().
				UsersById(user).
				MessagesById(itemID).
				Attachments().
				Get(ctx, nil)
			retriesErr = err

			if err != nil {
				continue
			}

			// a successful call can still hand back an empty response
			if attached != nil {
				msg.SetAttachments(attached.GetValue())
			}

			break
		}

		if retriesErr != nil {
			logger.Ctx(ctx).Debug("exceeded maximum retries")
			return nil, support.WrapAndAppend(itemID, errors.Wrap(retriesErr, "attachment failed"), nil)
		}
	}

	// a failed write is flagged as non-recoverable for the caller
	if err := writer.WriteObjectValue("", msg); err != nil {
		return nil, support.SetNonRecoverableError(errors.Wrap(err, itemID))
	}

	bs, err := writer.GetSerializedContent()
	if err != nil {
		return nil, errors.Wrap(err, "serializing email")
	}

	return bs, nil
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// MailInfo builds the ExchangeInfo details entry for a mail message.
//
// Sender address, subject, received time and creation time are copied only
// when the message provides them; otherwise they stay at their zero values.
// The modified time falls back to orNow's result when the message has no
// last-modified timestamp.  Size is not populated here.
func MailInfo(msg models.Messageable) *details.ExchangeInfo {
	var (
		from, title          string
		receivedAt, createdAt time.Time
	)

	if sndr := msg.GetSender(); sndr != nil {
		if addr := sndr.GetEmailAddress(); addr != nil {
			if email := addr.GetAddress(); email != nil {
				from = *email
			}
		}
	}

	if subj := msg.GetSubject(); subj != nil {
		title = *subj
	}

	if rcv := msg.GetReceivedDateTime(); rcv != nil {
		receivedAt = *rcv
	}

	if crt := msg.GetCreatedDateTime(); crt != nil {
		createdAt = *crt
	}

	return &details.ExchangeInfo{
		ItemType: details.ExchangeMail,
		Sender:   from,
		Subject:  title,
		Received: receivedAt,
		Created:  createdAt,
		Modified: orNow(msg.GetLastModifiedDateTime()),
	}
}

View File

@ -1,4 +1,4 @@
package exchange
package api
import (
"testing"
@ -10,15 +10,15 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details"
)
type MessageSuite struct {
type MailAPIUnitSuite struct {
suite.Suite
}
func TestMessageSuite(t *testing.T) {
suite.Run(t, &MessageSuite{})
func TestMailAPIUnitSuite(t *testing.T) {
suite.Run(t, new(MailAPIUnitSuite))
}
func (suite *MessageSuite) TestMessageInfo() {
func (suite *MailAPIUnitSuite) TestMailInfo() {
initial := time.Now()
tests := []struct {
@ -36,7 +36,6 @@ func (suite *MessageSuite) TestMessageInfo() {
ItemType: details.ExchangeMail,
Created: initial,
Modified: initial,
Size: 10,
}
return msg, i
},
@ -58,7 +57,6 @@ func (suite *MessageSuite) TestMessageInfo() {
Sender: sender,
Created: initial,
Modified: initial,
Size: 10,
}
return msg, i
},
@ -76,7 +74,6 @@ func (suite *MessageSuite) TestMessageInfo() {
Subject: subject,
Created: initial,
Modified: initial,
Size: 10,
}
return msg, i
},
@ -94,7 +91,6 @@ func (suite *MessageSuite) TestMessageInfo() {
Received: now,
Created: initial,
Modified: initial,
Size: 10,
}
return msg, i
},
@ -122,7 +118,6 @@ func (suite *MessageSuite) TestMessageInfo() {
Received: now,
Created: initial,
Modified: initial,
Size: 10,
}
return msg, i
},
@ -131,7 +126,7 @@ func (suite *MessageSuite) TestMessageInfo() {
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
msg, expected := tt.msgAndRP()
suite.Equal(expected, MessageInfo(msg, 10))
suite.Equal(expected, MailInfo(msg))
})
}
}

View File

@ -1,36 +0,0 @@
package exchange
import (
"time"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/pkg/backup/details"
)
// ContactInfo translate models.Contactable metadata into searchable content
func ContactInfo(contact models.Contactable, size int64) *details.ExchangeInfo {
name := ""
created := time.Time{}
modified := time.Time{}
if contact.GetDisplayName() != nil {
name = *contact.GetDisplayName()
}
if contact.GetCreatedDateTime() != nil {
created = *contact.GetCreatedDateTime()
}
if contact.GetLastModifiedDateTime() != nil {
modified = *contact.GetLastModifiedDateTime()
}
return &details.ExchangeInfo{
ItemType: details.ExchangeContact,
ContactName: name,
Created: created,
Modified: modified,
Size: size,
}
}

View File

@ -1,82 +0,0 @@
package exchange
import (
"time"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/pkg/backup/details"
)
// EventInfo searchable metadata for stored event objects.
func EventInfo(evt models.Eventable, size int64) *details.ExchangeInfo {
var (
organizer, subject string
recurs bool
start = time.Time{}
end = time.Time{}
created = time.Time{}
modified = time.Time{}
)
if evt.GetOrganizer() != nil &&
evt.GetOrganizer().GetEmailAddress() != nil &&
evt.GetOrganizer().GetEmailAddress().GetAddress() != nil {
organizer = *evt.GetOrganizer().
GetEmailAddress().
GetAddress()
}
if evt.GetSubject() != nil {
subject = *evt.GetSubject()
}
if evt.GetRecurrence() != nil {
recurs = true
}
if evt.GetStart() != nil &&
evt.GetStart().GetDateTime() != nil {
// timeString has 'Z' literal added to ensure the stored
// DateTime is not: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
startTime := *evt.GetStart().GetDateTime() + "Z"
output, err := common.ParseTime(startTime)
if err == nil {
start = output
}
}
if evt.GetEnd() != nil &&
evt.GetEnd().GetDateTime() != nil {
// timeString has 'Z' literal added to ensure the stored
// DateTime is not: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)
endTime := *evt.GetEnd().GetDateTime() + "Z"
output, err := common.ParseTime(endTime)
if err == nil {
end = output
}
}
if evt.GetCreatedDateTime() != nil {
created = *evt.GetCreatedDateTime()
}
if evt.GetLastModifiedDateTime() != nil {
modified = *evt.GetLastModifiedDateTime()
}
return &details.ExchangeInfo{
ItemType: details.ExchangeEvent,
Organizer: organizer,
Subject: subject,
EventStart: start,
EventEnd: end,
EventRecurs: recurs,
Created: created,
Modified: modified,
Size: size,
}
}

View File

@ -6,20 +6,13 @@ package exchange
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
absser "github.com/microsoft/kiota-abstractions-go/serialization"
kioser "github.com/microsoft/kiota-serialization-json-go"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/alcionai/corso/src/internal/connector/exchange/api"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/observe"
@ -45,6 +38,18 @@ const (
urlPrefetchChannelBufferSize = 4
)
// itemer is the item-level dependency of a Collection: it fetches a single
// item by ID and serializes it.  The Collection keeps one in its items field
// and calls GetItem followed by Serialize for each item it streams.
type itemer interface {
// GetItem retrieves the item identified by user and itemID, returning the
// parsable item together with its ExchangeInfo.
GetItem(
ctx context.Context,
user, itemID string,
) (serialization.Parsable, *details.ExchangeInfo, error)
// Serialize converts a previously fetched item into a byte slice.  user and
// itemID are passed through to the implementation alongside the item.
Serialize(
ctx context.Context,
item serialization.Parsable,
user, itemID string,
) ([]byte, error)
}
// Collection implements the interface from data.Collection
// Structure holds data for an Exchange application for a single user
type Collection struct {
@ -57,9 +62,7 @@ type Collection struct {
// removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{}
// service - client/adapter pair used to access M365 back store
service graph.Servicer
ac api.Client
items itemer
category path.CategoryType
statusUpdater support.StatusUpdater
@ -89,14 +92,12 @@ func NewCollection(
user string,
curr, prev path.Path,
category path.CategoryType,
ac api.Client,
service graph.Servicer,
items itemer,
statusUpdater support.StatusUpdater,
ctrlOpts control.Options,
doNotMergeItems bool,
) Collection {
collection := Collection{
ac: ac,
category: category,
ctrl: ctrlOpts,
data: make(chan data.Stream, collectionChannelBufferSize),
@ -105,10 +106,10 @@ func NewCollection(
added: make(map[string]struct{}, 0),
removed: make(map[string]struct{}, 0),
prevPath: prev,
service: service,
state: stateOf(prev, curr),
statusUpdater: statusUpdater,
user: user,
items: items,
}
return collection
@ -137,22 +138,6 @@ func (col *Collection) Items() <-chan data.Stream {
return col.data
}
// GetQueryAndSerializeFunc helper function that returns the two functions functions
// required to convert M365 identifier into a byte array filled with the serialized data
func GetQueryAndSerializeFunc(ac api.Client, category path.CategoryType) (api.GraphRetrievalFunc, GraphSerializeFunc) {
switch category {
case path.ContactsCategory:
return ac.Contacts().RetrieveContactDataForUser, serializeAndStreamContact
case path.EventsCategory:
return ac.Events().RetrieveEventDataForUser, serializeAndStreamEvent
case path.EmailCategory:
return ac.Mail().RetrieveMessageDataForUser, serializeAndStreamMessage
// Unsupported options returns nil, nil
default:
return nil, nil
}
}
// FullPath returns the Collection's fullPath []string
func (col *Collection) FullPath() path.Path {
return col.fullPath
@ -208,15 +193,6 @@ func (col *Collection) streamItems(ctx context.Context) {
}()
}
// get QueryBasedonIdentifier
// verify that it is the correct type in called function
// serializationFunction
query, serializeFunc := GetQueryAndSerializeFunc(col.ac, col.category)
if query == nil {
errs = fmt.Errorf("unrecognized collection type: %s", col.category)
return
}
// Limit the max number of active requests to GC
semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
defer close(semaphoreCh)
@ -265,16 +241,17 @@ func (col *Collection) streamItems(ctx context.Context) {
defer func() { <-semaphoreCh }()
var (
response absser.Parsable
err error
item serialization.Parsable
info *details.ExchangeInfo
err error
)
for i := 1; i <= numberOfRetries; i++ {
response, err = query(ctx, user, id)
item, info, err = col.items.GetItem(ctx, user, id)
if err == nil {
break
}
// TODO: Tweak sleep times
if i < numberOfRetries {
time.Sleep(time.Duration(3*(i+1)) * time.Second)
}
@ -285,20 +262,23 @@ func (col *Collection) streamItems(ctx context.Context) {
return
}
byteCount, err := serializeFunc(
ctx,
col.service.Client(),
kioser.NewJsonSerializationWriter(),
col.data,
response,
user)
data, err := col.items.Serialize(ctx, item, user, id)
if err != nil {
errUpdater(user, err)
return
}
info.Size = int64(len(data))
col.data <- &Stream{
id: id,
message: data,
info: info,
modTime: info.Modified,
}
atomic.AddInt64(&success, 1)
atomic.AddInt64(&totalBytes, int64(byteCount))
atomic.AddInt64(&totalBytes, info.Size)
if colProgress != nil {
colProgress <- struct{}{}
@ -328,200 +308,6 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, totalB
col.statusUpdater(status)
}
type modTimer interface {
GetLastModifiedDateTime() *time.Time
}
func getModTime(mt modTimer) time.Time {
res := time.Now().UTC()
if t := mt.GetLastModifiedDateTime(); t != nil {
res = *t
}
return res
}
// GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc
// responses into data.Stream items contained within the Collection
type GraphSerializeFunc func(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kioser.JsonSerializationWriter,
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) (int, error)
// serializeAndStreamEvent is a GraphSerializeFunc used to serialize models.Eventable objects into
// data.Stream objects. Returns an error the process finishes unsuccessfully.
func serializeAndStreamEvent(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kioser.JsonSerializationWriter,
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) (int, error) {
var err error
defer objectWriter.Close()
event, ok := parsable.(models.Eventable)
if !ok {
return 0, fmt.Errorf("expected Eventable, got %T", parsable)
}
if *event.GetHasAttachments() {
var retriesErr error
for count := 0; count < numberOfRetries; count++ {
attached, err := client.
UsersById(user).
EventsById(*event.GetId()).
Attachments().
Get(ctx, nil)
retriesErr = err
if err == nil && attached != nil {
event.SetAttachments(attached.GetValue())
break
}
}
if retriesErr != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
return 0, support.WrapAndAppend(
*event.GetId(),
errors.Wrap(retriesErr, "attachment failed"),
nil)
}
}
err = objectWriter.WriteObjectValue("", event)
if err != nil {
return 0, support.SetNonRecoverableError(errors.Wrap(err, *event.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
if err != nil {
return 0, support.WrapAndAppend(*event.GetId(), errors.Wrap(err, "serializing content"), nil)
}
if len(byteArray) > 0 {
dataChannel <- &Stream{
id: *event.GetId(),
message: byteArray,
info: EventInfo(event, int64(len(byteArray))),
modTime: getModTime(event),
}
}
return len(byteArray), nil
}
// serializeAndStreamContact is a GraphSerializeFunc for models.Contactable
func serializeAndStreamContact(
ctx context.Context,
client *msgraphsdk.GraphServiceClient,
objectWriter *kioser.JsonSerializationWriter,
dataChannel chan<- data.Stream,
parsable absser.Parsable,
user string,
) (int, error) {
defer objectWriter.Close()
contact, ok := parsable.(models.Contactable)
if !ok {
return 0, fmt.Errorf("expected Contactable, got %T", parsable)
}
err := objectWriter.WriteObjectValue("", contact)
if err != nil {
return 0, support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId()))
}
bs, err := objectWriter.GetSerializedContent()
if err != nil {
return 0, support.WrapAndAppend(*contact.GetId(), err, nil)
}
if len(bs) > 0 {
dataChannel <- &Stream{
id: *contact.GetId(),
message: bs,
info: ContactInfo(contact, int64(len(bs))),
modTime: getModTime(contact),
}
}
return len(bs), nil
}
// serializeAndStreamMessage is the GraphSerializeFunc for models.Messageable.
//
// The parsable is asserted to be a message. When the message reports that it
// has attachments, they are fetched for the given user (up to numberOfRetries
// attempts) and set on the message before serialization. The serialized
// message is then pushed onto dataChannel as a *Stream.
//
// Returns the number of serialized bytes. Errors are returned when the
// parsable is not a Messageable, when every attachment fetch attempt fails,
// or when writing/serializing the message fails (the latter two are marked
// non-recoverable). objectWriter is always closed before returning.
func serializeAndStreamMessage(
	ctx context.Context,
	client *msgraphsdk.GraphServiceClient,
	objectWriter *kioser.JsonSerializationWriter,
	dataChannel chan<- data.Stream,
	parsable absser.Parsable,
	user string,
) (int, error) {
	defer objectWriter.Close()

	msg, ok := parsable.(models.Messageable)
	if !ok {
		return 0, fmt.Errorf("expected Messageable, got %T", parsable)
	}

	// GetHasAttachments returns a pointer; a nil value is treated as
	// "no attachments" instead of being dereferenced and panicking.
	if hasAttachments := msg.GetHasAttachments(); hasAttachments != nil && *hasAttachments {
		// getting all the attachments might take a couple attempts due to filesize
		var retriesErr error

		for count := 0; count < numberOfRetries; count++ {
			attached, getErr := client.
				UsersById(user).
				MessagesById(*msg.GetId()).
				Attachments().
				Get(ctx, nil)
			// only the most recent attempt's result is kept
			retriesErr = getErr

			if getErr == nil {
				msg.SetAttachments(attached.GetValue())
				break
			}
		}

		if retriesErr != nil {
			logger.Ctx(ctx).Debug("exceeded maximum retries")
			return 0, support.WrapAndAppend(*msg.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
		}
	}

	if err := objectWriter.WriteObjectValue("", msg); err != nil {
		return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *msg.GetId()))
	}

	bs, err := objectWriter.GetSerializedContent()
	if err != nil {
		err = support.WrapAndAppend(*msg.GetId(), errors.Wrap(err, "serializing mail content"), nil)
		return 0, support.SetNonRecoverableError(err)
	}

	dataChannel <- &Stream{
		id:      *msg.GetId(),
		message: bs,
		info:    MessageInfo(msg, int64(len(bs))),
		modTime: getModTime(msg),
	}

	return len(bs), nil
}
// Stream represents a single item retrieved from exchange
type Stream struct {
id string

View File

@ -2,18 +2,33 @@ package exchange
import (
"bytes"
"context"
"testing"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange/api"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path"
)
// mockItemer is a no-op test double that is handed to NewCollection in tests.
// Presumably it satisfies the package's itemer interface; confirm against
// that interface's definition.
type mockItemer struct{}

// GetItem ignores its arguments and returns nil for every result.
func (mockItemer) GetItem(
	_ context.Context,
	_, _ string,
) (serialization.Parsable, *details.ExchangeInfo, error) {
	return nil, nil, nil
}

// Serialize ignores its arguments and returns nil bytes and a nil error.
func (mockItemer) Serialize(
	_ context.Context,
	_ serialization.Parsable,
	_, _ string,
) ([]byte, error) {
	return nil, nil
}
// ExchangeDataCollectionSuite is the testify suite that groups the exchange
// data collection tests (e.g. TestNewCollection_state). It embeds suite.Suite
// and carries no additional state of its own.
type ExchangeDataCollectionSuite struct {
suite.Suite
}
@ -137,7 +152,9 @@ func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() {
c := NewCollection(
"u",
test.curr, test.prev,
0, api.Client{}, nil, nil, control.Options{},
0,
mockItemer{}, nil,
control.Options{},
false)
assert.Equal(t, test.expect, c.State())
})

View File

@ -1,49 +0,0 @@
package exchange
import (
"time"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/pkg/backup/details"
)
// MessageInfo builds the details.ExchangeInfo entry describing a mail message.
//
// Sender, subject, and the received/created/modified timestamps are copied
// from msg when the corresponding property is non-nil; otherwise the field is
// left at its zero value. size is stored unchanged in the Size field.
func MessageInfo(msg models.Messageable, size int64) *details.ExchangeInfo {
	var (
		sender, subject             string
		received, created, modified time.Time
	)

	// the sender address sits behind two optional levels; stop at the first nil
	if from := msg.GetSender(); from != nil {
		if email := from.GetEmailAddress(); email != nil {
			if addr := email.GetAddress(); addr != nil {
				sender = *addr
			}
		}
	}

	if s := msg.GetSubject(); s != nil {
		subject = *s
	}

	if t := msg.GetReceivedDateTime(); t != nil {
		received = *t
	}

	if t := msg.GetCreatedDateTime(); t != nil {
		created = *t
	}

	if t := msg.GetLastModifiedDateTime(); t != nil {
		modified = *t
	}

	info := details.ExchangeInfo{
		ItemType: details.ExchangeMail,
		Sender:   sender,
		Subject:  subject,
		Received: received,
		Created:  created,
		Modified: modified,
		Size:     size,
	}

	return &info
}

View File

@ -2,6 +2,7 @@ package exchange
import (
"context"
"fmt"
"github.com/pkg/errors"
@ -56,19 +57,16 @@ func filterContainersAndFillCollections(
return err
}
ibt, err := itemerByType(ac, scope.Category().PathType())
if err != nil {
return err
}
for _, c := range resolver.Items() {
if ctrlOpts.FailFast && errs != nil {
return errs
}
// cannot be moved out of the loop,
// else we run into state issues.
service, err := createService(qp.Credentials)
if err != nil {
errs = support.WrapAndAppend(qp.ResourceOwner, err, errs)
continue
}
cID := *c.GetId()
delete(tombstones, cID)
@ -118,8 +116,7 @@ func filterContainersAndFillCollections(
currPath,
prevPath,
scope.Category().PathType(),
ac,
service,
ibt,
statusUpdater,
ctrlOpts,
newDelta.Reset)
@ -148,12 +145,6 @@ func filterContainersAndFillCollections(
// in the `previousPath` set, but does not exist in the current container
// resolver (which contains all the resource owners' current containers).
for id, p := range tombstones {
service, err := createService(qp.Credentials)
if err != nil {
errs = support.WrapAndAppend(p, err, errs)
continue
}
if collections[id] != nil {
errs = support.WrapAndAppend(p, errors.New("conflict: tombstone exists for a live collection"), errs)
continue
@ -178,8 +169,7 @@ func filterContainersAndFillCollections(
nil, // marks the collection as deleted
prevPath,
scope.Category().PathType(),
ac,
service,
ibt,
statusUpdater,
ctrlOpts,
false)
@ -231,3 +221,16 @@ func pathFromPrevString(ps string) (path.Path, error) {
return p, nil
}
// itemerByType selects the itemer that handles the given path category:
// ac.Mail() for email, ac.Events() for events, and ac.Contacts() for
// contacts. Any other category yields a nil itemer and an error naming
// the unsupported category.
func itemerByType(ac api.Client, category path.CategoryType) (itemer, error) {
	switch category {
	case path.EmailCategory:
		return ac.Mail(), nil
	case path.EventsCategory:
		return ac.Events(), nil
	case path.ContactsCategory:
		return ac.Contacts(), nil
	default:
		// the message previously named getFetchIDFunc, a different function
		return nil, fmt.Errorf("category %s not supported by itemerByType", category)
	}
}

View File

@ -84,7 +84,10 @@ func RestoreExchangeContact(
return nil, errors.New("msgraph contact post fail: REST response not received")
}
return ContactInfo(contact, int64(len(bits))), nil
info := api.ContactInfo(contact)
info.Size = int64(len(bits))
return info, nil
}
// RestoreExchangeEvent restores a contact to the @bits byte
@ -153,7 +156,10 @@ func RestoreExchangeEvent(
}
}
return EventInfo(event, int64(len(bits))), errs
info := api.EventInfo(event)
info.Size = int64(len(bits))
return info, errs
}
// RestoreMailMessage utility function to place an exchange.Mail
@ -215,7 +221,10 @@ func RestoreMailMessage(
}
}
return MessageInfo(clone, int64(len(bits))), nil
info := api.MailInfo(clone)
info.Size = int64(len(bits))
return info, nil
}
// attachmentBytes is a helper to retrieve the attachment content from a models.Attachmentable