Restore into Custom Folder (#391)

Custom folder added for restore process workflow
This commit is contained in:
Danny 2022-08-03 12:14:45 -04:00 committed by GitHub
parent 640dfcf967
commit 186c2d9dc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 166 additions and 37 deletions

View File

@ -27,6 +27,9 @@ var _ data.StreamInfo = &Stream{}
const (
collectionChannelBufferSize = 1000
numberOfRetries = 4
// RestorePropertyTag defined: https://docs.microsoft.com/en-us/office/client-developer/outlook/mapi/pidtagmessageflags-canonical-property
RestorePropertyTag = "Integer 0x0E07"
RestoreCanonicalEnableValue = "4"
)
// ExchangeDataCollection represents exchange mailbox

View File

@ -1,16 +1,25 @@
package exchange
import (
"context"
"fmt"
"time"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/common"
"github.com/alcionai/corso/internal/connector/graph"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/control"
"github.com/alcionai/corso/pkg/logger"
)
var ErrFolderNotFound = errors.New("folder not found")
type exchangeService struct {
client msgraphsdk.GraphServiceClient
adapter msgraphsdk.GraphRequestAdapter
@ -40,9 +49,10 @@ func createService(credentials account.M365Config, shouldFailFast bool) (*exchan
return nil, err
}
service := exchangeService{
adapter: *adapter,
client: *msgraphsdk.NewGraphServiceClient(adapter),
failFast: shouldFailFast,
adapter: *adapter,
client: *msgraphsdk.NewGraphServiceClient(adapter),
failFast: shouldFailFast,
credentials: credentials,
}
return &service, err
}
@ -66,7 +76,7 @@ func DeleteMailFolder(gs graph.Service, user, folderID string) error {
// GetMailFolderID query function to retrieve the M365 ID based on the folder's displayName.
// @param folderName the target folder's display name. Case sensitive
// @returns a *string if the folder exists, nil otherwise
// @returns a *string if the folder exists. If the folder does not exist returns nil, error-> folder not found
func GetMailFolderID(service graph.Service, folderName, user string) (*string, error) {
var errs error
var folderId *string
@ -100,7 +110,90 @@ func GetMailFolderID(service graph.Service, folderName, user string) (*string, e
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
errs = support.WrapAndAppend(service.Adapter().GetBaseUrl(), iterateError, errs)
} else if folderId == nil {
return nil, ErrFolderNotFound
}
return folderId, errs
}
// GetCopyRestoreFolder utility function to create an unique folder for the restore process
func GetCopyRestoreFolder(service graph.Service, user string) (*string, error) {
now := time.Now().UTC()
newFolder := fmt.Sprintf("Corso_Restore_%s", common.FormatSimpleDateTime(now))
isFolder, err := GetMailFolderID(service, newFolder, user)
if err != nil {
// Verify unique folder was not found
if errors.Is(err, ErrFolderNotFound) {
fold, err := CreateMailFolder(service, user, newFolder)
if err != nil {
return nil, support.WrapAndAppend(user, err, err)
}
return fold.GetId(), nil
} else {
return nil, err
}
}
return isFolder, nil
}
// RestoreMailMessage utility function to place an exchange.Mail
// message into the user's M365 Exchange account.
// @param bits - byte array representation of exchange.Message from Corso backstore
// @param service - connector to M365 graph
// @param cp - collision policy that directs restore workflow
// @param destination - M365 Folder ID. Verified and sent by higher function. `copy` policy can use directly
func RestoreMailMessage(
ctx context.Context,
bits []byte,
service graph.Service,
cp control.CollisionPolicy,
destination,
user string,
) error {
// Creates messageable object from original bytes
originalMessage, err := support.CreateMessageFromBytes(bits)
if err != nil {
return err
}
// Sets fields from original message from storage
clone := support.ToMessage(originalMessage)
valueId := RestorePropertyTag
enableValue := RestoreCanonicalEnableValue
sv := models.NewSingleValueLegacyExtendedProperty()
sv.SetId(&valueId)
sv.SetValue(&enableValue)
svlep := []models.SingleValueLegacyExtendedPropertyable{sv}
clone.SetSingleValueExtendedProperties(svlep)
draft := false
clone.SetIsDraft(&draft)
// Switch workflow based on collision policy
switch cp {
default:
logger.Ctx(ctx).DPanicw("unrecognized restore policy; defaulting to copy",
"policy", cp)
fallthrough
case control.Copy:
return SendMailToBackStore(service, user, destination, clone)
}
}
// SendMailToBackStore function for transporting in-memory messageable item to M365 backstore
// @param user string represents M365 ID of user within the tenant
// @param destination represents M365 ID of a folder within the users's space
// @param message is a models.Messageable interface from "github.com/microsoftgraph/msgraph-sdk-go/models"
func SendMailToBackStore(service graph.Service, user, destination string, message models.Messageable) error {
sentMessage, err := service.Client().UsersById(user).MailFoldersById(destination).Messages().Post(message)
if err != nil {
return support.WrapAndAppend(": "+support.ConnectorStackErrorTrace(err), err, nil)
}
if sentMessage == nil {
return errors.New("message not Sent: blocked by server")
}
return nil
}

View File

@ -5,6 +5,7 @@ import (
"github.com/microsoftgraph/msgraph-sdk-go/models"
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages"
msitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item"
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector/graph"
@ -92,6 +93,23 @@ func optionsForMessages(moreOps []string) (*msmessage.MessagesRequestBuilderGetR
return options, nil
}
// optionsForSingleMessage to select allowable option for a singular exchange.Mail object
// @params moreOps is []string of options (e.g. subject, content.Type)
// @return is first call in MessageById().GetWithRequestConfigurationAndResponseHandler
func OptionsForSingleMessage(moreOps []string) (*msitem.MessageItemRequestBuilderGetRequestConfiguration, error) {
selecting, err := buildOptions(moreOps, messages)
if err != nil {
return nil, err
}
requestParams := &msitem.MessageItemRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &msitem.MessageItemRequestBuilderGetRequestConfiguration{
QueryParameters: requestParams,
}
return options, nil
}
// optionsForMailFolders transforms the options into a more dynamic call for MailFolders.
// @param moreOps is a []string of options(e.g. "displayName", "isHidden")
// @return is first call in MailFolders().GetWithRequestConfigurationAndResponseHandler(options, handler)

View File

@ -20,6 +20,7 @@ import (
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/control"
"github.com/alcionai/corso/pkg/logger"
"github.com/alcionai/corso/pkg/selectors"
)
@ -116,7 +117,7 @@ func (gc *GraphConnector) setTenantUsers() error {
options := &msuser.UsersRequestBuilderGetRequestConfiguration{
QueryParameters: requestParams,
}
response, err := gc.graphService.client.Users().GetWithRequestConfigurationAndResponseHandler(options, nil)
response, err := gc.Client().Users().GetWithRequestConfigurationAndResponseHandler(options, nil)
if err != nil {
return err
}
@ -226,14 +227,20 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collec
pathCounter = map[string]bool{}
attempts, successes int
errs error
folderId *string
)
gc.incrementAwaitingMessages()
policy := control.Copy // TODO policy to be updated from external source after completion of refactoring
for _, dc := range dcs {
// must be user.GetId(), PrimaryName no longer works 6-15-2022
user := dc.FullPath()[1]
items := dc.Items()
pathCounter[strings.Join(dc.FullPath(), "")] = true
if policy == control.Copy {
folderId, errs = exchange.GetCopyRestoreFolder(&gc.graphService, user)
if errs != nil {
return errs
}
}
var exit bool
for !exit {
@ -253,42 +260,21 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collec
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
message, err := support.CreateMessageFromBytes(buf.Bytes())
if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs)
switch policy {
case control.Copy:
err = exchange.RestoreMailMessage(ctx, buf.Bytes(), &gc.graphService, control.Copy, *folderId, user)
if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs)
}
default:
errs = support.WrapAndAppend(data.UUID(), errors.New("restore policy not supported"), errs)
continue
}
clone := support.ToMessage(message)
address := dc.FullPath()[3]
valueId := "Integer 0x0E07"
enableValue := "4"
sv := models.NewSingleValueLegacyExtendedProperty()
sv.SetId(&valueId)
sv.SetValue(&enableValue)
svlep := []models.SingleValueLegacyExtendedPropertyable{sv}
clone.SetSingleValueExtendedProperties(svlep)
draft := false
clone.SetIsDraft(&draft)
sentMessage, err := gc.graphService.client.UsersById(user).MailFoldersById(address).Messages().Post(clone)
if err != nil {
errs = support.WrapAndAppend(
data.UUID()+": "+support.ConnectorStackErrorTrace(err),
err, errs)
continue
// TODO: Add to retry Handler for the for failure
}
if sentMessage == nil {
errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs)
continue
}
successes++
// This completes the restore loop for a message..
}
}
}
gc.incrementAwaitingMessages()
status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs)
// set the channel asynchronously so that this func doesn't block.
go func(cos *support.ConnectorOperationStatus) {

View File

@ -22,6 +22,7 @@ import (
type GraphConnectorIntegrationSuite struct {
suite.Suite
connector *GraphConnector
user string
}
func TestGraphConnectorIntegrationSuite(t *testing.T) {
@ -47,6 +48,7 @@ func (suite *GraphConnectorIntegrationSuite) SetupSuite() {
suite.connector, err = NewGraphConnector(a)
suite.NoError(err)
suite.user = "lidiah@8qzvrj.onmicrosoft.com"
}
func (suite *GraphConnectorIntegrationSuite) TestGraphConnector() {

View File

@ -0,0 +1,26 @@
// Code generated by "stringer -type=CollisionPolicy"; DO NOT EDIT.
package control
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[Unknown-0]
_ = x[Copy-1]
_ = x[Skip-2]
_ = x[Replace-3]
}
const _CollisionPolicy_name = "UnknownCopySkipReplace"
var _CollisionPolicy_index = [...]uint8{0, 7, 11, 15, 22}
func (i CollisionPolicy) String() string {
if i < 0 || i >= CollisionPolicy(len(_CollisionPolicy_index)-1) {
return "CollisionPolicy(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _CollisionPolicy_name[_CollisionPolicy_index[i]:_CollisionPolicy_index[i+1]]
}

View File

@ -3,6 +3,7 @@ package control
// CollisionPolicy describes how the datalayer behaves in case of a collision.
type CollisionPolicy int
//go:generate stringer -type=CollisionPolicy
const (
Unknown CollisionPolicy = iota
Copy