connector data coll refactor (#1507)

A quick code movement before adding the
sharepoint datacollection production so that
we minimize graph_conn file bloat.
This commit is contained in:
Keepers 2022-11-17 14:04:16 -07:00 committed by GitHub
parent 3d96bfcffa
commit 2c913f9ef7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 686 additions and 574 deletions

View File

@ -0,0 +1,231 @@
package connector
import (
"context"
"fmt"
"strings"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/onedrive"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
D "github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors"
)
// ---------------------------------------------------------------------------
// Data Collections
// ---------------------------------------------------------------------------
// DataCollections utility function to launch backup operations for exchange and onedrive
func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) {
ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String()))
defer end()
err := verifyBackupInputs(sels, gc.Users)
if err != nil {
return nil, err
}
switch sels.Service {
case selectors.ServiceExchange:
return gc.ExchangeDataCollection(ctx, sels)
case selectors.ServiceOneDrive:
return gc.OneDriveDataCollections(ctx, sels)
default:
return nil, errors.Errorf("service %s not supported", sels)
}
}
func verifyBackupInputs(sel selectors.Selector, mapOfUsers map[string]string) error {
var personnel []string
// retrieve users from selectors
switch sel.Service {
case selectors.ServiceExchange:
backup, err := sel.ToExchangeBackup()
if err != nil {
return err
}
for _, scope := range backup.Scopes() {
temp := scope.Get(selectors.ExchangeUser)
personnel = append(personnel, temp...)
}
case selectors.ServiceOneDrive:
backup, err := sel.ToOneDriveBackup()
if err != nil {
return err
}
for _, user := range backup.Scopes() {
temp := user.Get(selectors.OneDriveUser)
personnel = append(personnel, temp...)
}
default:
return errors.New("service %s not supported")
}
// verify personnel
normUsers := map[string]struct{}{}
for k := range mapOfUsers {
normUsers[strings.ToLower(k)] = struct{}{}
}
for _, user := range personnel {
if _, ok := normUsers[strings.ToLower(user)]; !ok {
return fmt.Errorf("%s user not found within tenant", user)
}
}
return nil
}
// createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are stored.
// to the GraphConnector struct.
func (gc *GraphConnector) createCollections(
ctx context.Context,
scope selectors.ExchangeScope,
) ([]*exchange.Collection, error) {
var errs *multierror.Error
users := scope.Get(selectors.ExchangeUser)
allCollections := make([]*exchange.Collection, 0)
// Create collection of ExchangeDataCollection
for _, user := range users {
collections := make(map[string]*exchange.Collection)
qp := graph.QueryParams{
User: user,
Scope: scope,
FailFast: gc.failFast,
Credentials: gc.credentials,
}
itemCategory := qp.Scope.Category().PathType()
foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user))
defer closer()
defer close(foldersComplete)
resolver, err := exchange.PopulateExchangeContainerResolver(
ctx,
qp,
qp.Scope.Category().PathType(),
)
if err != nil {
return nil, errors.Wrap(err, "getting folder cache")
}
err = exchange.FilterContainersAndFillCollections(
ctx,
qp,
collections,
gc.UpdateStatus,
resolver)
if err != nil {
return nil, errors.Wrap(err, "filling collections")
}
foldersComplete <- struct{}{}
for _, collection := range collections {
gc.incrementAwaitingMessages()
allCollections = append(allCollections, collection)
}
}
return allCollections, errs.ErrorOrNil()
}
// ExchangeDataCollections returns a DataCollection which the caller can
// use to read mailbox data out for the specified user
// Assumption: User exists
//
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(
ctx context.Context,
selector selectors.Selector,
) ([]data.Collection, error) {
eb, err := selector.ToExchangeBackup()
if err != nil {
return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector")
}
var (
scopes = eb.DiscreteScopes(gc.GetUsers())
collections = []data.Collection{}
errs error
)
for _, scope := range scopes {
// Creates a map of collections based on scope
dcs, err := gc.createCollections(ctx, scope)
if err != nil {
user := scope.Get(selectors.ExchangeUser)
return nil, support.WrapAndAppend(user[0], err, errs)
}
for _, collection := range dcs {
collections = append(collections, collection)
}
}
return collections, errs
}
// OneDriveDataCollections returns a set of DataCollection which represents the OneDrive data
// for the specified user
func (gc *GraphConnector) OneDriveDataCollections(
ctx context.Context,
selector selectors.Selector,
) ([]data.Collection, error) {
odb, err := selector.ToOneDriveBackup()
if err != nil {
return nil, errors.Wrap(err, "oneDriveDataCollection: parsing selector")
}
collections := []data.Collection{}
scopes := odb.DiscreteScopes(gc.GetUsers())
var errs error
// for each scope that includes oneDrive items, get all
for _, scope := range scopes {
for _, user := range scope.Get(selectors.OneDriveUser) {
logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections")
odcs, err := onedrive.NewCollections(
gc.credentials.AzureTenantID,
user,
scope,
&gc.graphService,
gc.UpdateStatus,
).Get(ctx)
if err != nil {
return nil, support.WrapAndAppend(user, err, errs)
}
collections = append(collections, odcs...)
}
}
for range collections {
gc.incrementAwaitingMessages()
}
return collections, errs
}

