GC: Backup all users flag support for exchange (#631)

All-users flag support enabled for Graph-Connector
This commit is contained in:
Danny 2022-08-26 17:05:14 -04:00 committed by GitHub
parent 2ceafd8d62
commit ab92358ea6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 90 deletions

View File

@ -353,6 +353,7 @@ func (suite *ExchangeServiceSuite) TestIterativeFunctions() {
// with corresponding item IDs. New collections are created for each directory // with corresponding item IDs. New collections are created for each directory
callbackFunc := test.iterativeFunction( callbackFunc := test.iterativeFunction(
"testingTenant", "testingTenant",
userID,
test.scope, test.scope,
errs, false, errs, false,
suite.es.credentials, suite.es.credentials,

View File

@ -227,7 +227,7 @@ func SetupExchangeCollectionVars(scope selectors.ExchangeScope) (
nil nil
} }
if scope.IncludesCategory(selectors.ExchangeContactFolder) { if scope.IncludesCategory(selectors.ExchangeContact) {
return models.CreateContactFromDiscriminatorValue, return models.CreateContactFromDiscriminatorValue,
GetAllContactsForUser, GetAllContactsForUser,
IterateAllContactsForCollection, IterateAllContactsForCollection,

View File

@ -36,6 +36,7 @@ type displayable interface {
// @returns a callback func that works with msgraphgocore.PageIterator.Iterate function // @returns a callback func that works with msgraphgocore.PageIterator.Iterate function
type GraphIterateFunc func( type GraphIterateFunc func(
tenant string, tenant string,
user string,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
errs error, errs error,
failFast bool, failFast bool,
@ -50,6 +51,7 @@ type GraphIterateFunc func(
// placed into a Collection based on the parent folder // placed into a Collection based on the parent folder
func IterateSelectAllMessagesForCollections( func IterateSelectAllMessagesForCollections(
tenant string, tenant string,
user string,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
errs error, errs error,
failFast bool, failFast bool,
@ -95,6 +97,7 @@ func IterateSelectAllMessagesForCollections(
// the calendarID which originates from M365. // the calendarID which originates from M365.
func IterateSelectAllEventsForCollections( func IterateSelectAllEventsForCollections(
tenant string, tenant string,
user string,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
errs error, errs error,
failFast bool, failFast bool,
@ -103,8 +106,6 @@ func IterateSelectAllEventsForCollections(
statusCh chan<- *support.ConnectorOperationStatus, statusCh chan<- *support.ConnectorOperationStatus,
) func(any) bool { ) func(any) bool {
return func(eventItem any) bool { return func(eventItem any) bool {
user := scope.Get(selectors.ExchangeUser)[0]
event, ok := eventItem.(models.Eventable) event, ok := eventItem.(models.Eventable)
if !ok { if !ok {
errs = support.WrapAndAppend( errs = support.WrapAndAppend(
@ -173,6 +174,7 @@ func IterateSelectAllEventsForCollections(
// Contacts Ids are placed into a collection based upon the parent folder // Contacts Ids are placed into a collection based upon the parent folder
func IterateAllContactsForCollection( func IterateAllContactsForCollection(
tenant string, tenant string,
user string,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
errs error, errs error,
failFast bool, failFast bool,
@ -181,8 +183,6 @@ func IterateAllContactsForCollection(
statusCh chan<- *support.ConnectorOperationStatus, statusCh chan<- *support.ConnectorOperationStatus,
) func(any) bool { ) func(any) bool {
return func(contactsItem any) bool { return func(contactsItem any) bool {
user := scope.Get(selectors.ExchangeUser)[0]
contact, ok := contactsItem.(models.Contactable) contact, ok := contactsItem.(models.Contactable)
if !ok { if !ok {
errs = support.WrapAndAppend(user, errors.New("contact iteration failure"), errs) errs = support.WrapAndAppend(user, errors.New("contact iteration failure"), errs)
@ -209,8 +209,12 @@ func IterateAllContactsForCollection(
} }
} }
// IterateAndFilterMessagesForCollections is a filtering GraphIterateFunc
// that places exchange mail message ids belonging to specific directories
// into a Collection. Messages outside of those directories are omitted.
func IterateAndFilterMessagesForCollections( func IterateAndFilterMessagesForCollections(
tenant string, tenant string,
user string,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
errs error, errs error,
failFast bool, failFast bool,
@ -220,7 +224,6 @@ func IterateAndFilterMessagesForCollections(
) func(any) bool { ) func(any) bool {
var isFilterSet bool var isFilterSet bool
return func(messageItem any) bool { return func(messageItem any) bool {
user := scope.Get(selectors.ExchangeUser)[0]
if !isFilterSet { if !isFilterSet {
err := CollectMailFolders( err := CollectMailFolders(
@ -256,6 +259,7 @@ func IterateAndFilterMessagesForCollections(
func IterateFilterFolderDirectoriesForCollections( func IterateFilterFolderDirectoriesForCollections(
tenant string, tenant string,
user string,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
errs error, errs error,
failFast bool, failFast bool,
@ -268,7 +272,6 @@ func IterateFilterFolderDirectoriesForCollections(
err error err error
) )
return func(folderItem any) bool { return func(folderItem any) bool {
user := scope.Get(selectors.ExchangeUser)[0]
folder, ok := folderItem.(models.MailFolderable) folder, ok := folderItem.(models.MailFolderable)
if !ok { if !ok {
errs = support.WrapAndAppend( errs = support.WrapAndAppend(
@ -279,6 +282,10 @@ func IterateFilterFolderDirectoriesForCollections(
return true return true
} }
// Continue to iterate if folder name is empty
if folder.GetDisplayName() == nil {
return true
}
if !scope.Contains(selectors.ExchangeMailFolder, *folder.GetDisplayName()) { if !scope.Contains(selectors.ExchangeMailFolder, *folder.GetDisplayName()) {
return true return true
} }

View File

@ -133,6 +133,7 @@ func CollectMailFolders(
callbackFunc := IterateFilterFolderDirectoriesForCollections( callbackFunc := IterateFilterFolderDirectoriesForCollections(
tenant, tenant,
user,
scope, scope,
err, err,
failFast, failFast,

View File

@ -201,39 +201,23 @@ func (gc *GraphConnector) ExchangeDataCollection(
) ([]data.Collection, error) { ) ([]data.Collection, error) {
eb, err := selector.ToExchangeBackup() eb, err := selector.ToExchangeBackup()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "collecting exchange data") return nil, errors.Wrap(err, "exchangeDataCollection: unable to parse selector")
} }
scopes := eb.DiscreteScopes(gc.GetUsers())
collections := []data.Collection{} collections := []data.Collection{}
scopes := eb.Scopes()
var errs error var errs error
// for each scope that includes mail messages, get all
for _, scope := range scopes { for _, scope := range scopes {
for _, user := range scope.Get(selectors.ExchangeUser) { // Creates a map of collections based on scope
// TODO: handle "get mail for all users" dcs, err := gc.createCollections(ctx, scope)
// this would probably no-op without this check, if err != nil {
// but we want it made obvious that we're punting. user := scope.Get(selectors.ExchangeUser)
if user == selectors.AnyTgt { return nil, support.WrapAndAppend(user[0], err, errs)
errs = support.WrapAndAppend( }
"all-users", for _, collection := range dcs {
errors.New("all users selector currently not handled"), collections = append(collections, collection)
errs)
continue
}
// Creates a map of collections based on scope
dcs, err := gc.createCollections(ctx, scope)
if err != nil {
return nil, support.WrapAndAppend(user, err, errs)
}
if len(dcs) > 0 {
for _, collection := range dcs {
collections = append(collections, collection)
}
}
} }
} }
return collections, errs return collections, errs
} }
@ -307,48 +291,52 @@ func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []data.Collec
func (gc *GraphConnector) createCollections( func (gc *GraphConnector) createCollections(
ctx context.Context, ctx context.Context,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
) (map[string]*exchange.Collection, error) { ) ([]*exchange.Collection, error) {
var ( var (
transformer absser.ParsableFactory transformer absser.ParsableFactory
query exchange.GraphQuery query exchange.GraphQuery
gIter exchange.GraphIterateFunc gIter exchange.GraphIterateFunc
errs error
) )
user := scope.Get(selectors.ExchangeUser)[0]
transformer, query, gIter, err := exchange.SetupExchangeCollectionVars(scope) transformer, query, gIter, err := exchange.SetupExchangeCollectionVars(scope)
if err != nil { if err != nil {
return nil, support.WrapAndAppend(user, err, nil) return nil, support.WrapAndAppend(gc.Service().Adapter().GetBaseUrl(), err, nil)
}
response, err := query(&gc.graphService, user)
if err != nil {
return nil, errors.Wrapf(
err,
"user %s M365 query: %s",
user, support.ConnectorStackErrorTrace(err))
} }
users := scope.Get(selectors.ExchangeUser)
allCollections := make([]*exchange.Collection, 0)
// Create collection of ExchangeDataCollection
for _, user := range users {
collections := make(map[string]*exchange.Collection)
response, err := query(&gc.graphService, user)
if err != nil {
return nil, errors.Wrapf(
err,
"user %s M365 query: %s",
user, support.ConnectorStackErrorTrace(err))
}
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, transformer) pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, transformer)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Create collection of ExchangeDataCollection and create data Holder
collections := make(map[string]*exchange.Collection)
var errs error
// callbackFunc iterates through all models.Messageable and fills exchange.Collection.jobs[]
// with corresponding item IDs. New collections are created for each directory
callbackFunc := gIter(gc.tenant, scope, errs, gc.failFast, gc.credentials, collections, gc.statusCh)
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs)
}
if errs != nil {
return nil, errs // return error if snapshot is incomplete
}
for range collections { // callbackFunc iterates through all M365 object target and fills exchange.Collection.jobs[]
gc.incrementAwaitingMessages() // with corresponding item M365IDs. New collections are created for each directory.
// Each directory used the M365 Identifier. The use of ID stops collisions betweens users
callbackFunc := gIter(gc.tenant, user, scope, errs, gc.failFast, gc.credentials, collections, gc.statusCh)
iterateError := pageIterator.Iterate(callbackFunc)
if iterateError != nil {
errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs)
}
if errs != nil {
return nil, errs // return error if snapshot is incomplete
}
for _, collection := range collections {
gc.incrementAwaitingMessages()
allCollections = append(allCollections, collection)
}
} }
return allCollections, errs
return collections, errs
} }
// AwaitStatus updates status field based on item within statusChannel. // AwaitStatus updates status field based on item within statusChannel.

View File

@ -12,6 +12,7 @@ import (
"github.com/alcionai/corso/internal/connector/mockconnector" "github.com/alcionai/corso/internal/connector/mockconnector"
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/data" "github.com/alcionai/corso/internal/data"
"github.com/alcionai/corso/internal/tester"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/credentials" "github.com/alcionai/corso/pkg/credentials"
) )
@ -24,6 +25,7 @@ type DisconnectedGraphConnectorSuite struct {
} }
func TestDisconnectedGraphSuite(t *testing.T) { func TestDisconnectedGraphSuite(t *testing.T) {
tester.LogTimeOfTest(t)
suite.Run(t, new(DisconnectedGraphConnectorSuite)) suite.Run(t, new(DisconnectedGraphConnectorSuite))
} }

View File

@ -80,6 +80,8 @@ func (suite *GraphConnectorIntegrationSuite) TestSetTenantUsers() {
// GraphConnector remains stable to receive a non-zero amount of Collections // GraphConnector remains stable to receive a non-zero amount of Collections
// for the Exchange Package. Enabled exchange applications: // for the Exchange Package. Enabled exchange applications:
// - mail // - mail
// - contacts
// - events
func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() { func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() {
t := suite.T() t := suite.T()
connector := loadConnector(t) connector := loadConnector(t)
@ -248,35 +250,24 @@ func (suite *GraphConnectorIntegrationSuite) TestRestoreMessages() {
assert.Equal(t, status.FolderCount, 1) assert.Equal(t, status.FolderCount, 1)
} }
// TestGraphConnector_SingleMailFolderCollectionQuery verifies single folder support // TestAccessOfInboxAllUsers verifies that GraphConnector can
// for Backup operation // support `--all-users` for backup operations. Selector.DiscreteScopes
func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_SingleMailFolderCollectionQuery() { // returns all of the users within one scope. Only users who have
// messages in their inbox will have a collection returned.
// The final test insures that more than a 75% of the user collections are
// returned. If an error was experienced, the test will fail overall
func (suite *GraphConnectorIntegrationSuite) TestAccessOfInboxAllUsers() {
t := suite.T() t := suite.T()
connector := loadConnector(t)
sel := selectors.NewExchangeBackup() sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{"Inbox"})) sel.Include(sel.MailFolders(selectors.Any(), []string{"Inbox"}))
scopes := sel.Scopes() scopes := sel.DiscreteScopes(connector.GetUsers())
for _, scope := range scopes { for _, scope := range scopes {
collections, err := suite.connector.createCollections(context.Background(), scope) users := scope.Get(selectors.ExchangeUser)
standard := (len(users) / 4) * 3
collections, err := connector.createCollections(context.Background(), scope)
require.NoError(t, err) require.NoError(t, err)
suite.Equal(len(collections), 1) suite.Greater(len(collections), standard)
for _, edc := range collections {
number := 0
streamChannel := edc.Items()
// Verify that each message can be restored
for stream := range streamChannel {
testName := fmt.Sprintf("%s_InboxMessage_%d", edc.FullPath()[1], number)
suite.T().Run(testName, func(t *testing.T) {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
message, err := support.CreateMessageFromBytes(buf.Bytes())
assert.NotNil(t, message)
assert.NoError(t, err)
number++
})
}
}
} }
} }