From 1ef300d6c2276d4bd807ff5f85ef1530bc1f6dc7 Mon Sep 17 00:00:00 2001 From: Keepers Date: Tue, 17 Jan 2023 13:27:37 -0700 Subject: [PATCH] add item fetching and serialization to api (#2150) ## Description Adds the per item collection streaming calls to the api interface. Primarily migrates a "getItem" and a "serializeItem" action into the api pkg, out from exchange_data_collection. Building an ExchangeInfo is now also housed in api. ## Does this PR need a docs update or release note? - [x] :no_entry: No ## Type of change - [x] :broom: Tech Debt/Cleanup ## Issue(s) * #1996 ## Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- src/cmd/getM365/getItem.go | 103 +++---- src/internal/connector/exchange/api/api.go | 13 +- .../connector/exchange/api/contacts.go | 77 ++++- .../{contact_test.go => api/contacts_test.go} | 14 +- src/internal/connector/exchange/api/events.go | 146 ++++++++- .../{event_test.go => api/events_test.go} | 13 +- src/internal/connector/exchange/api/mail.go | 118 +++++++- .../{message_test.go => api/mail_test.go} | 17 +- src/internal/connector/exchange/contact.go | 36 --- src/internal/connector/exchange/event.go | 82 ------ .../exchange/exchange_data_collection.go | 278 ++---------------- .../exchange/exchange_data_collection_test.go | 21 +- src/internal/connector/exchange/message.go | 49 --- .../connector/exchange/service_iterators.go | 39 +-- .../connector/exchange/service_restore.go | 15 +- 15 files changed, 492 insertions(+), 529 deletions(-) rename src/internal/connector/exchange/{contact_test.go => api/contacts_test.go} (84%) rename src/internal/connector/exchange/{event_test.go => api/events_test.go} (96%) rename src/internal/connector/exchange/{message_test.go => api/mail_test.go} (90%) delete mode 100644 src/internal/connector/exchange/contact.go delete mode 100644 src/internal/connector/exchange/event.go delete mode 100644 src/internal/connector/exchange/message.go diff --git a/src/cmd/getM365/getItem.go b/src/cmd/getM365/getItem.go 
index 10648006a..9c2f8f135 100644 --- a/src/cmd/getM365/getItem.go +++ b/src/cmd/getM365/getItem.go @@ -5,11 +5,11 @@ package main import ( - "bytes" "context" "fmt" "os" + "github.com/microsoft/kiota-abstractions-go/serialization" kw "github.com/microsoft/kiota-serialization-json-go" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -18,12 +18,10 @@ import ( "github.com/alcionai/corso/src/cli/utils" "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/connector" - "github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange/api" "github.com/alcionai/corso/src/internal/connector/graph" - "github.com/alcionai/corso/src/internal/connector/support" - "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/credentials" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -77,12 +75,12 @@ func handleGetCommand(cmd *cobra.Command, args []string) error { return nil } - gc, creds, err := getGC(ctx) + _, creds, err := getGC(ctx) if err != nil { return err } - err = runDisplayM365JSON(ctx, gc.Service, creds) + err = runDisplayM365JSON(ctx, creds, user, m365ID) if err != nil { return Only(ctx, errors.Wrapf(err, "unable to create mock from M365: %s", m365ID)) } @@ -92,13 +90,14 @@ func handleGetCommand(cmd *cobra.Command, args []string) error { func runDisplayM365JSON( ctx context.Context, - gs graph.Servicer, creds account.M365Config, + user, itemID string, ) error { var ( - get api.GraphRetrievalFunc - serializeFunc exchange.GraphSerializeFunc - cat = graph.StringToPathCategory(category) + bs []byte + err error + cat = graph.StringToPathCategory(category) + sw = kw.NewJsonSerializationWriter() ) ac, err := api.NewClient(creds) @@ -107,58 +106,60 @@ func runDisplayM365JSON( } switch cat { - case path.EmailCategory, 
path.EventsCategory, path.ContactsCategory: - get, serializeFunc = exchange.GetQueryAndSerializeFunc(ac, cat) + case path.EmailCategory: + bs, err = getItem(ctx, ac.Mail(), user, itemID) + case path.EventsCategory: + bs, err = getItem(ctx, ac.Events(), user, itemID) + case path.ContactsCategory: + bs, err = getItem(ctx, ac.Contacts(), user, itemID) default: return fmt.Errorf("unable to process category: %s", cat) } - channel := make(chan data.Stream, 1) - - sw := kw.NewJsonSerializationWriter() - - response, err := get(ctx, user, m365ID) - if err != nil { - return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) - } - - // First return is the number of bytes that were serialized. Ignored - _, err = serializeFunc(ctx, gs.Client(), sw, channel, response, user) - close(channel) - if err != nil { return err } - for item := range channel { - buf := &bytes.Buffer{} + str := string(bs) - _, err := buf.ReadFrom(item.ToReader()) - if err != nil { - return errors.Wrapf(err, "unable to parse given data: %s", m365ID) - } - - byteArray := buf.Bytes() - newValue := string(byteArray) - - err = sw.WriteStringValue("", &newValue) - if err != nil { - return errors.Wrapf(err, "unable to %s to string value", m365ID) - } - - array, err := sw.GetSerializedContent() - if err != nil { - return errors.Wrapf(err, "unable to serialize new value from M365:%s", m365ID) - } - - fmt.Println(string(array)) - - //lint:ignore SA4004 only expecting one item - return nil + err = sw.WriteStringValue("", &str) + if err != nil { + return errors.Wrapf(err, "unable to %s to string value", itemID) } - // This should never happen - return errors.New("m365 object not serialized") + array, err := sw.GetSerializedContent() + if err != nil { + return errors.Wrapf(err, "unable to serialize new value from M365:%s", itemID) + } + + fmt.Println(string(array)) + + return nil +} + +type itemer interface { + GetItem( + ctx context.Context, + user, itemID string, + ) (serialization.Parsable, 
*details.ExchangeInfo, error) + Serialize( + ctx context.Context, + item serialization.Parsable, + user, itemID string, + ) ([]byte, error) +} + +func getItem( + ctx context.Context, + itm itemer, + user, itemID string, +) ([]byte, error) { + sp, _, err := itm.GetItem(ctx, user, itemID) + if err != nil { + return nil, errors.Wrap(err, "getting item") + } + + return itm.Serialize(ctx, sp, user, itemID) } //------------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/api.go b/src/internal/connector/exchange/api/api.go index 3fd15409f..6edd68f57 100644 --- a/src/internal/connector/exchange/api/api.go +++ b/src/internal/connector/exchange/api/api.go @@ -2,6 +2,7 @@ package api import ( "context" + "time" "github.com/microsoft/kiota-abstractions-go/serialization" "github.com/pkg/errors" @@ -11,9 +12,11 @@ import ( ) // --------------------------------------------------------------------------- -// common types +// common types and consts // --------------------------------------------------------------------------- +const numberOfRetries = 3 + // DeltaUpdate holds the results of a current delta token. It normally // gets produced when aggregating the addition and removal of items in // a delta-queriable folder. 
@@ -106,3 +109,11 @@ func checkIDAndName(c graph.Container) error { return nil } + +func orNow(t *time.Time) time.Time { + if t == nil { + return time.Now().UTC() + } + + return *t +} diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index ab41ff4b3..e12f4b795 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -2,15 +2,19 @@ package api import ( "context" + "fmt" + "time" "github.com/hashicorp/go-multierror" "github.com/microsoft/kiota-abstractions-go/serialization" + kioser "github.com/microsoft/kiota-serialization-json-go" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/users" "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/pkg/backup/details" ) // --------------------------------------------------------------------------- @@ -52,12 +56,17 @@ func (c Contacts) DeleteContactFolder( return c.stable.Client().UsersById(user).ContactFoldersById(folderID).Delete(ctx, nil) } -// RetrieveContactDataForUser is a GraphRetrievalFun that returns all associated fields. -func (c Contacts) RetrieveContactDataForUser( +// GetItem retrieves a Contactable item. 
+func (c Contacts) GetItem( ctx context.Context, - user, m365ID string, -) (serialization.Parsable, error) { - return c.stable.Client().UsersById(user).ContactsById(m365ID).Get(ctx, nil) + user, itemID string, +) (serialization.Parsable, *details.ExchangeInfo, error) { + cont, err := c.stable.Client().UsersById(user).ContactsById(itemID).Get(ctx, nil) + if err != nil { + return nil, nil, err + } + + return cont, ContactInfo(cont), nil } // GetAllContactFolderNamesForUser is a GraphQuery function for getting @@ -224,3 +233,61 @@ func (c Contacts) GetAddedAndRemovedItemIDs( return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } + +// --------------------------------------------------------------------------- +// Serialization +// --------------------------------------------------------------------------- + +// Serialize rserializes the item into a byte slice. +func (c Contacts) Serialize( + ctx context.Context, + item serialization.Parsable, + user, itemID string, +) ([]byte, error) { + contact, ok := item.(models.Contactable) + if !ok { + return nil, fmt.Errorf("expected Contactable, got %T", item) + } + + var ( + err error + writer = kioser.NewJsonSerializationWriter() + ) + + defer writer.Close() + + if err = writer.WriteObjectValue("", contact); err != nil { + return nil, support.SetNonRecoverableError(errors.Wrap(err, itemID)) + } + + bs, err := writer.GetSerializedContent() + if err != nil { + return nil, errors.Wrap(err, "serializing contact") + } + + return bs, nil +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func ContactInfo(contact models.Contactable) *details.ExchangeInfo { + name := "" + created := time.Time{} + + if contact.GetDisplayName() != nil { + name = *contact.GetDisplayName() + } + + if contact.GetCreatedDateTime() != nil { + created = *contact.GetCreatedDateTime() + } + + return 
&details.ExchangeInfo{ + ItemType: details.ExchangeContact, + ContactName: name, + Created: created, + Modified: orNow(contact.GetLastModifiedDateTime()), + } +} diff --git a/src/internal/connector/exchange/contact_test.go b/src/internal/connector/exchange/api/contacts_test.go similarity index 84% rename from src/internal/connector/exchange/contact_test.go rename to src/internal/connector/exchange/api/contacts_test.go index e01782520..411250146 100644 --- a/src/internal/connector/exchange/contact_test.go +++ b/src/internal/connector/exchange/api/contacts_test.go @@ -1,4 +1,4 @@ -package exchange +package api import ( "testing" @@ -11,15 +11,15 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" ) -type ContactSuite struct { +type ContactsAPIUnitSuite struct { suite.Suite } -func TestContactSuite(t *testing.T) { - suite.Run(t, &ContactSuite{}) +func TestContactsAPIUnitSuite(t *testing.T) { + suite.Run(t, new(ContactsAPIUnitSuite)) } -func (suite *ContactSuite) TestContactInfo() { +func (suite *ContactsAPIUnitSuite) TestContactInfo() { initial := time.Now() tests := []struct { @@ -37,7 +37,6 @@ func (suite *ContactSuite) TestContactInfo() { ItemType: details.ExchangeContact, Created: initial, Modified: initial, - Size: 10, } return contact, i }, @@ -54,7 +53,6 @@ func (suite *ContactSuite) TestContactInfo() { ContactName: aPerson, Created: initial, Modified: initial, - Size: 10, } return contact, i }, @@ -63,7 +61,7 @@ func (suite *ContactSuite) TestContactInfo() { for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { contact, expected := test.contactAndRP() - assert.Equal(t, expected, ContactInfo(contact, 10)) + assert.Equal(t, expected, ContactInfo(contact)) }) } } diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index bd37a361a..ba75cc648 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -2,15 +2,21 @@ package 
api import ( "context" + "fmt" + "time" "github.com/hashicorp/go-multierror" "github.com/microsoft/kiota-abstractions-go/serialization" + kioser "github.com/microsoft/kiota-serialization-json-go" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/users" "github.com/pkg/errors" + "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -52,12 +58,17 @@ func (c Events) DeleteCalendar( return c.stable.Client().UsersById(user).CalendarsById(calendarID).Delete(ctx, nil) } -// RetrieveEventDataForUser is a GraphRetrievalFunc that returns event data. -func (c Events) RetrieveEventDataForUser( +// GetItem retrieves an Eventable item. +func (c Events) GetItem( ctx context.Context, - user, m365ID string, -) (serialization.Parsable, error) { - return c.stable.Client().UsersById(user).EventsById(m365ID).Get(ctx, nil) + user, itemID string, +) (serialization.Parsable, *details.ExchangeInfo, error) { + evt, err := c.stable.Client().UsersById(user).EventsById(itemID).Get(ctx, nil) + if err != nil { + return nil, nil, err + } + + return evt, EventInfo(evt), nil } func (c Client) GetAllCalendarNamesForUser( @@ -190,6 +201,66 @@ func (c Events) GetAddedAndRemovedItemIDs( return added, nil, DeltaUpdate{}, errs.ErrorOrNil() } +// --------------------------------------------------------------------------- +// Serialization +// --------------------------------------------------------------------------- + +// Serialize retrieves attachment data identified by the event item, and then +// serializes it into a byte slice. 
+func (c Events) Serialize( + ctx context.Context, + item serialization.Parsable, + user, itemID string, +) ([]byte, error) { + event, ok := item.(models.Eventable) + if !ok { + return nil, fmt.Errorf("expected Eventable, got %T", item) + } + + var ( + err error + writer = kioser.NewJsonSerializationWriter() + ) + + defer writer.Close() + + if *event.GetHasAttachments() { + // getting all the attachments might take a couple attempts due to filesize + var retriesErr error + + for count := 0; count < numberOfRetries; count++ { + attached, err := c.stable. + Client(). + UsersById(user). + EventsById(itemID). + Attachments(). + Get(ctx, nil) + retriesErr = err + + if err == nil { + event.SetAttachments(attached.GetValue()) + break + } + } + + if retriesErr != nil { + logger.Ctx(ctx).Debug("exceeded maximum retries") + return nil, support.WrapAndAppend(itemID, errors.Wrap(retriesErr, "attachment failed"), nil) + } + } + + if err = writer.WriteObjectValue("", event); err != nil { + return nil, support.SetNonRecoverableError(errors.Wrap(err, itemID)) + } + + bs, err := writer.GetSerializedContent() + if err != nil { + return nil, errors.Wrap(err, "serializing calendar event") + } + + return bs, nil +} + // --------------------------------------------------------------------------- // helper funcs // --------------------------------------------------------------------------- @@ -216,3 +287,68 @@ func (c CalendarDisplayable) GetDisplayName() *string { func (c CalendarDisplayable) GetParentFolderId() *string { return nil } + +func EventInfo(evt models.Eventable) *details.ExchangeInfo { + var ( + organizer, subject string + recurs bool + start = time.Time{} + end = time.Time{} + created = time.Time{} + ) + + if evt.GetOrganizer() != nil && + evt.GetOrganizer().GetEmailAddress() != nil && + evt.GetOrganizer().GetEmailAddress().GetAddress() != nil { + organizer = *evt.GetOrganizer(). + GetEmailAddress(). 
+ GetAddress() + } + + if evt.GetSubject() != nil { + subject = *evt.GetSubject() + } + + if evt.GetRecurrence() != nil { + recurs = true + } + + if evt.GetStart() != nil && + evt.GetStart().GetDateTime() != nil { + // timeString has 'Z' literal added to ensure the stored + // DateTime is not: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + startTime := *evt.GetStart().GetDateTime() + "Z" + + output, err := common.ParseTime(startTime) + if err == nil { + start = output + } + } + + if evt.GetEnd() != nil && + evt.GetEnd().GetDateTime() != nil { + // timeString has 'Z' literal added to ensure the stored + // DateTime is not: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) + endTime := *evt.GetEnd().GetDateTime() + "Z" + + output, err := common.ParseTime(endTime) + if err == nil { + end = output + } + } + + if evt.GetCreatedDateTime() != nil { + created = *evt.GetCreatedDateTime() + } + + return &details.ExchangeInfo{ + ItemType: details.ExchangeEvent, + Organizer: organizer, + Subject: subject, + EventStart: start, + EventEnd: end, + EventRecurs: recurs, + Created: created, + Modified: orNow(evt.GetLastModifiedDateTime()), + } +} diff --git a/src/internal/connector/exchange/event_test.go b/src/internal/connector/exchange/api/events_test.go similarity index 96% rename from src/internal/connector/exchange/event_test.go rename to src/internal/connector/exchange/api/events_test.go index 386f451ab..a41a48e5a 100644 --- a/src/internal/connector/exchange/event_test.go +++ b/src/internal/connector/exchange/api/events_test.go @@ -1,4 +1,4 @@ -package exchange +package api import ( "testing" @@ -15,17 +15,17 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" ) -type EventSuite struct { +type EventsAPIUnitSuite struct { suite.Suite } -func TestEventSuite(t *testing.T) { - suite.Run(t, &EventSuite{}) +func TestEventsAPIUnitSuite(t *testing.T) { + suite.Run(t, new(EventsAPIUnitSuite)) } // TestEventInfo verifies that searchable event metadata // can be 
properly retrieved from a models.Eventable object -func (suite *EventSuite) TestEventInfo() { +func (suite *EventsAPIUnitSuite) TestEventInfo() { // Exchange stores start/end times in UTC and the below compares hours // directly so we need to "normalize" the timezone here. initial := time.Now().UTC() @@ -136,7 +136,6 @@ func (suite *EventSuite) TestEventInfo() { Organizer: organizer, EventStart: eventTime, EventEnd: eventEndTime, - Size: 10, } }, }, @@ -144,7 +143,7 @@ func (suite *EventSuite) TestEventInfo() { for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { event, expected := test.evtAndRP() - result := EventInfo(event, 10) + result := EventInfo(event) assert.Equal(t, expected.Subject, result.Subject, "subject") assert.Equal(t, expected.Sender, result.Sender, "sender") diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index bf6739384..b3f67ceb8 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -2,15 +2,20 @@ package api import ( "context" + "fmt" + "time" "github.com/hashicorp/go-multierror" "github.com/microsoft/kiota-abstractions-go/serialization" + kioser "github.com/microsoft/kiota-serialization-json-go" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/users" "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/logger" ) // --------------------------------------------------------------------------- @@ -92,12 +97,17 @@ func (c Mail) GetContainerByID( return service.Client().UsersById(userID).MailFoldersById(dirID).Get(ctx, ofmf) } -// RetrieveMessageDataForUser is a GraphRetrievalFunc that returns message data. -func (c Mail) RetrieveMessageDataForUser( +// GetItem retrieves a Messageable item. 
+func (c Mail) GetItem( ctx context.Context, - user, m365ID string, -) (serialization.Parsable, error) { - return c.stable.Client().UsersById(user).MessagesById(m365ID).Get(ctx, nil) + user, itemID string, +) (serialization.Parsable, *details.ExchangeInfo, error) { + mail, err := c.stable.Client().UsersById(user).MessagesById(itemID).Get(ctx, nil) + if err != nil { + return nil, nil, err + } + + return mail, MailInfo(mail), nil } // EnumerateContainers iterates through all of the users current @@ -223,3 +233,101 @@ func (c Mail) GetAddedAndRemovedItemIDs( return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } + +// --------------------------------------------------------------------------- +// Serialization +// --------------------------------------------------------------------------- + +// Serialize retrieves attachment data identified by the mail item, and then +// serializes it into a byte slice. +func (c Mail) Serialize( + ctx context.Context, + item serialization.Parsable, + user, itemID string, +) ([]byte, error) { + msg, ok := item.(models.Messageable) + if !ok { + return nil, fmt.Errorf("expected Messageable, got %T", item) + } + + var ( + err error + writer = kioser.NewJsonSerializationWriter() + ) + + defer writer.Close() + + if *msg.GetHasAttachments() { + // getting all the attachments might take a couple attempts due to filesize + var retriesErr error + + for count := 0; count < numberOfRetries; count++ { + attached, err := c.stable. + Client(). + UsersById(user). + MessagesById(itemID). + Attachments(). 
+ Get(ctx, nil) + retriesErr = err + + if err == nil { + msg.SetAttachments(attached.GetValue()) + break + } + } + + if retriesErr != nil { + logger.Ctx(ctx).Debug("exceeded maximum retries") + return nil, support.WrapAndAppend(itemID, errors.Wrap(retriesErr, "attachment failed"), nil) + } + } + + if err = writer.WriteObjectValue("", msg); err != nil { + return nil, support.SetNonRecoverableError(errors.Wrap(err, itemID)) + } + + bs, err := writer.GetSerializedContent() + if err != nil { + return nil, errors.Wrap(err, "serializing email") + } + + return bs, nil +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func MailInfo(msg models.Messageable) *details.ExchangeInfo { + sender := "" + subject := "" + received := time.Time{} + created := time.Time{} + + if msg.GetSender() != nil && + msg.GetSender().GetEmailAddress() != nil && + msg.GetSender().GetEmailAddress().GetAddress() != nil { + sender = *msg.GetSender().GetEmailAddress().GetAddress() + } + + if msg.GetSubject() != nil { + subject = *msg.GetSubject() + } + + if msg.GetReceivedDateTime() != nil { + received = *msg.GetReceivedDateTime() + } + + if msg.GetCreatedDateTime() != nil { + created = *msg.GetCreatedDateTime() + } + + return &details.ExchangeInfo{ + ItemType: details.ExchangeMail, + Sender: sender, + Subject: subject, + Received: received, + Created: created, + Modified: orNow(msg.GetLastModifiedDateTime()), + } +} diff --git a/src/internal/connector/exchange/message_test.go b/src/internal/connector/exchange/api/mail_test.go similarity index 90% rename from src/internal/connector/exchange/message_test.go rename to src/internal/connector/exchange/api/mail_test.go index 8b5751610..5611586e2 100644 --- a/src/internal/connector/exchange/message_test.go +++ b/src/internal/connector/exchange/api/mail_test.go @@ -1,4 +1,4 @@ -package exchange +package api import ( "testing" @@ 
-10,15 +10,15 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" ) -type MessageSuite struct { +type MailAPIUnitSuite struct { suite.Suite } -func TestMessageSuite(t *testing.T) { - suite.Run(t, &MessageSuite{}) +func TestMailAPIUnitSuite(t *testing.T) { + suite.Run(t, new(MailAPIUnitSuite)) } -func (suite *MessageSuite) TestMessageInfo() { +func (suite *MailAPIUnitSuite) TestMailInfo() { initial := time.Now() tests := []struct { @@ -36,7 +36,6 @@ func (suite *MessageSuite) TestMessageInfo() { ItemType: details.ExchangeMail, Created: initial, Modified: initial, - Size: 10, } return msg, i }, @@ -58,7 +57,6 @@ func (suite *MessageSuite) TestMessageInfo() { Sender: sender, Created: initial, Modified: initial, - Size: 10, } return msg, i }, @@ -76,7 +74,6 @@ func (suite *MessageSuite) TestMessageInfo() { Subject: subject, Created: initial, Modified: initial, - Size: 10, } return msg, i }, @@ -94,7 +91,6 @@ func (suite *MessageSuite) TestMessageInfo() { Received: now, Created: initial, Modified: initial, - Size: 10, } return msg, i }, @@ -122,7 +118,6 @@ func (suite *MessageSuite) TestMessageInfo() { Received: now, Created: initial, Modified: initial, - Size: 10, } return msg, i }, @@ -131,7 +126,7 @@ func (suite *MessageSuite) TestMessageInfo() { for _, tt := range tests { suite.T().Run(tt.name, func(t *testing.T) { msg, expected := tt.msgAndRP() - suite.Equal(expected, MessageInfo(msg, 10)) + suite.Equal(expected, MailInfo(msg)) }) } } diff --git a/src/internal/connector/exchange/contact.go b/src/internal/connector/exchange/contact.go deleted file mode 100644 index 82bac9601..000000000 --- a/src/internal/connector/exchange/contact.go +++ /dev/null @@ -1,36 +0,0 @@ -package exchange - -import ( - "time" - - "github.com/microsoftgraph/msgraph-sdk-go/models" - - "github.com/alcionai/corso/src/pkg/backup/details" -) - -// ContactInfo translate models.Contactable metadata into searchable content -func ContactInfo(contact models.Contactable, size int64) 
*details.ExchangeInfo { - name := "" - created := time.Time{} - modified := time.Time{} - - if contact.GetDisplayName() != nil { - name = *contact.GetDisplayName() - } - - if contact.GetCreatedDateTime() != nil { - created = *contact.GetCreatedDateTime() - } - - if contact.GetLastModifiedDateTime() != nil { - modified = *contact.GetLastModifiedDateTime() - } - - return &details.ExchangeInfo{ - ItemType: details.ExchangeContact, - ContactName: name, - Created: created, - Modified: modified, - Size: size, - } -} diff --git a/src/internal/connector/exchange/event.go b/src/internal/connector/exchange/event.go deleted file mode 100644 index 775570d52..000000000 --- a/src/internal/connector/exchange/event.go +++ /dev/null @@ -1,82 +0,0 @@ -package exchange - -import ( - "time" - - "github.com/microsoftgraph/msgraph-sdk-go/models" - - "github.com/alcionai/corso/src/internal/common" - "github.com/alcionai/corso/src/pkg/backup/details" -) - -// EventInfo searchable metadata for stored event objects. -func EventInfo(evt models.Eventable, size int64) *details.ExchangeInfo { - var ( - organizer, subject string - recurs bool - start = time.Time{} - end = time.Time{} - created = time.Time{} - modified = time.Time{} - ) - - if evt.GetOrganizer() != nil && - evt.GetOrganizer().GetEmailAddress() != nil && - evt.GetOrganizer().GetEmailAddress().GetAddress() != nil { - organizer = *evt.GetOrganizer(). - GetEmailAddress(). 
- GetAddress() - } - - if evt.GetSubject() != nil { - subject = *evt.GetSubject() - } - - if evt.GetRecurrence() != nil { - recurs = true - } - - if evt.GetStart() != nil && - evt.GetStart().GetDateTime() != nil { - // timeString has 'Z' literal added to ensure the stored - // DateTime is not: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) - startTime := *evt.GetStart().GetDateTime() + "Z" - - output, err := common.ParseTime(startTime) - if err == nil { - start = output - } - } - - if evt.GetEnd() != nil && - evt.GetEnd().GetDateTime() != nil { - // timeString has 'Z' literal added to ensure the stored - // DateTime is not: time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC) - endTime := *evt.GetEnd().GetDateTime() + "Z" - - output, err := common.ParseTime(endTime) - if err == nil { - end = output - } - } - - if evt.GetCreatedDateTime() != nil { - created = *evt.GetCreatedDateTime() - } - - if evt.GetLastModifiedDateTime() != nil { - modified = *evt.GetLastModifiedDateTime() - } - - return &details.ExchangeInfo{ - ItemType: details.ExchangeEvent, - Organizer: organizer, - Subject: subject, - EventStart: start, - EventEnd: end, - EventRecurs: recurs, - Created: created, - Modified: modified, - Size: size, - } -} diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 7fc3faadd..950cf7aaf 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -6,20 +6,13 @@ package exchange import ( "bytes" "context" - "fmt" "io" "sync" "sync/atomic" "time" - absser "github.com/microsoft/kiota-abstractions-go/serialization" - kioser "github.com/microsoft/kiota-serialization-json-go" - msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" - "github.com/microsoftgraph/msgraph-sdk-go/models" - "github.com/pkg/errors" + "github.com/microsoft/kiota-abstractions-go/serialization" - 
"github.com/alcionai/corso/src/internal/connector/exchange/api" - "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/observe" @@ -45,6 +38,18 @@ const ( urlPrefetchChannelBufferSize = 4 ) +type itemer interface { + GetItem( + ctx context.Context, + user, itemID string, + ) (serialization.Parsable, *details.ExchangeInfo, error) + Serialize( + ctx context.Context, + item serialization.Parsable, + user, itemID string, + ) ([]byte, error) +} + // Collection implements the interface from data.Collection // Structure holds data for an Exchange application for a single user type Collection struct { @@ -57,9 +62,7 @@ type Collection struct { // removed is a list of item IDs that were deleted from, or moved out, of a container removed map[string]struct{} - // service - client/adapter pair used to access M365 back store - service graph.Servicer - ac api.Client + items itemer category path.CategoryType statusUpdater support.StatusUpdater @@ -89,14 +92,12 @@ func NewCollection( user string, curr, prev path.Path, category path.CategoryType, - ac api.Client, - service graph.Servicer, + items itemer, statusUpdater support.StatusUpdater, ctrlOpts control.Options, doNotMergeItems bool, ) Collection { collection := Collection{ - ac: ac, category: category, ctrl: ctrlOpts, data: make(chan data.Stream, collectionChannelBufferSize), @@ -105,10 +106,10 @@ func NewCollection( added: make(map[string]struct{}, 0), removed: make(map[string]struct{}, 0), prevPath: prev, - service: service, state: stateOf(prev, curr), statusUpdater: statusUpdater, user: user, + items: items, } return collection @@ -137,22 +138,6 @@ func (col *Collection) Items() <-chan data.Stream { return col.data } -// GetQueryAndSerializeFunc helper function that returns the two functions functions -// required to convert M365 identifier into a byte array filled with the 
serialized data -func GetQueryAndSerializeFunc(ac api.Client, category path.CategoryType) (api.GraphRetrievalFunc, GraphSerializeFunc) { - switch category { - case path.ContactsCategory: - return ac.Contacts().RetrieveContactDataForUser, serializeAndStreamContact - case path.EventsCategory: - return ac.Events().RetrieveEventDataForUser, serializeAndStreamEvent - case path.EmailCategory: - return ac.Mail().RetrieveMessageDataForUser, serializeAndStreamMessage - // Unsupported options returns nil, nil - default: - return nil, nil - } -} - // FullPath returns the Collection's fullPath []string func (col *Collection) FullPath() path.Path { return col.fullPath @@ -208,15 +193,6 @@ func (col *Collection) streamItems(ctx context.Context) { }() } - // get QueryBasedonIdentifier - // verify that it is the correct type in called function - // serializationFunction - query, serializeFunc := GetQueryAndSerializeFunc(col.ac, col.category) - if query == nil { - errs = fmt.Errorf("unrecognized collection type: %s", col.category) - return - } - // Limit the max number of active requests to GC semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize) defer close(semaphoreCh) @@ -265,16 +241,17 @@ func (col *Collection) streamItems(ctx context.Context) { defer func() { <-semaphoreCh }() var ( - response absser.Parsable - err error + item serialization.Parsable + info *details.ExchangeInfo + err error ) for i := 1; i <= numberOfRetries; i++ { - response, err = query(ctx, user, id) + item, info, err = col.items.GetItem(ctx, user, id) if err == nil { break } - // TODO: Tweak sleep times + if i < numberOfRetries { time.Sleep(time.Duration(3*(i+1)) * time.Second) } @@ -285,20 +262,23 @@ func (col *Collection) streamItems(ctx context.Context) { return } - byteCount, err := serializeFunc( - ctx, - col.service.Client(), - kioser.NewJsonSerializationWriter(), - col.data, - response, - user) + data, err := col.items.Serialize(ctx, item, user, id) if err != nil { errUpdater(user, err) 
return } + info.Size = int64(len(data)) + + col.data <- &Stream{ + id: id, + message: data, + info: info, + modTime: info.Modified, + } + atomic.AddInt64(&success, 1) - atomic.AddInt64(&totalBytes, int64(byteCount)) + atomic.AddInt64(&totalBytes, info.Size) if colProgress != nil { colProgress <- struct{}{} @@ -328,200 +308,6 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, totalB col.statusUpdater(status) } -type modTimer interface { - GetLastModifiedDateTime() *time.Time -} - -func getModTime(mt modTimer) time.Time { - res := time.Now().UTC() - - if t := mt.GetLastModifiedDateTime(); t != nil { - res = *t - } - - return res -} - -// GraphSerializeFunc are class of functions that are used by Collections to transform GraphRetrievalFunc -// responses into data.Stream items contained within the Collection -type GraphSerializeFunc func( - ctx context.Context, - client *msgraphsdk.GraphServiceClient, - objectWriter *kioser.JsonSerializationWriter, - dataChannel chan<- data.Stream, - parsable absser.Parsable, - user string, -) (int, error) - -// serializeAndStreamEvent is a GraphSerializeFunc used to serialize models.Eventable objects into -// data.Stream objects. Returns an error the process finishes unsuccessfully. -func serializeAndStreamEvent( - ctx context.Context, - client *msgraphsdk.GraphServiceClient, - objectWriter *kioser.JsonSerializationWriter, - dataChannel chan<- data.Stream, - parsable absser.Parsable, - user string, -) (int, error) { - var err error - - defer objectWriter.Close() - - event, ok := parsable.(models.Eventable) - if !ok { - return 0, fmt.Errorf("expected Eventable, got %T", parsable) - } - - if *event.GetHasAttachments() { - var retriesErr error - - for count := 0; count < numberOfRetries; count++ { - attached, err := client. - UsersById(user). - EventsById(*event.GetId()). - Attachments(). 
- Get(ctx, nil) - retriesErr = err - - if err == nil && attached != nil { - event.SetAttachments(attached.GetValue()) - break - } - } - - if retriesErr != nil { - logger.Ctx(ctx).Debug("exceeded maximum retries") - - return 0, support.WrapAndAppend( - *event.GetId(), - errors.Wrap(retriesErr, "attachment failed"), - nil) - } - } - - err = objectWriter.WriteObjectValue("", event) - if err != nil { - return 0, support.SetNonRecoverableError(errors.Wrap(err, *event.GetId())) - } - - byteArray, err := objectWriter.GetSerializedContent() - if err != nil { - return 0, support.WrapAndAppend(*event.GetId(), errors.Wrap(err, "serializing content"), nil) - } - - if len(byteArray) > 0 { - dataChannel <- &Stream{ - id: *event.GetId(), - message: byteArray, - info: EventInfo(event, int64(len(byteArray))), - modTime: getModTime(event), - } - } - - return len(byteArray), nil -} - -// serializeAndStreamContact is a GraphSerializeFunc for models.Contactable -func serializeAndStreamContact( - ctx context.Context, - client *msgraphsdk.GraphServiceClient, - objectWriter *kioser.JsonSerializationWriter, - dataChannel chan<- data.Stream, - parsable absser.Parsable, - user string, -) (int, error) { - defer objectWriter.Close() - - contact, ok := parsable.(models.Contactable) - if !ok { - return 0, fmt.Errorf("expected Contactable, got %T", parsable) - } - - err := objectWriter.WriteObjectValue("", contact) - if err != nil { - return 0, support.SetNonRecoverableError(errors.Wrap(err, *contact.GetId())) - } - - bs, err := objectWriter.GetSerializedContent() - if err != nil { - return 0, support.WrapAndAppend(*contact.GetId(), err, nil) - } - - if len(bs) > 0 { - dataChannel <- &Stream{ - id: *contact.GetId(), - message: bs, - info: ContactInfo(contact, int64(len(bs))), - modTime: getModTime(contact), - } - } - - return len(bs), nil -} - -// serializeAndStreamMessage is the GraphSerializeFunc for models.Messageable -func serializeAndStreamMessage( - ctx context.Context, - client 
*msgraphsdk.GraphServiceClient, - objectWriter *kioser.JsonSerializationWriter, - dataChannel chan<- data.Stream, - parsable absser.Parsable, - user string, -) (int, error) { - var err error - - defer objectWriter.Close() - - msg, ok := parsable.(models.Messageable) - if !ok { - return 0, fmt.Errorf("expected Messageable, got %T", parsable) - } - - if *msg.GetHasAttachments() { - // getting all the attachments might take a couple attempts due to filesize - var retriesErr error - - for count := 0; count < numberOfRetries; count++ { - attached, err := client. - UsersById(user). - MessagesById(*msg.GetId()). - Attachments(). - Get(ctx, nil) - retriesErr = err - - if err == nil { - msg.SetAttachments(attached.GetValue()) - break - } - } - - if retriesErr != nil { - logger.Ctx(ctx).Debug("exceeded maximum retries") - return 0, support.WrapAndAppend(*msg.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil) - } - } - - err = objectWriter.WriteObjectValue("", msg) - if err != nil { - return 0, support.SetNonRecoverableError(errors.Wrapf(err, "%s", *msg.GetId())) - } - - bs, err := objectWriter.GetSerializedContent() - if err != nil { - err = support.WrapAndAppend(*msg.GetId(), errors.Wrap(err, "serializing mail content"), nil) - return 0, support.SetNonRecoverableError(err) - } - - dataChannel <- &Stream{ - id: *msg.GetId(), - message: bs, - info: MessageInfo(msg, int64(len(bs))), - modTime: getModTime(msg), - } - - return len(bs), nil -} - // Stream represents a single item retrieved from exchange type Stream struct { id string diff --git a/src/internal/connector/exchange/exchange_data_collection_test.go b/src/internal/connector/exchange/exchange_data_collection_test.go index 79efc59f4..a63a7caf8 100644 --- a/src/internal/connector/exchange/exchange_data_collection_test.go +++ b/src/internal/connector/exchange/exchange_data_collection_test.go @@ -2,18 +2,33 @@ package exchange import ( "bytes" + "context" "testing" + 
"github.com/microsoft/kiota-abstractions-go/serialization" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector/exchange/api" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" ) +type mockItemer struct{} + +func (mi mockItemer) GetItem( + context.Context, + string, string, +) (serialization.Parsable, *details.ExchangeInfo, error) { + return nil, nil, nil +} + +func (mi mockItemer) Serialize(context.Context, serialization.Parsable, string, string) ([]byte, error) { + return nil, nil +} + type ExchangeDataCollectionSuite struct { suite.Suite } @@ -137,7 +152,9 @@ func (suite *ExchangeDataCollectionSuite) TestNewCollection_state() { c := NewCollection( "u", test.curr, test.prev, - 0, api.Client{}, nil, nil, control.Options{}, + 0, + mockItemer{}, nil, + control.Options{}, false) assert.Equal(t, test.expect, c.State()) }) diff --git a/src/internal/connector/exchange/message.go b/src/internal/connector/exchange/message.go deleted file mode 100644 index 9d1e065fb..000000000 --- a/src/internal/connector/exchange/message.go +++ /dev/null @@ -1,49 +0,0 @@ -package exchange - -import ( - "time" - - "github.com/microsoftgraph/msgraph-sdk-go/models" - - "github.com/alcionai/corso/src/pkg/backup/details" -) - -func MessageInfo(msg models.Messageable, size int64) *details.ExchangeInfo { - sender := "" - subject := "" - received := time.Time{} - created := time.Time{} - modified := time.Time{} - - if msg.GetSender() != nil && - msg.GetSender().GetEmailAddress() != nil && - msg.GetSender().GetEmailAddress().GetAddress() != nil { - sender = *msg.GetSender().GetEmailAddress().GetAddress() - } - - if msg.GetSubject() != nil { - subject = *msg.GetSubject() - } - - if msg.GetReceivedDateTime() != nil { - received = *msg.GetReceivedDateTime() - } 
- - if msg.GetCreatedDateTime() != nil { - created = *msg.GetCreatedDateTime() - } - - if msg.GetLastModifiedDateTime() != nil { - modified = *msg.GetLastModifiedDateTime() - } - - return &details.ExchangeInfo{ - ItemType: details.ExchangeMail, - Sender: sender, - Subject: subject, - Received: received, - Created: created, - Modified: modified, - Size: size, - } -} diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 70f2190c5..0880ad233 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -2,6 +2,7 @@ package exchange import ( "context" + "fmt" "github.com/pkg/errors" @@ -56,19 +57,16 @@ func filterContainersAndFillCollections( return err } + ibt, err := itemerByType(ac, scope.Category().PathType()) + if err != nil { + return err + } + for _, c := range resolver.Items() { if ctrlOpts.FailFast && errs != nil { return errs } - // cannot be moved out of the loop, - // else we run into state issues. - service, err := createService(qp.Credentials) - if err != nil { - errs = support.WrapAndAppend(qp.ResourceOwner, err, errs) - continue - } - cID := *c.GetId() delete(tombstones, cID) @@ -118,8 +116,7 @@ func filterContainersAndFillCollections( currPath, prevPath, scope.Category().PathType(), - ac, - service, + ibt, statusUpdater, ctrlOpts, newDelta.Reset) @@ -148,12 +145,6 @@ func filterContainersAndFillCollections( // in the `previousPath` set, but does not exist in the current container // resolver (which contains all the resource owners' current containers). 
for id, p := range tombstones { - service, err := createService(qp.Credentials) - if err != nil { - errs = support.WrapAndAppend(p, err, errs) - continue - } - if collections[id] != nil { errs = support.WrapAndAppend(p, errors.New("conflict: tombstone exists for a live collection"), errs) continue @@ -178,8 +169,7 @@ func filterContainersAndFillCollections( nil, // marks the collection as deleted prevPath, scope.Category().PathType(), - ac, - service, + ibt, statusUpdater, ctrlOpts, false) @@ -231,3 +221,16 @@ func pathFromPrevString(ps string) (path.Path, error) { return p, nil } + +func itemerByType(ac api.Client, category path.CategoryType) (itemer, error) { + switch category { + case path.EmailCategory: + return ac.Mail(), nil + case path.EventsCategory: + return ac.Events(), nil + case path.ContactsCategory: + return ac.Contacts(), nil + default: + return nil, fmt.Errorf("category %s not supported by getFetchIDFunc", category) + } +} diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index 06c56ac9d..3f88f6efe 100644 --- a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -84,7 +84,10 @@ func RestoreExchangeContact( return nil, errors.New("msgraph contact post fail: REST response not received") } - return ContactInfo(contact, int64(len(bits))), nil + info := api.ContactInfo(contact) + info.Size = int64(len(bits)) + + return info, nil } // RestoreExchangeEvent restores a contact to the @bits byte @@ -153,7 +156,10 @@ func RestoreExchangeEvent( } } - return EventInfo(event, int64(len(bits))), errs + info := api.EventInfo(event) + info.Size = int64(len(bits)) + + return info, errs } // RestoreMailMessage utility function to place an exchange.Mail @@ -215,7 +221,10 @@ func RestoreMailMessage( } } - return MessageInfo(clone, int64(len(bits))), nil + info := api.MailInfo(clone) + info.Size = int64(len(bits)) + + return info, nil } // 
attachmentBytes is a helper to retrieve the attachment content from a models.Attachmentable