View File

@ -0,0 +1,418 @@
package connector
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/selectors"
)
// ---------------------------------------------------------------------------
// DataCollection tests
// ---------------------------------------------------------------------------
type ConnectorDataCollectionIntegrationSuite struct {
suite.Suite
connector *GraphConnector
user string
}
func TestConnectorDataCollectionIntegrationSuite(t *testing.T) {
if err := tester.RunOnAny(
tester.CorsoCITests,
tester.CorsoConnectorDataCollectionTests,
); err != nil {
t.Skip(err)
}
suite.Run(t, new(ConnectorDataCollectionIntegrationSuite))
}
func (suite *ConnectorDataCollectionIntegrationSuite) SetupSuite() {
ctx, flush := tester.NewContext()
defer flush()
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
require.NoError(suite.T(), err)
suite.connector = loadConnector(ctx, suite.T())
suite.user = tester.M365UserID(suite.T())
tester.LogTimeOfTest(suite.T())
}
// TestExchangeDataCollection verifies interface between operation and
// GraphConnector remains stable to receive a non-zero amount of Collections
// for the Exchange Package. Enabled exchange applications:
// - mail
// - contacts
// - events
func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T())
tests := []struct {
name string
getSelector func(t *testing.T) selectors.Selector
}{
{
name: suite.user + " Email",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
return sel.Selector
},
},
{
name: suite.user + " Contacts",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewExchangeBackup()
sel.Include(sel.ContactFolders(
[]string{suite.user},
[]string{exchange.DefaultContactFolder},
selectors.PrefixMatch()))
return sel.Selector
},
},
// {
// name: suite.user + " Events",
// getSelector: func(t *testing.T) selectors.Selector {
// sel := selectors.NewExchangeBackup()
// sel.Include(sel.EventCalendars(
// []string{suite.user},
// []string{exchange.DefaultCalendar},
// selectors.PrefixMatch(),
// ))
// return sel.Selector
// },
// },
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t))
require.NoError(t, err)
assert.Equal(t, len(collection), 1)
channel := collection[0].Items()
for object := range channel {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.NoError(t, err, "received a buf.Read error")
}
status := connector.AwaitStatus()
assert.NotZero(t, status.Successful)
t.Log(status.String())
})
}
}
// TestInvalidUserForDataCollections ensures verification process for users
func (suite *ConnectorDataCollectionIntegrationSuite) TestInvalidUserForDataCollections() {
ctx, flush := tester.NewContext()
defer flush()
invalidUser := "foo@example.com"
connector := loadConnector(ctx, suite.T())
tests := []struct {
name string
getSelector func(t *testing.T) selectors.Selector
}{
{
name: "invalid exchange backup user",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{invalidUser}, selectors.Any()))
return sel.Selector
},
},
{
name: "Invalid onedrive backup user",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewOneDriveBackup()
sel.Include(sel.Folders([]string{invalidUser}, selectors.Any()))
return sel.Selector
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections, err := connector.DataCollections(ctx, test.getSelector(t))
assert.Error(t, err)
assert.Empty(t, collections)
})
}
}
func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() {
dest := tester.DefaultTestRestoreDestination()
table := []struct {
name string
col []data.Collection
sel selectors.Selector
}{
{
name: "ExchangeNil",
col: nil,
sel: selectors.Selector{
Service: selectors.ServiceExchange,
},
},
{
name: "ExchangeEmpty",
col: []data.Collection{},
sel: selectors.Selector{
Service: selectors.ServiceExchange,
},
},
{
name: "OneDriveNil",
col: nil,
sel: selectors.Selector{
Service: selectors.ServiceOneDrive,
},
},
{
name: "OneDriveEmpty",
col: []data.Collection{},
sel: selectors.Selector{
Service: selectors.ServiceOneDrive,
},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
deets, err := suite.connector.RestoreDataCollections(ctx, test.sel, dest, test.col)
require.NoError(t, err)
assert.NotNil(t, deets)
stats := suite.connector.AwaitStatus()
assert.Zero(t, stats.ObjectCount)
assert.Zero(t, stats.FolderCount)
assert.Zero(t, stats.Successful)
})
}
}
// ---------------------------------------------------------------------------
// CreateCollection tests
// ---------------------------------------------------------------------------
type ConnectorCreateCollectionIntegrationSuite struct {
suite.Suite
connector *GraphConnector
user string
}
func TestConnectorCreateCollectionIntegrationSuite(t *testing.T) {
if err := tester.RunOnAny(
tester.CorsoCITests,
tester.CorsoConnectorCreateCollectionTests,
); err != nil {
t.Skip(err)
}
suite.Run(t, new(ConnectorCreateCollectionIntegrationSuite))
}
func (suite *ConnectorCreateCollectionIntegrationSuite) SetupSuite() {
ctx, flush := tester.NewContext()
defer flush()
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
require.NoError(suite.T(), err)
suite.connector = loadConnector(ctx, suite.T())
suite.user = tester.M365UserID(suite.T())
tester.LogTimeOfTest(suite.T())
}
// TestMailSerializationRegression verifies that all mail data stored in the
// test account can be successfully downloaded into bytes and restored into
// M365 mail objects
func (suite *ConnectorCreateCollectionIntegrationSuite) TestMailSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
connector := loadConnector(ctx, t)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
collection, err := connector.createCollections(ctx, sel.Scopes()[0])
require.NoError(t, err)
for _, edc := range collection {
suite.T().Run(edc.FullPath().String(), func(t *testing.T) {
streamChannel := edc.Items()
// Verify that each message can be restored
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
message, err := support.CreateMessageFromBytes(buf.Bytes())
assert.NotNil(t, message)
assert.NoError(t, err)
}
})
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
}
// TestContactSerializationRegression verifies ability to query contact items
// and to store contact within Collection. Downloaded contacts are run through
// a regression test to ensure that downloaded items can be uploaded.
func (suite *ConnectorCreateCollectionIntegrationSuite) TestContactSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T())
tests := []struct {
name string
getCollection func(t *testing.T) []*exchange.Collection
}{
{
name: "Default Contact Folder",
getCollection: func(t *testing.T) []*exchange.Collection {
scope := selectors.
NewExchangeBackup().
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
collections, err := connector.createCollections(ctx, scope)
require.NoError(t, err)
return collections
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
edcs := test.getCollection(t)
require.Equal(t, len(edcs), 1)
edc := edcs[0]
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
streamChannel := edc.Items()
count := 0
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
contact, err := support.CreateContactFromBytes(buf.Bytes())
assert.NotNil(t, contact)
assert.NoError(t, err, "error on converting contact bytes: "+string(buf.Bytes()))
count++
}
assert.NotZero(t, count)
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
})
}
}
// TestEventsSerializationRegression ensures functionality of createCollections
// to be able to successfully query, download and restore event objects
func (suite *ConnectorCreateCollectionIntegrationSuite) TestEventsSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T())
tests := []struct {
name, expected string
getCollection func(t *testing.T) []*exchange.Collection
}{
{
name: "Default Event Calendar",
expected: exchange.DefaultCalendar,
getCollection: func(t *testing.T) []*exchange.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
require.NoError(t, err)
return collections
},
},
{
name: "Birthday Calendar",
expected: "Birthdays",
getCollection: func(t *testing.T) []*exchange.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
require.NoError(t, err)
return collections
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections := test.getCollection(t)
require.Equal(t, len(collections), 1)
edc := collections[0]
assert.Equal(t, edc.FullPath().Folder(), test.expected)
streamChannel := edc.Items()
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
event, err := support.CreateEventFromBytes(buf.Bytes())
assert.NotNil(t, event)
assert.NoError(t, err, "experienced error parsing event bytes: "+string(buf.Bytes()))
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
})
}
}
// TestAccessOfInboxAllUsers verifies that GraphConnector can
// support `--users *` for backup operations. Selector.DiscreteScopes
// returns all of the users within one scope. Only users who have
// messages in their inbox will have a collection returned.
// The final test insures that more than a 75% of the user collections are
// returned. If an error was experienced, the test will fail overall
func (suite *ConnectorCreateCollectionIntegrationSuite) TestAccessOfInboxAllUsers() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
connector := loadConnector(ctx, t)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
scopes := sel.DiscreteScopes(connector.GetUsers())
for _, scope := range scopes {
users := scope.Get(selectors.ExchangeUser)
standard := (len(users) / 4) * 3
collections, err := connector.createCollections(ctx, scope)
require.NoError(t, err)
suite.Greater(len(collections), standard)
}
}

