GC: Create Consistent Snapshot (#292)

GraphConnector: Query consistency feature added
See document for implementation details: [document](https://www.notion.so/alcion/GraphConnector-RestorePoint-Consistency-for-Queries-81cce329c6e84cd79121cda0d511e1f6)
This commit is contained in:
Danny 2022-07-08 15:45:25 -04:00 committed by GitHub
parent 0a5aa8ce73
commit 60ab132960
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 303 additions and 143 deletions

View File

@ -9,7 +9,7 @@ require (
github.com/kopia/kopia v0.11.1
github.com/microsoft/kiota-abstractions-go v0.8.1
github.com/microsoft/kiota-authentication-azure-go v0.3.0
github.com/microsoft/kiota-serialization-json-go v0.5.3
github.com/microsoft/kiota-serialization-json-go v0.5.4
github.com/microsoftgraph/msgraph-sdk-go v0.28.0
github.com/microsoftgraph/msgraph-sdk-go-core v0.26.1
github.com/pkg/errors v0.9.1
@ -60,7 +60,7 @@ require (
github.com/klauspost/pgzip v1.2.5 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/microsoft/kiota-http-go v0.5.1 // indirect
github.com/microsoft/kiota-http-go v0.5.2 // indirect
github.com/microsoft/kiota-serialization-text-go v0.4.1 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/minio-go/v7 v7.0.30 // indirect

View File

@ -246,8 +246,12 @@ github.com/microsoft/kiota-authentication-azure-go v0.3.0 h1:iLyy5qldAjBiYMGMk1r
github.com/microsoft/kiota-authentication-azure-go v0.3.0/go.mod h1:qyZWSCug2eG1zrRnCSacyFHGsgQa4aSCWn3EOkY9Z1M=
github.com/microsoft/kiota-http-go v0.5.1 h1:8QLZBfvPRvISUO+qWvv6fBrxaBH5n0V/Venq7Fq51cg=
github.com/microsoft/kiota-http-go v0.5.1/go.mod h1:WqEFNw3rMEatymG4Xh3rLSTxaKq80rJdQ/CSSh7m6jI=
github.com/microsoft/kiota-http-go v0.5.2 h1:BS/bK2xHLT8TT+p0uZKxwu+lkXDAPByugYP2n1nV0Uo=
github.com/microsoft/kiota-http-go v0.5.2/go.mod h1:WqEFNw3rMEatymG4Xh3rLSTxaKq80rJdQ/CSSh7m6jI=
github.com/microsoft/kiota-serialization-json-go v0.5.3 h1:NrRyed65WYhEH5NwZTzplWs+eoECEYtLpAQ5Dhwq1wc=
github.com/microsoft/kiota-serialization-json-go v0.5.3/go.mod h1:GI9vrssO1EvqzDtvMKuhjALn40phZOWkeeaMgtCk6xE=
github.com/microsoft/kiota-serialization-json-go v0.5.4 h1:BpkTYq1AeZPCnSsp3zpzfNL9hx3xb1/LPFteV6tbhMQ=
github.com/microsoft/kiota-serialization-json-go v0.5.4/go.mod h1:GI9vrssO1EvqzDtvMKuhjALn40phZOWkeeaMgtCk6xE=
github.com/microsoft/kiota-serialization-text-go v0.4.1 h1:6QPH7+geUPCpaSZkKCQw0Scngx2IF0vKodrvvWWiu2A=
github.com/microsoft/kiota-serialization-text-go v0.4.1/go.mod h1:DsriFnVBDCc4D84qxG3j8q/1Sxu16JILfhxMZm3kdfw=
github.com/microsoftgraph/msgraph-sdk-go v0.28.0 h1:BolP/vNW7gsNXivg/qikcdftOicLMgMm3Z/6PpSFDvU=

View File

@ -5,7 +5,6 @@ package connector
import (
"bytes"
"context"
"fmt"
az "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
ka "github.com/microsoft/kiota-authentication-azure-go"
@ -14,7 +13,6 @@ import (
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
msuser "github.com/microsoftgraph/msgraph-sdk-go/users"
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector/exchange"
@ -119,9 +117,13 @@ func (gc *GraphConnector) GetUsers() []string {
return buildFromMap(true, gc.Users)
}
// GetUsersIds returns the M365 ids held in gc.Users.
// It asks buildFromMap for the map's values (isKey == false) rather than
// its keys, which is what GetUsers returns.
func (gc *GraphConnector) GetUsersIds() []string {
return buildFromMap(false, gc.Users)
}
// buildFromMap helper function for returning []string from map.
// Returns list of keys iff true; otherwise returns a list of values
func buildFromMap(isKey bool, mapping map[string]string) []string {
returnString := make([]string, 0)
if isKey {
@ -139,7 +141,6 @@ func buildFromMap(isKey bool, mapping map[string]string) []string {
// ExchangeDataCollection returns a list of DataCollections which the caller can
// use to read mailbox data out for the specified user
// Assumption: User exists
// TODO: https://github.com/alcionai/corso/issues/135
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector selectors.Selector) ([]DataCollection, error) {
eb, err := selector.ToExchangeBackup()
@ -184,20 +185,6 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s
return collections, errs
}
// optionsForMailFolders creates transforms the 'select' into a more dynamic call for MailFolders.
// var moreOps is a comma separated string of options(e.g. "displayName, isHidden")
// return is first call in MailFolders().GetWithRequestConfigurationAndResponseHandler(options, handler)
func optionsForMailFolders(moreOps []string) *msfolder.MailFoldersRequestBuilderGetRequestConfiguration {
selecting := append(moreOps, "id")
requestParameters := &msfolder.MailFoldersRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &msfolder.MailFoldersRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
}
return options
}
// RestoreMessages: Utility function to connect to M365 backstore
// and upload messages from DataCollection.
// FullPath: tenantId, userId, <mailCategory>, FolderId
@ -229,6 +216,7 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection
}
clone := support.ToMessage(message)
address := dc.FullPath()[3]
// details on valueId settings: https://docs.microsoft.com/en-us/openspecs/exchange_server_protocols/ms-oxprops/77844470-22ca-43fb-993d-c53e96cf9cd6
valueId := "Integer 0x0E07"
enableValue := "4"
sv := models.NewSingleValueLegacyExtendedProperty()
@ -258,70 +246,61 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection
// serializeMessages: Temp Function as place Holder until Collections have been added
// to the GraphConnector struct.
func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([]DataCollection, error) {
options := optionsForMailFolders([]string{})
response, err := gc.client.UsersById(user).MailFolders().GetWithRequestConfigurationAndResponseHandler(options, nil)
options := optionsForMessageSnapshot()
response, err := gc.client.UsersById(user).Messages().GetWithRequestConfigurationAndResponseHandler(options, nil)
if err != nil {
return nil, err
}
if response == nil {
return nil, fmt.Errorf("unable to access folders for %s", user)
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
if err != nil {
return nil, err
}
folderList := make([]string, 0)
for _, folderable := range response.GetValue() {
folderList = append(folderList, *folderable.GetId())
tasklist := NewTaskList() // map[folder][] messageIds
callbackFunc := func(messageItem any) bool {
message, ok := messageItem.(models.Messageable)
if !ok {
err = support.WrapAndAppendf(gc.adapter.GetBaseUrl(), errors.New("message iteration failure"), err)
return true
}
// Saving to messages to list. Indexed by folder
tasklist.AddTask(*message.GetParentFolderId(), *message.GetId())
return true
}
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
err = support.WrapAndAppend(gc.adapter.GetBaseUrl(), iterateError, err)
}
if err != nil {
return nil, err // return error if snapshot is incomplete
}
// Time to create Exchange data Holder
collections := make([]DataCollection, 0)
var errs error
var totalItems, success int
for _, aFolder := range folderList {
// get all user's mail messages
result, err := gc.client.UsersById(user).MailFoldersById(aFolder).Messages().Get()
if err != nil {
errs = support.WrapAndAppend(user, err, errs)
}
if result == nil {
errs = support.WrapAndAppend(user, fmt.Errorf("nil response on message query, folder: %s", aFolder), errs)
continue
}
// set up a page iterator for retrieving further message batches
pageIterator, err := msgraphgocore.NewPageIterator(result, &gc.adapter, models.CreateMessageCollectionResponseFromDiscriminatorValue)
if err != nil {
errs = support.WrapAndAppend(user, fmt.Errorf("iterator failed initialization: %v", err), errs)
continue
}
// prep writing mail attachments
objectWriter := kw.NewJsonSerializationWriter()
var errs error
var attemptedItems, success int
for aFolder, tasks := range tasklist {
// prep the items for handoff to the backup consumer
edc := NewExchangeDataCollection(user, []string{gc.tenant, user, mailCategory, aFolder})
// iterate through the remaining pages of mail
stats := iteratorStats{
count: totalItems,
errs: errs,
for _, task := range tasks {
response, err := gc.client.UsersById(user).MessagesById(task).Get()
if err != nil {
details := support.ConnectorStackErrorTrace(err)
errs = support.WrapAndAppend(user, errors.Wrapf(err, "unable to retrieve %s, %s", task, details), errs)
continue
}
cbf := gc.serializeMessageIteratorCallback(ctx, objectWriter, edc, user, &stats)
err = pageIterator.Iterate(cbf)
totalItems = stats.count
errs = stats.errs
err = gc.messageToDataCollection(ctx, objectWriter, edc, response, user)
if err != nil {
errs = support.WrapAndAppend(user, err, errs)
errs = support.WrapAndAppendf(user, err, errs)
}
}
// Todo Retry Handler to be implemented
edc.FinishPopulation()
attemptedItems += len(tasks)
success += edc.Length()
collections = append(collections, &edc)
}
status, err := support.CreateStatus(support.Backup, totalItems, success, len(folderList), errs)
status, err := support.CreateStatus(support.Backup, attemptedItems, success, len(tasklist), errs)
if err == nil {
gc.SetStatus(*status)
logger.Ctx(ctx).Debugw(gc.PrintableStatus())
@ -329,30 +308,14 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
return collections, errs
}
type iteratorStats struct {
count int
errs error
}
func (gc *GraphConnector) serializeMessageIteratorCallback(
// messageToDataCollection transfers message objects to objects within DataCollection
func (gc *GraphConnector) messageToDataCollection(
ctx context.Context,
objectWriter *kw.JsonSerializationWriter,
edc ExchangeDataCollection,
message models.Messageable,
user string,
stats *iteratorStats,
) func(messageItem interface{}) bool {
return func(messageItem interface{}) bool {
stats.count++
message, ok := messageItem.(models.Messageable)
if !ok {
stats.errs = support.WrapAndAppend(
user,
errors.New("non-message return for user: "+user),
stats.errs)
return true
}
) error {
if *message.GetHasAttachments() {
// getting all the attachments might take a couple attempts due to filesize
var retriesErr error
@ -370,38 +333,23 @@ func (gc *GraphConnector) serializeMessageIteratorCallback(
}
if retriesErr != nil {
logger.Ctx(ctx).Debug("exceeded maximum retries")
stats.errs = support.WrapAndAppend(
*message.GetId(),
errors.Wrap(retriesErr, "attachment failed"),
stats.errs)
return support.WrapAndAppend(*message.GetId(), errors.Wrap(retriesErr, "attachment failed"), nil)
}
}
defer objectWriter.Close()
err := objectWriter.WriteObjectValue("", message)
if err != nil {
stats.errs = support.WrapAndAppend(
*message.GetId(),
support.SetNonRecoverableError(err),
stats.errs)
return true
return support.SetNonRecoverableError(errors.Wrapf(err, "%s", *message.GetId()))
}
byteArray, err := objectWriter.GetSerializedContent()
objectWriter.Close()
if err != nil {
stats.errs = support.WrapAndAppend(
*message.GetId(),
errors.Wrap(err, "serializing mail content"),
stats.errs)
return true
return support.WrapAndAppend(*message.GetId(), errors.Wrap(err, "serializing mail content"), nil)
}
if byteArray != nil {
edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray, info: exchange.MessageInfo(message)})
}
return true
}
return nil
}
// SetStatus helper function

View File

@ -21,7 +21,7 @@ type GraphConnectorIntegrationSuite struct {
connector *GraphConnector
}
func TestGraphConnectorIntetgrationSuite(t *testing.T) {
func TestGraphConnectorIntegrationSuite(t *testing.T) {
if err := ctesting.RunOnAny(
ctesting.CorsoCITests,
ctesting.CorsoGraphConnectorTests,
@ -143,17 +143,6 @@ func (suite *DisconnectedGraphConnectorSuite) TestBadConnection() {
}
}
// Contains is a helper method for verifying if element
// is contained within the slice
func Contains(elems []string, value string) bool {
for _, s := range elems {
if value == s {
return true
}
}
return false
}
func (suite *DisconnectedGraphConnectorSuite) TestBuild() {
names := make(map[string]string)
names["Al"] = "Bundy"
@ -235,3 +224,75 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_ErrorChecking()
})
}
}
// TestGraphConnector_TaskList verifies that AddTask groups values under
// their key in insertion order and that an unknown key yields no entries.
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TaskList() {
	tasks := NewTaskList()
	tasks.AddTask("person1", "Go to store")
	tasks.AddTask("person1", "drop off mail")

	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the two values correctly. Comparing the whole
	// slice also checks the stored values, not just how many there are.
	suite.Equal([]string{"Go to store", "drop off mail"}, tasks["person1"])

	// Looking up a key that was never added must not produce any tasks.
	suite.Empty(tasks["unknown"])
}
// TestGraphConnector_TestOptionsForMailFolders checks that
// optionsForMailFolders accepts supported select fields and returns an
// error for unsupported ones.
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TestOptionsForMailFolders() {
	tests := []struct {
		name    string
		params  []string
		isError bool
	}{
		{
			name:    "Accepted",
			params:  []string{"displayName"},
			isError: false,
		},
		{
			name:    "Multiple Accepted",
			params:  []string{"displayName", "parentFolderId"},
			isError: false,
		},
		{
			name:    "Incorrect param",
			params:  []string{"status"},
			isError: true,
		},
	}
	for _, test := range tests {
		suite.T().Run(test.name, func(t *testing.T) {
			_, err := optionsForMailFolders(test.params)
			// Report through the subtest's own t so a failure is attributed
			// to the named case instead of the parent suite test.
			if gotErr := err != nil; gotErr != test.isError {
				t.Errorf("optionsForMailFolders(%v): error = %v, want error: %t", test.params, err, test.isError)
			}
		})
	}
}
// TestGraphConnector_TestOptionsForMessages checks that optionsForMessages
// accepts supported select fields and returns an error for unsupported ones.
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_TestOptionsForMessages() {
	tests := []struct {
		name    string
		params  []string
		isError bool
	}{
		{
			name:    "Accepted",
			params:  []string{"subject"},
			isError: false,
		},
		{
			name:    "Multiple Accepted",
			params:  []string{"webLink", "parentFolderId"},
			isError: false,
		},
		{
			name:    "Incorrect param",
			params:  []string{"status"},
			isError: true,
		},
	}
	for _, test := range tests {
		suite.T().Run(test.name, func(t *testing.T) {
			_, err := optionsForMessages(test.params)
			// Report through the subtest's own t so a failure is attributed
			// to the named case instead of the parent suite test.
			if gotErr := err != nil; gotErr != test.isError {
				t.Errorf("optionsForMessages(%v): error = %v, want error: %t", test.params, err, test.isError)
			}
		})
	}
}

View File

@ -0,0 +1,26 @@
// Code generated by "stringer -type=optionIdentifier"; DO NOT EDIT.
package connector
import "strconv"
// _ cannot be called; it exists only so that the constant index expressions
// below are evaluated by the compiler.
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[unknown-0]
_ = x[folders-1]
_ = x[messages-2]
_ = x[users-3]
}
const _optionIdentifier_name = "unknownfoldersmessagesusers"
var _optionIdentifier_index = [...]uint8{0, 7, 14, 22, 27}
// String returns the name of i, sliced out of _optionIdentifier_name using
// the offsets in _optionIdentifier_index. Values outside the indexed range
// are rendered as "optionIdentifier(N)", where N is the decimal value of i.
func (i optionIdentifier) String() string {
if i < 0 || i >= optionIdentifier(len(_optionIdentifier_index)-1) {
return "optionIdentifier(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _optionIdentifier_name[_optionIdentifier_index[i]:_optionIdentifier_index[i+1]]
}

View File

@ -0,0 +1,121 @@
package connector
import (
"errors"
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages"
)
// TaskList is a generic map of a list of items with a string index.
type TaskList map[string][]string

// optionIdentifier names the kind of M365 object a set of select options
// applies to.
type optionIdentifier int

//go:generate stringer -type=optionIdentifier
const (
	unknown optionIdentifier = iota
	folders
	messages
	users
)

// NewTaskList returns an empty TaskList that is ready for use.
func NewTaskList() TaskList {
	return make(TaskList)
}

// AddTask appends value to the list stored under key, creating the list on
// first use. A zero-value (nil) TaskList is allocated lazily, so
// `var tl TaskList; tl.AddTask(k, v)` works without calling NewTaskList.
func (tl *TaskList) AddTask(key, value string) {
	// Writing to a nil map panics; the pointer receiver lets us allocate here.
	if *tl == nil {
		*tl = make(TaskList)
	}
	// append on a missing key starts from a nil slice, so no existence check
	// is needed.
	(*tl)[key] = append((*tl)[key], value)
}
// Contains reports whether value occurs anywhere in elems.
// A nil or empty slice never contains anything.
func Contains(elems []string, value string) bool {
	for idx := 0; idx < len(elems); idx++ {
		if elems[idx] == value {
			return true
		}
	}
	return false
}
// optionsForMailFolders builds the request configuration for a MailFolders
// query whose 'select' clause is assembled from moreOps.
// moreOps is a []string of extra fields (e.g. "displayName", "isHidden");
// buildOptions validates them against the folder fields and returns an
// error for anything unsupported.
// The result is the first argument to
// MailFolders().GetWithRequestConfigurationAndResponseHandler(options, handler).
func optionsForMailFolders(moreOps []string) (*msfolder.MailFoldersRequestBuilderGetRequestConfiguration, error) {
	fields, err := buildOptions(moreOps, folders)
	if err != nil {
		return nil, err
	}

	return &msfolder.MailFoldersRequestBuilderGetRequestConfiguration{
		QueryParameters: &msfolder.MailFoldersRequestBuilderGetQueryParameters{
			Select: fields,
		},
	}, nil
}
// optionsForMessageSnapshot returns a Messages request configuration whose
// 'select' clause is fixed to "id" and "parentFolderId".
func optionsForMessageSnapshot() *msmessage.MessagesRequestBuilderGetRequestConfiguration {
	query := &msmessage.MessagesRequestBuilderGetQueryParameters{
		Select: []string{"id", "parentFolderId"},
	}

	return &msmessage.MessagesRequestBuilderGetRequestConfiguration{
		QueryParameters: query,
	}
}
// optionsForMessages builds the request configuration for a Messages query
// whose 'select' clause is assembled from moreOps.
// buildOptions validates moreOps against the message fields and returns an
// error for anything unsupported.
func optionsForMessages(moreOps []string) (*msmessage.MessagesRequestBuilderGetRequestConfiguration, error) {
	fields, err := buildOptions(moreOps, messages)
	if err != nil {
		return nil, err
	}

	return &msmessage.MessagesRequestBuilderGetRequestConfiguration{
		QueryParameters: &msmessage.MessagesRequestBuilderGetQueryParameters{
			Select: fields,
		},
	}, nil
}
// buildOptions verifies that the requested select options are valid for the
// given M365 object type.
//
// options is the list of extra fields the caller wants selected; selection
// picks which allow-list they are checked against (folders, users or
// messages).
//
// It returns "id" followed by the requested options in their original
// order. An error is returned when selection is not one of the supported
// kinds or when any option is missing from that kind's allow-list; no
// partial list is returned in that case.
func buildOptions(options []string, selection optionIdentifier) ([]string, error) {
	var allowedOptions []string

	switch selection {
	case folders:
		allowedOptions = []string{"displayName", "isHidden", "parentFolderId", "totalItemCount"}
	case users:
		allowedOptions = []string{"birthday", "businessPhones", "city", "companyName", "department", "displayName", "employeeId"}
	case messages:
		// The Graph message property is "conversationId"; the previous
		// spelling "conservationId" rejected the real field and let a
		// non-existent one through.
		allowedOptions = []string{"conversationId", "conversationIndex", "parentFolderId", "subject", "webLink"}
	default:
		return nil, errors.New("unsupported option")
	}

	// "id" is always selected, so it is added before the caller's options.
	returnedOptions := make([]string, 0, len(options)+1)
	returnedOptions = append(returnedOptions, "id")

	for _, entry := range options {
		if !Contains(allowedOptions, entry) {
			return nil, errors.New("unsupported option")
		}
		returnedOptions = append(returnedOptions, entry)
	}

	return returnedOptions, nil
}