View File

@ -6,10 +6,8 @@ import (
"context"
"fmt"
"runtime/trace"
"strings"
"sync"
"github.com/hashicorp/go-multierror"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models"
@ -21,14 +19,16 @@ import (
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
D "github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors"
)
// ---------------------------------------------------------------------------
// Graph Connector
// ---------------------------------------------------------------------------
// GraphConnector is a struct used to wrap the GraphServiceClient and
// GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for
// bookkeeping and interfacing with other component.
@ -209,42 +209,6 @@ func buildFromMap(isKey bool, mapping map[string]string) []string {
return returnString
}
// ExchangeDataStream returns a DataCollection which the caller can
// use to read mailbox data out for the specified user
// Assumption: User exists
//
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(
ctx context.Context,
selector selectors.Selector,
) ([]data.Collection, error) {
eb, err := selector.ToExchangeBackup()
if err != nil {
return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector")
}
var (
scopes = eb.DiscreteScopes(gc.GetUsers())
collections = []data.Collection{}
errs error
)
for _, scope := range scopes {
// Creates a map of collections based on scope
dcs, err := gc.createCollections(ctx, scope)
if err != nil {
user := scope.Get(selectors.ExchangeUser)
return nil, support.WrapAndAppend(user[0], err, errs)
}
for _, collection := range dcs {
collections = append(collections, collection)
}
}
return collections, errs
}
// RestoreDataCollections restores data from the specified collections
// into M365 using the GraphAPI.
// SideEffect: gc.status is updated at the completion of operation
@ -278,67 +242,6 @@ func (gc *GraphConnector) RestoreDataCollections(
return deets, err
}
// createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are stored.
// to the GraphConnector struct.
func (gc *GraphConnector) createCollections(
ctx context.Context,
scope selectors.ExchangeScope,
) ([]*exchange.Collection, error) {
var errs *multierror.Error
users := scope.Get(selectors.ExchangeUser)
allCollections := make([]*exchange.Collection, 0)
// Create collection of ExchangeDataCollection
for _, user := range users {
collections := make(map[string]*exchange.Collection)
qp := graph.QueryParams{
User: user,
Scope: scope,
FailFast: gc.failFast,
Credentials: gc.credentials,
}
itemCategory := qp.Scope.Category().PathType()
foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user))
defer closer()
defer close(foldersComplete)
resolver, err := exchange.PopulateExchangeContainerResolver(
ctx,
qp,
qp.Scope.Category().PathType(),
)
if err != nil {
return nil, errors.Wrap(err, "getting folder cache")
}
err = exchange.FilterContainersAndFillCollections(
ctx,
qp,
collections,
gc.UpdateStatus,
resolver)
if err != nil {
return nil, errors.Wrap(err, "filling collections")
}
foldersComplete <- struct{}{}
for _, collection := range collections {
gc.incrementAwaitingMessages()
allCollections = append(allCollections, collection)
}
}
return allCollections, errs.ErrorOrNil()
}
// AwaitStatus waits for all gc tasks to complete and then returns status
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
defer func() {
@ -378,6 +281,10 @@ func (gc *GraphConnector) incrementAwaitingMessages() {
gc.wg.Add(1)
}
// ---------------------------------------------------------------------------
// Helper Funcs
// ---------------------------------------------------------------------------
// IsRecoverableError returns true iff error is a RecoverableGCEerror
func IsRecoverableError(e error) bool {
var recoverable support.RecoverableGCError
@ -389,113 +296,3 @@ func IsNonRecoverableError(e error) bool {
var nonRecoverable support.NonRecoverableGCError
return errors.As(e, &nonRecoverable)
}
// DataCollections utility function to launch backup operations for exchange and onedrive
func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) {
ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String()))
defer end()
err := verifyBackupInputs(sels, gc.Users)
if err != nil {
return nil, err
}
switch sels.Service {
case selectors.ServiceExchange:
return gc.ExchangeDataCollection(ctx, sels)
case selectors.ServiceOneDrive:
return gc.OneDriveDataCollections(ctx, sels)
default:
return nil, errors.Errorf("service %s not supported", sels)
}
}
// OneDriveDataCollections returns a set of DataCollection which represents the OneDrive data
// for the specified user
func (gc *GraphConnector) OneDriveDataCollections(
ctx context.Context,
selector selectors.Selector,
) ([]data.Collection, error) {
odb, err := selector.ToOneDriveBackup()
if err != nil {
return nil, errors.Wrap(err, "oneDriveDataCollection: parsing selector")
}
collections := []data.Collection{}
scopes := odb.DiscreteScopes(gc.GetUsers())
var errs error
// for each scope that includes oneDrive items, get all
for _, scope := range scopes {
for _, user := range scope.Get(selectors.OneDriveUser) {
logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections")
odcs, err := onedrive.NewCollections(
gc.credentials.AzureTenantID,
user,
scope,
&gc.graphService,
gc.UpdateStatus,
).Get(ctx)
if err != nil {
return nil, support.WrapAndAppend(user, err, errs)
}
collections = append(collections, odcs...)
}
}
for range collections {
gc.incrementAwaitingMessages()
}
return collections, errs
}
func verifyBackupInputs(sel selectors.Selector, mapOfUsers map[string]string) error {
var personnel []string
// retrieve users from selectors
switch sel.Service {
case selectors.ServiceExchange:
backup, err := sel.ToExchangeBackup()
if err != nil {
return err
}
for _, scope := range backup.Scopes() {
temp := scope.Get(selectors.ExchangeUser)
personnel = append(personnel, temp...)
}
case selectors.ServiceOneDrive:
backup, err := sel.ToOneDriveBackup()
if err != nil {
return err
}
for _, user := range backup.Scopes() {
temp := user.Get(selectors.OneDriveUser)
personnel = append(personnel, temp...)
}
default:
return errors.New("service %s not supported")
}
// verify personnel
normUsers := map[string]struct{}{}
for k := range mapOfUsers {
normUsers[strings.ToLower(k)] = struct{}{}
}
for _, user := range personnel {
if _, ok := normUsers[strings.ToLower(user)]; !ok {
return fmt.Errorf("%s user not found within tenant", user)
}
}
return nil
}

View File

@ -1,6 +1,7 @@
package connector
import (
"context"
"io"
"reflect"
"testing"
@ -13,6 +14,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
@ -878,3 +880,11 @@ func getSelectorWith(service path.ServiceType) selectors.Selector {
Service: s,
}
}
func loadConnector(ctx context.Context, t *testing.T) *GraphConnector {
a := tester.NewM365Account(t)
connector, err := NewGraphConnector(ctx, a)
require.NoError(t, err)
return connector
}

View File

@ -1,8 +1,6 @@
package connector
import (
"bytes"
"context"
"testing"
"time"
@ -12,7 +10,6 @@ import (
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
@ -26,14 +23,6 @@ type GraphConnectorIntegrationSuite struct {
user string
}
func loadConnector(ctx context.Context, t *testing.T) *GraphConnector {
a := tester.NewM365Account(t)
connector, err := NewGraphConnector(ctx, a)
require.NoError(t, err)
return connector
}
func TestGraphConnectorIntegrationSuite(t *testing.T) {
if err := tester.RunOnAny(
tester.CorsoCITests,
@ -80,287 +69,6 @@ func (suite *GraphConnectorIntegrationSuite) TestSetTenantUsers() {
suite.Greater(len(newConnector.Users), 0)
}
// TestInvalidUserForDataCollections ensures verification process for users
func (suite *GraphConnectorIntegrationSuite) TestInvalidUserForDataCollections() {
ctx, flush := tester.NewContext()
defer flush()
invalidUser := "foo@example.com"
connector := loadConnector(ctx, suite.T())
tests := []struct {
name string
getSelector func(t *testing.T) selectors.Selector
}{
{
name: "invalid exchange backup user",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{invalidUser}, selectors.Any()))
return sel.Selector
},
},
{
name: "Invalid onedrive backup user",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewOneDriveBackup()
sel.Include(sel.Folders([]string{invalidUser}, selectors.Any()))
return sel.Selector
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections, err := connector.DataCollections(ctx, test.getSelector(t))
assert.Error(t, err)
assert.Empty(t, collections)
})
}
}
// TestExchangeDataCollection verifies interface between operation and
// GraphConnector remains stable to receive a non-zero amount of Collections
// for the Exchange Package. Enabled exchange applications:
// - mail
// - contacts
// - events
func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T())
tests := []struct {
name string
getSelector func(t *testing.T) selectors.Selector
}{
{
name: suite.user + " Email",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
return sel.Selector
},
},
{
name: suite.user + " Contacts",
getSelector: func(t *testing.T) selectors.Selector {
sel := selectors.NewExchangeBackup()
sel.Include(sel.ContactFolders(
[]string{suite.user},
[]string{exchange.DefaultContactFolder},
selectors.PrefixMatch()))
return sel.Selector
},
},
// {
// name: suite.user + " Events",
// getSelector: func(t *testing.T) selectors.Selector {
// sel := selectors.NewExchangeBackup()
// sel.Include(sel.EventCalendars([]string{suite.user},
// []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
// return sel.Selector
// },
// },
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t))
require.NoError(t, err)
assert.Equal(t, len(collection), 1)
channel := collection[0].Items()
for object := range channel {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.NoError(t, err, "received a buf.Read error")
}
status := connector.AwaitStatus()
assert.NotZero(t, status.Successful)
t.Log(status.String())
})
}
}
// TestMailSerializationRegression verifies that all mail data stored in the
// test account can be successfully downloaded into bytes and restored into
// M365 mail objects
func (suite *GraphConnectorIntegrationSuite) TestMailSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
connector := loadConnector(ctx, t)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
collection, err := connector.createCollections(ctx, sel.Scopes()[0])
require.NoError(t, err)
for _, edc := range collection {
suite.T().Run(edc.FullPath().String(), func(t *testing.T) {
streamChannel := edc.Items()
// Verify that each message can be restored
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
message, err := support.CreateMessageFromBytes(buf.Bytes())
assert.NotNil(t, message)
assert.NoError(t, err)
}
})
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
}
// TestContactSerializationRegression verifies ability to query contact items
// and to store contact within Collection. Downloaded contacts are run through
// a regression test to ensure that downloaded items can be uploaded.
func (suite *GraphConnectorIntegrationSuite) TestContactSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T())
tests := []struct {
name string
getCollection func(t *testing.T) []*exchange.Collection
}{
{
name: "Default Contact Folder",
getCollection: func(t *testing.T) []*exchange.Collection {
scope := selectors.
NewExchangeBackup().
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
collections, err := connector.createCollections(ctx, scope)
require.NoError(t, err)
return collections
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
edcs := test.getCollection(t)
require.Equal(t, len(edcs), 1)
edc := edcs[0]
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
streamChannel := edc.Items()
count := 0
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
contact, err := support.CreateContactFromBytes(buf.Bytes())
assert.NotNil(t, contact)
assert.NoError(t, err, "error on converting contact bytes: "+string(buf.Bytes()))
count++
}
assert.NotZero(t, count)
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
})
}
}
// TestEventsSerializationRegression ensures functionality of createCollections
// to be able to successfully query, download and restore event objects
func (suite *GraphConnectorIntegrationSuite) TestEventsSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T())
tests := []struct {
name, expected string
getCollection func(t *testing.T) []*exchange.Collection
}{
{
name: "Default Event Calendar",
expected: exchange.DefaultCalendar,
getCollection: func(t *testing.T) []*exchange.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
require.NoError(t, err)
return collections
},
},
{
name: "Birthday Calendar",
expected: "Birthdays",
getCollection: func(t *testing.T) []*exchange.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
require.NoError(t, err)
return collections
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections := test.getCollection(t)
require.Equal(t, len(collections), 1)
edc := collections[0]
assert.Equal(t, edc.FullPath().Folder(), test.expected)
streamChannel := edc.Items()
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
event, err := support.CreateEventFromBytes(buf.Bytes())
assert.NotNil(t, event)
assert.NoError(t, err, "experienced error parsing event bytes: "+string(buf.Bytes()))
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
})
}
}
// TestAccessOfInboxAllUsers verifies that GraphConnector can
// support `--users *` for backup operations. Selector.DiscreteScopes
// returns all of the users within one scope. Only users who have
// messages in their inbox will have a collection returned.
// The final test insures that more than a 75% of the user collections are
// returned. If an error was experienced, the test will fail overall
func (suite *GraphConnectorIntegrationSuite) TestAccessOfInboxAllUsers() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
connector := loadConnector(ctx, t)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
scopes := sel.DiscreteScopes(connector.GetUsers())
for _, scope := range scopes {
users := scope.Get(selectors.ExchangeUser)
standard := (len(users) / 4) * 3
collections, err := connector.createCollections(ctx, scope)
require.NoError(t, err)
suite.Greater(len(collections), standard)
}
}
func (suite *GraphConnectorIntegrationSuite) TestMailFetch() {
ctx, flush := tester.NewContext()
defer flush()
@ -409,63 +117,9 @@ func (suite *GraphConnectorIntegrationSuite) TestMailFetch() {
}
}
///------------------------------------------------------------
//-------------------------------------------------------------
// Exchange Functions
//-------------------------------------------------------
func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() {
dest := tester.DefaultTestRestoreDestination()
table := []struct {
name string
col []data.Collection
sel selectors.Selector
}{
{
name: "ExchangeNil",
col: nil,
sel: selectors.Selector{
Service: selectors.ServiceExchange,
},
},
{
name: "ExchangeEmpty",
col: []data.Collection{},
sel: selectors.Selector{
Service: selectors.ServiceExchange,
},
},
{
name: "OneDriveNil",
col: nil,
sel: selectors.Selector{
Service: selectors.ServiceOneDrive,
},
},
{
name: "OneDriveEmpty",
col: []data.Collection{},
sel: selectors.Selector{
Service: selectors.ServiceOneDrive,
},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
deets, err := suite.connector.RestoreDataCollections(ctx, test.sel, dest, test.col)
require.NoError(t, err)
assert.NotNil(t, deets)
stats := suite.connector.AwaitStatus()
assert.Zero(t, stats.ObjectCount)
assert.Zero(t, stats.FolderCount)
assert.Zero(t, stats.Successful)
})
}
}
//-------------------------------------------------------------
func runRestoreBackupTest(
t *testing.T,

View File

@ -9,21 +9,23 @@ import (
)
const (
CorsoLoadTests = "CORSO_LOAD_TESTS"
CorsoCITests = "CORSO_CI_TESTS"
CorsoCLIBackupTests = "CORSO_COMMAND_LINE_BACKUP_TESTS"
CorsoCLIConfigTests = "CORSO_COMMAND_LINE_CONFIG_TESTS"
CorsoCLIRepoTests = "CORSO_COMMAND_LINE_REPO_TESTS"
CorsoCLIRestoreTests = "CORSO_COMMAND_LINE_RESTORE_TESTS"
CorsoCLITests = "CORSO_COMMAND_LINE_TESTS"
CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS"
CorsoGraphConnectorExchangeTests = "CORSO_GRAPH_CONNECTOR_EXCHANGE_TESTS"
CorsoGraphConnectorOneDriveTests = "CORSO_GRAPH_CONNECTOR_ONE_DRIVE_TESTS"
CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS"
CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS"
CorsoOneDriveTests = "CORSO_ONE_DRIVE_TESTS"
CorsoOperationTests = "CORSO_OPERATION_TESTS"
CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS"
CorsoLoadTests = "CORSO_LOAD_TESTS"
CorsoCITests = "CORSO_CI_TESTS"
CorsoCLIBackupTests = "CORSO_COMMAND_LINE_BACKUP_TESTS"
CorsoCLIConfigTests = "CORSO_COMMAND_LINE_CONFIG_TESTS"
CorsoCLIRepoTests = "CORSO_COMMAND_LINE_REPO_TESTS"
CorsoCLIRestoreTests = "CORSO_COMMAND_LINE_RESTORE_TESTS"
CorsoCLITests = "CORSO_COMMAND_LINE_TESTS"
CorsoConnectorCreateCollectionTests = "CORSO_CONNECTOR_CREATE_COLLECTION_TESTS"
CorsoConnectorDataCollectionTests = "CORSO_CONNECTOR_DATA_COLLECTION_TESTS"
CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS"
CorsoGraphConnectorExchangeTests = "CORSO_GRAPH_CONNECTOR_EXCHANGE_TESTS"
CorsoGraphConnectorOneDriveTests = "CORSO_GRAPH_CONNECTOR_ONE_DRIVE_TESTS"
CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS"
CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS"
CorsoOneDriveTests = "CORSO_ONE_DRIVE_TESTS"
CorsoOperationTests = "CORSO_OPERATION_TESTS"
CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS"
)
// File needs to be a single message .json