connector data coll refactor
A quick code movement before adding the sharepoint datacollection production so that we minimize graph_conn file bloat.
This commit is contained in:
parent
765fd6222b
commit
67888abad0
231
src/internal/connector/data_collections.go
Normal file
231
src/internal/connector/data_collections.go
Normal file
@ -0,0 +1,231 @@
|
|||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/exchange"
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
D "github.com/alcionai/corso/src/internal/diagnostics"
|
||||||
|
"github.com/alcionai/corso/src/internal/observe"
|
||||||
|
"github.com/alcionai/corso/src/pkg/logger"
|
||||||
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Data Collections
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// DataCollections utility function to launch backup operations for exchange and onedrive
|
||||||
|
func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) {
|
||||||
|
ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String()))
|
||||||
|
defer end()
|
||||||
|
|
||||||
|
err := verifyBackupInputs(sels, gc.Users)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch sels.Service {
|
||||||
|
case selectors.ServiceExchange:
|
||||||
|
return gc.ExchangeDataCollection(ctx, sels)
|
||||||
|
case selectors.ServiceOneDrive:
|
||||||
|
return gc.OneDriveDataCollections(ctx, sels)
|
||||||
|
default:
|
||||||
|
return nil, errors.Errorf("service %s not supported", sels)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func verifyBackupInputs(sel selectors.Selector, mapOfUsers map[string]string) error {
|
||||||
|
var personnel []string
|
||||||
|
|
||||||
|
// retrieve users from selectors
|
||||||
|
switch sel.Service {
|
||||||
|
case selectors.ServiceExchange:
|
||||||
|
backup, err := sel.ToExchangeBackup()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, scope := range backup.Scopes() {
|
||||||
|
temp := scope.Get(selectors.ExchangeUser)
|
||||||
|
personnel = append(personnel, temp...)
|
||||||
|
}
|
||||||
|
case selectors.ServiceOneDrive:
|
||||||
|
backup, err := sel.ToOneDriveBackup()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, user := range backup.Scopes() {
|
||||||
|
temp := user.Get(selectors.OneDriveUser)
|
||||||
|
personnel = append(personnel, temp...)
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return errors.New("service %s not supported")
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify personnel
|
||||||
|
normUsers := map[string]struct{}{}
|
||||||
|
|
||||||
|
for k := range mapOfUsers {
|
||||||
|
normUsers[strings.ToLower(k)] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, user := range personnel {
|
||||||
|
if _, ok := normUsers[strings.ToLower(user)]; !ok {
|
||||||
|
return fmt.Errorf("%s user not found within tenant", user)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// createCollections - utility function that retrieves M365
|
||||||
|
// IDs through Microsoft Graph API. The selectors.ExchangeScope
|
||||||
|
// determines the type of collections that are stored.
|
||||||
|
// to the GraphConnector struct.
|
||||||
|
func (gc *GraphConnector) createCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
scope selectors.ExchangeScope,
|
||||||
|
) ([]*exchange.Collection, error) {
|
||||||
|
var errs *multierror.Error
|
||||||
|
|
||||||
|
users := scope.Get(selectors.ExchangeUser)
|
||||||
|
allCollections := make([]*exchange.Collection, 0)
|
||||||
|
// Create collection of ExchangeDataCollection
|
||||||
|
for _, user := range users {
|
||||||
|
collections := make(map[string]*exchange.Collection)
|
||||||
|
|
||||||
|
qp := graph.QueryParams{
|
||||||
|
User: user,
|
||||||
|
Scope: scope,
|
||||||
|
FailFast: gc.failFast,
|
||||||
|
Credentials: gc.credentials,
|
||||||
|
}
|
||||||
|
|
||||||
|
itemCategory := qp.Scope.Category().PathType()
|
||||||
|
|
||||||
|
foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user))
|
||||||
|
defer closer()
|
||||||
|
defer close(foldersComplete)
|
||||||
|
|
||||||
|
resolver, err := exchange.PopulateExchangeContainerResolver(
|
||||||
|
ctx,
|
||||||
|
qp,
|
||||||
|
qp.Scope.Category().PathType(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "getting folder cache")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = exchange.FilterContainersAndFillCollections(
|
||||||
|
ctx,
|
||||||
|
qp,
|
||||||
|
collections,
|
||||||
|
gc.UpdateStatus,
|
||||||
|
resolver)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "filling collections")
|
||||||
|
}
|
||||||
|
|
||||||
|
foldersComplete <- struct{}{}
|
||||||
|
|
||||||
|
for _, collection := range collections {
|
||||||
|
gc.incrementAwaitingMessages()
|
||||||
|
|
||||||
|
allCollections = append(allCollections, collection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return allCollections, errs.ErrorOrNil()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExchangeDataCollections returns a DataCollection which the caller can
|
||||||
|
// use to read mailbox data out for the specified user
|
||||||
|
// Assumption: User exists
|
||||||
|
//
|
||||||
|
// Add iota to this call -> mail, contacts, calendar, etc.
|
||||||
|
func (gc *GraphConnector) ExchangeDataCollection(
|
||||||
|
ctx context.Context,
|
||||||
|
selector selectors.Selector,
|
||||||
|
) ([]data.Collection, error) {
|
||||||
|
eb, err := selector.ToExchangeBackup()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector")
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
scopes = eb.DiscreteScopes(gc.GetUsers())
|
||||||
|
collections = []data.Collection{}
|
||||||
|
errs error
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, scope := range scopes {
|
||||||
|
// Creates a map of collections based on scope
|
||||||
|
dcs, err := gc.createCollections(ctx, scope)
|
||||||
|
if err != nil {
|
||||||
|
user := scope.Get(selectors.ExchangeUser)
|
||||||
|
return nil, support.WrapAndAppend(user[0], err, errs)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, collection := range dcs {
|
||||||
|
collections = append(collections, collection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return collections, errs
|
||||||
|
}
|
||||||
|
|
||||||
|
// OneDriveDataCollections returns a set of DataCollection which represents the OneDrive data
|
||||||
|
// for the specified user
|
||||||
|
func (gc *GraphConnector) OneDriveDataCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
selector selectors.Selector,
|
||||||
|
) ([]data.Collection, error) {
|
||||||
|
odb, err := selector.ToOneDriveBackup()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "oneDriveDataCollection: parsing selector")
|
||||||
|
}
|
||||||
|
|
||||||
|
collections := []data.Collection{}
|
||||||
|
|
||||||
|
scopes := odb.DiscreteScopes(gc.GetUsers())
|
||||||
|
|
||||||
|
var errs error
|
||||||
|
|
||||||
|
// for each scope that includes oneDrive items, get all
|
||||||
|
for _, scope := range scopes {
|
||||||
|
for _, user := range scope.Get(selectors.OneDriveUser) {
|
||||||
|
logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections")
|
||||||
|
|
||||||
|
odcs, err := onedrive.NewCollections(
|
||||||
|
gc.credentials.AzureTenantID,
|
||||||
|
user,
|
||||||
|
scope,
|
||||||
|
&gc.graphService,
|
||||||
|
gc.UpdateStatus,
|
||||||
|
).Get(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, support.WrapAndAppend(user, err, errs)
|
||||||
|
}
|
||||||
|
|
||||||
|
collections = append(collections, odcs...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for range collections {
|
||||||
|
gc.incrementAwaitingMessages()
|
||||||
|
}
|
||||||
|
|
||||||
|
return collections, errs
|
||||||
|
}
|
||||||
414
src/internal/connector/data_collections_test.go
Normal file
414
src/internal/connector/data_collections_test.go
Normal file
@ -0,0 +1,414 @@
|
|||||||
|
package connector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/exchange"
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// DataCollection tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type ConnectorDataCollectionIntegrationSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
connector *GraphConnector
|
||||||
|
user string
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConnectorDataCollectionIntegrationSuite(t *testing.T) {
|
||||||
|
if err := tester.RunOnAny(
|
||||||
|
tester.CorsoCITests,
|
||||||
|
tester.CorsoConnectorDataCollectionTests,
|
||||||
|
); err != nil {
|
||||||
|
t.Skip(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
suite.Run(t, new(ConnectorDataCollectionIntegrationSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ConnectorDataCollectionIntegrationSuite) SetupSuite() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
suite.connector = loadConnector(ctx, suite.T())
|
||||||
|
suite.user = tester.M365UserID(suite.T())
|
||||||
|
tester.LogTimeOfTest(suite.T())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestExchangeDataCollection verifies interface between operation and
|
||||||
|
// GraphConnector remains stable to receive a non-zero amount of Collections
|
||||||
|
// for the Exchange Package. Enabled exchange applications:
|
||||||
|
// - mail
|
||||||
|
// - contacts
|
||||||
|
// - events
|
||||||
|
func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
connector := loadConnector(ctx, suite.T())
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
getSelector func(t *testing.T) selectors.Selector
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: suite.user + " Email",
|
||||||
|
getSelector: func(t *testing.T) selectors.Selector {
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
||||||
|
|
||||||
|
return sel.Selector
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: suite.user + " Contacts",
|
||||||
|
getSelector: func(t *testing.T) selectors.Selector {
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.ContactFolders(
|
||||||
|
[]string{suite.user},
|
||||||
|
[]string{exchange.DefaultContactFolder},
|
||||||
|
selectors.PrefixMatch()))
|
||||||
|
|
||||||
|
return sel.Selector
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: suite.user + " Events",
|
||||||
|
getSelector: func(t *testing.T) selectors.Selector {
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
|
||||||
|
|
||||||
|
return sel.Selector
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t))
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, len(collection), 1)
|
||||||
|
channel := collection[0].Items()
|
||||||
|
for object := range channel {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
_, err := buf.ReadFrom(object.ToReader())
|
||||||
|
assert.NoError(t, err, "received a buf.Read error")
|
||||||
|
}
|
||||||
|
status := connector.AwaitStatus()
|
||||||
|
assert.NotZero(t, status.Successful)
|
||||||
|
t.Log(status.String())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestInvalidUserForDataCollections ensures verification process for users
|
||||||
|
func (suite *ConnectorDataCollectionIntegrationSuite) TestInvalidUserForDataCollections() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
invalidUser := "foo@example.com"
|
||||||
|
connector := loadConnector(ctx, suite.T())
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
getSelector func(t *testing.T) selectors.Selector
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "invalid exchange backup user",
|
||||||
|
getSelector: func(t *testing.T) selectors.Selector {
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.MailFolders([]string{invalidUser}, selectors.Any()))
|
||||||
|
return sel.Selector
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Invalid onedrive backup user",
|
||||||
|
getSelector: func(t *testing.T) selectors.Selector {
|
||||||
|
sel := selectors.NewOneDriveBackup()
|
||||||
|
sel.Include(sel.Folders([]string{invalidUser}, selectors.Any()))
|
||||||
|
return sel.Selector
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
collections, err := connector.DataCollections(ctx, test.getSelector(t))
|
||||||
|
assert.Error(t, err)
|
||||||
|
assert.Empty(t, collections)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() {
|
||||||
|
dest := tester.DefaultTestRestoreDestination()
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
col []data.Collection
|
||||||
|
sel selectors.Selector
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ExchangeNil",
|
||||||
|
col: nil,
|
||||||
|
sel: selectors.Selector{
|
||||||
|
Service: selectors.ServiceExchange,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "ExchangeEmpty",
|
||||||
|
col: []data.Collection{},
|
||||||
|
sel: selectors.Selector{
|
||||||
|
Service: selectors.ServiceExchange,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "OneDriveNil",
|
||||||
|
col: nil,
|
||||||
|
sel: selectors.Selector{
|
||||||
|
Service: selectors.ServiceOneDrive,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "OneDriveEmpty",
|
||||||
|
col: []data.Collection{},
|
||||||
|
sel: selectors.Selector{
|
||||||
|
Service: selectors.ServiceOneDrive,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
deets, err := suite.connector.RestoreDataCollections(ctx, test.sel, dest, test.col)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotNil(t, deets)
|
||||||
|
|
||||||
|
stats := suite.connector.AwaitStatus()
|
||||||
|
assert.Zero(t, stats.ObjectCount)
|
||||||
|
assert.Zero(t, stats.FolderCount)
|
||||||
|
assert.Zero(t, stats.Successful)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// CreateCollection tests
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type ConnectorCreateCollectionIntegrationSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
connector *GraphConnector
|
||||||
|
user string
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConnectorCreateCollectionIntegrationSuite(t *testing.T) {
|
||||||
|
if err := tester.RunOnAny(
|
||||||
|
tester.CorsoCITests,
|
||||||
|
tester.CorsoConnectorCreateCollectionTests,
|
||||||
|
); err != nil {
|
||||||
|
t.Skip(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
suite.Run(t, new(ConnectorCreateCollectionIntegrationSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *ConnectorCreateCollectionIntegrationSuite) SetupSuite() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
suite.connector = loadConnector(ctx, suite.T())
|
||||||
|
suite.user = tester.M365UserID(suite.T())
|
||||||
|
tester.LogTimeOfTest(suite.T())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMailSerializationRegression verifies that all mail data stored in the
|
||||||
|
// test account can be successfully downloaded into bytes and restored into
|
||||||
|
// M365 mail objects
|
||||||
|
func (suite *ConnectorCreateCollectionIntegrationSuite) TestMailSerializationRegression() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
t := suite.T()
|
||||||
|
connector := loadConnector(ctx, t)
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
||||||
|
collection, err := connector.createCollections(ctx, sel.Scopes()[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for _, edc := range collection {
|
||||||
|
suite.T().Run(edc.FullPath().String(), func(t *testing.T) {
|
||||||
|
streamChannel := edc.Items()
|
||||||
|
// Verify that each message can be restored
|
||||||
|
for stream := range streamChannel {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
read, err := buf.ReadFrom(stream.ToReader())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotZero(t, read)
|
||||||
|
message, err := support.CreateMessageFromBytes(buf.Bytes())
|
||||||
|
assert.NotNil(t, message)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
status := connector.AwaitStatus()
|
||||||
|
suite.NotNil(status)
|
||||||
|
suite.Equal(status.ObjectCount, status.Successful)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestContactSerializationRegression verifies ability to query contact items
|
||||||
|
// and to store contact within Collection. Downloaded contacts are run through
|
||||||
|
// a regression test to ensure that downloaded items can be uploaded.
|
||||||
|
func (suite *ConnectorCreateCollectionIntegrationSuite) TestContactSerializationRegression() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
connector := loadConnector(ctx, suite.T())
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
getCollection func(t *testing.T) []*exchange.Collection
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Default Contact Folder",
|
||||||
|
getCollection: func(t *testing.T) []*exchange.Collection {
|
||||||
|
scope := selectors.
|
||||||
|
NewExchangeBackup().
|
||||||
|
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
|
||||||
|
collections, err := connector.createCollections(ctx, scope)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return collections
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
edcs := test.getCollection(t)
|
||||||
|
require.Equal(t, len(edcs), 1)
|
||||||
|
edc := edcs[0]
|
||||||
|
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
|
||||||
|
streamChannel := edc.Items()
|
||||||
|
count := 0
|
||||||
|
for stream := range streamChannel {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
read, err := buf.ReadFrom(stream.ToReader())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotZero(t, read)
|
||||||
|
contact, err := support.CreateContactFromBytes(buf.Bytes())
|
||||||
|
assert.NotNil(t, contact)
|
||||||
|
assert.NoError(t, err, "error on converting contact bytes: "+string(buf.Bytes()))
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
assert.NotZero(t, count)
|
||||||
|
|
||||||
|
status := connector.AwaitStatus()
|
||||||
|
suite.NotNil(status)
|
||||||
|
suite.Equal(status.ObjectCount, status.Successful)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEventsSerializationRegression ensures functionality of createCollections
|
||||||
|
// to be able to successfully query, download and restore event objects
|
||||||
|
func (suite *ConnectorCreateCollectionIntegrationSuite) TestEventsSerializationRegression() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
connector := loadConnector(ctx, suite.T())
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name, expected string
|
||||||
|
getCollection func(t *testing.T) []*exchange.Collection
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Default Event Calendar",
|
||||||
|
expected: exchange.DefaultCalendar,
|
||||||
|
getCollection: func(t *testing.T) []*exchange.Collection {
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
|
||||||
|
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return collections
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Birthday Calendar",
|
||||||
|
expected: "Birthdays",
|
||||||
|
getCollection: func(t *testing.T) []*exchange.Collection {
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
|
||||||
|
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return collections
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
collections := test.getCollection(t)
|
||||||
|
require.Equal(t, len(collections), 1)
|
||||||
|
edc := collections[0]
|
||||||
|
assert.Equal(t, edc.FullPath().Folder(), test.expected)
|
||||||
|
streamChannel := edc.Items()
|
||||||
|
|
||||||
|
for stream := range streamChannel {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
read, err := buf.ReadFrom(stream.ToReader())
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotZero(t, read)
|
||||||
|
event, err := support.CreateEventFromBytes(buf.Bytes())
|
||||||
|
assert.NotNil(t, event)
|
||||||
|
assert.NoError(t, err, "experienced error parsing event bytes: "+string(buf.Bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
|
status := connector.AwaitStatus()
|
||||||
|
suite.NotNil(status)
|
||||||
|
suite.Equal(status.ObjectCount, status.Successful)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAccessOfInboxAllUsers verifies that GraphConnector can
|
||||||
|
// support `--users *` for backup operations. Selector.DiscreteScopes
|
||||||
|
// returns all of the users within one scope. Only users who have
|
||||||
|
// messages in their inbox will have a collection returned.
|
||||||
|
// The final test insures that more than a 75% of the user collections are
|
||||||
|
// returned. If an error was experienced, the test will fail overall
|
||||||
|
func (suite *ConnectorCreateCollectionIntegrationSuite) TestAccessOfInboxAllUsers() {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
t := suite.T()
|
||||||
|
connector := loadConnector(ctx, t)
|
||||||
|
sel := selectors.NewExchangeBackup()
|
||||||
|
sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
||||||
|
scopes := sel.DiscreteScopes(connector.GetUsers())
|
||||||
|
|
||||||
|
for _, scope := range scopes {
|
||||||
|
users := scope.Get(selectors.ExchangeUser)
|
||||||
|
standard := (len(users) / 4) * 3
|
||||||
|
collections, err := connector.createCollections(ctx, scope)
|
||||||
|
require.NoError(t, err)
|
||||||
|
suite.Greater(len(collections), standard)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -6,10 +6,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/trace"
|
"runtime/trace"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hashicorp/go-multierror"
|
|
||||||
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
|
||||||
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
|
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
|
||||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||||
@ -21,14 +19,16 @@ import (
|
|||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
D "github.com/alcionai/corso/src/internal/diagnostics"
|
D "github.com/alcionai/corso/src/internal/diagnostics"
|
||||||
"github.com/alcionai/corso/src/internal/observe"
|
|
||||||
"github.com/alcionai/corso/src/pkg/account"
|
"github.com/alcionai/corso/src/pkg/account"
|
||||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
"github.com/alcionai/corso/src/pkg/logger"
|
|
||||||
"github.com/alcionai/corso/src/pkg/selectors"
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Graph Connector
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// GraphConnector is a struct used to wrap the GraphServiceClient and
|
// GraphConnector is a struct used to wrap the GraphServiceClient and
|
||||||
// GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for
|
// GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for
|
||||||
// bookkeeping and interfacing with other component.
|
// bookkeeping and interfacing with other component.
|
||||||
@ -209,42 +209,6 @@ func buildFromMap(isKey bool, mapping map[string]string) []string {
|
|||||||
return returnString
|
return returnString
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExchangeDataStream returns a DataCollection which the caller can
|
|
||||||
// use to read mailbox data out for the specified user
|
|
||||||
// Assumption: User exists
|
|
||||||
//
|
|
||||||
// Add iota to this call -> mail, contacts, calendar, etc.
|
|
||||||
func (gc *GraphConnector) ExchangeDataCollection(
|
|
||||||
ctx context.Context,
|
|
||||||
selector selectors.Selector,
|
|
||||||
) ([]data.Collection, error) {
|
|
||||||
eb, err := selector.ToExchangeBackup()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector")
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
scopes = eb.DiscreteScopes(gc.GetUsers())
|
|
||||||
collections = []data.Collection{}
|
|
||||||
errs error
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, scope := range scopes {
|
|
||||||
// Creates a map of collections based on scope
|
|
||||||
dcs, err := gc.createCollections(ctx, scope)
|
|
||||||
if err != nil {
|
|
||||||
user := scope.Get(selectors.ExchangeUser)
|
|
||||||
return nil, support.WrapAndAppend(user[0], err, errs)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, collection := range dcs {
|
|
||||||
collections = append(collections, collection)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return collections, errs
|
|
||||||
}
|
|
||||||
|
|
||||||
// RestoreDataCollections restores data from the specified collections
|
// RestoreDataCollections restores data from the specified collections
|
||||||
// into M365 using the GraphAPI.
|
// into M365 using the GraphAPI.
|
||||||
// SideEffect: gc.status is updated at the completion of operation
|
// SideEffect: gc.status is updated at the completion of operation
|
||||||
@ -278,67 +242,6 @@ func (gc *GraphConnector) RestoreDataCollections(
|
|||||||
return deets, err
|
return deets, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// createCollections - utility function that retrieves M365
|
|
||||||
// IDs through Microsoft Graph API. The selectors.ExchangeScope
|
|
||||||
// determines the type of collections that are stored.
|
|
||||||
// to the GraphConnector struct.
|
|
||||||
func (gc *GraphConnector) createCollections(
|
|
||||||
ctx context.Context,
|
|
||||||
scope selectors.ExchangeScope,
|
|
||||||
) ([]*exchange.Collection, error) {
|
|
||||||
var errs *multierror.Error
|
|
||||||
|
|
||||||
users := scope.Get(selectors.ExchangeUser)
|
|
||||||
allCollections := make([]*exchange.Collection, 0)
|
|
||||||
// Create collection of ExchangeDataCollection
|
|
||||||
for _, user := range users {
|
|
||||||
collections := make(map[string]*exchange.Collection)
|
|
||||||
|
|
||||||
qp := graph.QueryParams{
|
|
||||||
User: user,
|
|
||||||
Scope: scope,
|
|
||||||
FailFast: gc.failFast,
|
|
||||||
Credentials: gc.credentials,
|
|
||||||
}
|
|
||||||
|
|
||||||
itemCategory := qp.Scope.Category().PathType()
|
|
||||||
|
|
||||||
foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user))
|
|
||||||
defer closer()
|
|
||||||
defer close(foldersComplete)
|
|
||||||
|
|
||||||
resolver, err := exchange.PopulateExchangeContainerResolver(
|
|
||||||
ctx,
|
|
||||||
qp,
|
|
||||||
qp.Scope.Category().PathType(),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "getting folder cache")
|
|
||||||
}
|
|
||||||
|
|
||||||
err = exchange.FilterContainersAndFillCollections(
|
|
||||||
ctx,
|
|
||||||
qp,
|
|
||||||
collections,
|
|
||||||
gc.UpdateStatus,
|
|
||||||
resolver)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "filling collections")
|
|
||||||
}
|
|
||||||
|
|
||||||
foldersComplete <- struct{}{}
|
|
||||||
|
|
||||||
for _, collection := range collections {
|
|
||||||
gc.incrementAwaitingMessages()
|
|
||||||
|
|
||||||
allCollections = append(allCollections, collection)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return allCollections, errs.ErrorOrNil()
|
|
||||||
}
|
|
||||||
|
|
||||||
// AwaitStatus waits for all gc tasks to complete and then returns status
|
// AwaitStatus waits for all gc tasks to complete and then returns status
|
||||||
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
|
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -378,6 +281,10 @@ func (gc *GraphConnector) incrementAwaitingMessages() {
|
|||||||
gc.wg.Add(1)
|
gc.wg.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Helper Funcs
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// IsRecoverableError returns true iff error is a RecoverableGCEerror
|
// IsRecoverableError returns true iff error is a RecoverableGCEerror
|
||||||
func IsRecoverableError(e error) bool {
|
func IsRecoverableError(e error) bool {
|
||||||
var recoverable support.RecoverableGCError
|
var recoverable support.RecoverableGCError
|
||||||
@ -389,113 +296,3 @@ func IsNonRecoverableError(e error) bool {
|
|||||||
var nonRecoverable support.NonRecoverableGCError
|
var nonRecoverable support.NonRecoverableGCError
|
||||||
return errors.As(e, &nonRecoverable)
|
return errors.As(e, &nonRecoverable)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DataCollections utility function to launch backup operations for exchange and onedrive
|
|
||||||
func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) {
|
|
||||||
ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String()))
|
|
||||||
defer end()
|
|
||||||
|
|
||||||
err := verifyBackupInputs(sels, gc.Users)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch sels.Service {
|
|
||||||
case selectors.ServiceExchange:
|
|
||||||
return gc.ExchangeDataCollection(ctx, sels)
|
|
||||||
case selectors.ServiceOneDrive:
|
|
||||||
return gc.OneDriveDataCollections(ctx, sels)
|
|
||||||
default:
|
|
||||||
return nil, errors.Errorf("service %s not supported", sels)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OneDriveDataCollections returns a set of DataCollection which represents the OneDrive data
|
|
||||||
// for the specified user
|
|
||||||
func (gc *GraphConnector) OneDriveDataCollections(
|
|
||||||
ctx context.Context,
|
|
||||||
selector selectors.Selector,
|
|
||||||
) ([]data.Collection, error) {
|
|
||||||
odb, err := selector.ToOneDriveBackup()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "oneDriveDataCollection: parsing selector")
|
|
||||||
}
|
|
||||||
|
|
||||||
collections := []data.Collection{}
|
|
||||||
|
|
||||||
scopes := odb.DiscreteScopes(gc.GetUsers())
|
|
||||||
|
|
||||||
var errs error
|
|
||||||
|
|
||||||
// for each scope that includes oneDrive items, get all
|
|
||||||
for _, scope := range scopes {
|
|
||||||
for _, user := range scope.Get(selectors.OneDriveUser) {
|
|
||||||
logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections")
|
|
||||||
|
|
||||||
odcs, err := onedrive.NewCollections(
|
|
||||||
gc.credentials.AzureTenantID,
|
|
||||||
user,
|
|
||||||
scope,
|
|
||||||
&gc.graphService,
|
|
||||||
gc.UpdateStatus,
|
|
||||||
).Get(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, support.WrapAndAppend(user, err, errs)
|
|
||||||
}
|
|
||||||
|
|
||||||
collections = append(collections, odcs...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for range collections {
|
|
||||||
gc.incrementAwaitingMessages()
|
|
||||||
}
|
|
||||||
|
|
||||||
return collections, errs
|
|
||||||
}
|
|
||||||
|
|
||||||
func verifyBackupInputs(sel selectors.Selector, mapOfUsers map[string]string) error {
|
|
||||||
var personnel []string
|
|
||||||
|
|
||||||
// retrieve users from selectors
|
|
||||||
switch sel.Service {
|
|
||||||
case selectors.ServiceExchange:
|
|
||||||
backup, err := sel.ToExchangeBackup()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, scope := range backup.Scopes() {
|
|
||||||
temp := scope.Get(selectors.ExchangeUser)
|
|
||||||
personnel = append(personnel, temp...)
|
|
||||||
}
|
|
||||||
case selectors.ServiceOneDrive:
|
|
||||||
backup, err := sel.ToOneDriveBackup()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, user := range backup.Scopes() {
|
|
||||||
temp := user.Get(selectors.OneDriveUser)
|
|
||||||
personnel = append(personnel, temp...)
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
return errors.New("service %s not supported")
|
|
||||||
}
|
|
||||||
|
|
||||||
// verify personnel
|
|
||||||
normUsers := map[string]struct{}{}
|
|
||||||
|
|
||||||
for k := range mapOfUsers {
|
|
||||||
normUsers[strings.ToLower(k)] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, user := range personnel {
|
|
||||||
if _, ok := normUsers[strings.ToLower(user)]; !ok {
|
|
||||||
return fmt.Errorf("%s user not found within tenant", user)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
@ -13,6 +14,7 @@ import (
|
|||||||
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
"github.com/alcionai/corso/src/pkg/path"
|
"github.com/alcionai/corso/src/pkg/path"
|
||||||
"github.com/alcionai/corso/src/pkg/selectors"
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
@ -878,3 +880,11 @@ func getSelectorWith(service path.ServiceType) selectors.Selector {
|
|||||||
Service: s,
|
Service: s,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func loadConnector(ctx context.Context, t *testing.T) *GraphConnector {
|
||||||
|
a := tester.NewM365Account(t)
|
||||||
|
connector, err := NewGraphConnector(ctx, a)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return connector
|
||||||
|
}
|
||||||
|
|||||||
@ -1,8 +1,6 @@
|
|||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -12,7 +10,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/connector/exchange"
|
"github.com/alcionai/corso/src/internal/connector/exchange"
|
||||||
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
"github.com/alcionai/corso/src/internal/tester"
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
@ -26,14 +23,6 @@ type GraphConnectorIntegrationSuite struct {
|
|||||||
user string
|
user string
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadConnector(ctx context.Context, t *testing.T) *GraphConnector {
|
|
||||||
a := tester.NewM365Account(t)
|
|
||||||
connector, err := NewGraphConnector(ctx, a)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return connector
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGraphConnectorIntegrationSuite(t *testing.T) {
|
func TestGraphConnectorIntegrationSuite(t *testing.T) {
|
||||||
if err := tester.RunOnAny(
|
if err := tester.RunOnAny(
|
||||||
tester.CorsoCITests,
|
tester.CorsoCITests,
|
||||||
@ -80,286 +69,6 @@ func (suite *GraphConnectorIntegrationSuite) TestSetTenantUsers() {
|
|||||||
suite.Greater(len(newConnector.Users), 0)
|
suite.Greater(len(newConnector.Users), 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestInvalidUserForDataCollections ensures verification process for users
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestInvalidUserForDataCollections() {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
invalidUser := "foo@example.com"
|
|
||||||
connector := loadConnector(ctx, suite.T())
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
getSelector func(t *testing.T) selectors.Selector
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "invalid exchange backup user",
|
|
||||||
getSelector: func(t *testing.T) selectors.Selector {
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.MailFolders([]string{invalidUser}, selectors.Any()))
|
|
||||||
return sel.Selector
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Invalid onedrive backup user",
|
|
||||||
getSelector: func(t *testing.T) selectors.Selector {
|
|
||||||
sel := selectors.NewOneDriveBackup()
|
|
||||||
sel.Include(sel.Folders([]string{invalidUser}, selectors.Any()))
|
|
||||||
return sel.Selector
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
|
||||||
collections, err := connector.DataCollections(ctx, test.getSelector(t))
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Empty(t, collections)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestExchangeDataCollection verifies interface between operation and
|
|
||||||
// GraphConnector remains stable to receive a non-zero amount of Collections
|
|
||||||
// for the Exchange Package. Enabled exchange applications:
|
|
||||||
// - mail
|
|
||||||
// - contacts
|
|
||||||
// - events
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
connector := loadConnector(ctx, suite.T())
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
getSelector func(t *testing.T) selectors.Selector
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: suite.user + " Email",
|
|
||||||
getSelector: func(t *testing.T) selectors.Selector {
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
|
||||||
|
|
||||||
return sel.Selector
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: suite.user + " Contacts",
|
|
||||||
getSelector: func(t *testing.T) selectors.Selector {
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.ContactFolders(
|
|
||||||
[]string{suite.user},
|
|
||||||
[]string{exchange.DefaultContactFolder},
|
|
||||||
selectors.PrefixMatch()))
|
|
||||||
|
|
||||||
return sel.Selector
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: suite.user + " Events",
|
|
||||||
getSelector: func(t *testing.T) selectors.Selector {
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
|
|
||||||
|
|
||||||
return sel.Selector
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
|
||||||
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t))
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.Equal(t, len(collection), 1)
|
|
||||||
channel := collection[0].Items()
|
|
||||||
for object := range channel {
|
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
_, err := buf.ReadFrom(object.ToReader())
|
|
||||||
assert.NoError(t, err, "received a buf.Read error")
|
|
||||||
}
|
|
||||||
status := connector.AwaitStatus()
|
|
||||||
assert.NotZero(t, status.Successful)
|
|
||||||
t.Log(status.String())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestMailSerializationRegression verifies that all mail data stored in the
|
|
||||||
// test account can be successfully downloaded into bytes and restored into
|
|
||||||
// M365 mail objects
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestMailSerializationRegression() {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
t := suite.T()
|
|
||||||
connector := loadConnector(ctx, t)
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
|
||||||
collection, err := connector.createCollections(ctx, sel.Scopes()[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
for _, edc := range collection {
|
|
||||||
suite.T().Run(edc.FullPath().String(), func(t *testing.T) {
|
|
||||||
streamChannel := edc.Items()
|
|
||||||
// Verify that each message can be restored
|
|
||||||
for stream := range streamChannel {
|
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
read, err := buf.ReadFrom(stream.ToReader())
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.NotZero(t, read)
|
|
||||||
message, err := support.CreateMessageFromBytes(buf.Bytes())
|
|
||||||
assert.NotNil(t, message)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
status := connector.AwaitStatus()
|
|
||||||
suite.NotNil(status)
|
|
||||||
suite.Equal(status.ObjectCount, status.Successful)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestContactSerializationRegression verifies ability to query contact items
|
|
||||||
// and to store contact within Collection. Downloaded contacts are run through
|
|
||||||
// a regression test to ensure that downloaded items can be uploaded.
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestContactSerializationRegression() {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
connector := loadConnector(ctx, suite.T())
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name string
|
|
||||||
getCollection func(t *testing.T) []*exchange.Collection
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "Default Contact Folder",
|
|
||||||
getCollection: func(t *testing.T) []*exchange.Collection {
|
|
||||||
scope := selectors.
|
|
||||||
NewExchangeBackup().
|
|
||||||
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
|
|
||||||
collections, err := connector.createCollections(ctx, scope)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return collections
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
|
||||||
edcs := test.getCollection(t)
|
|
||||||
require.Equal(t, len(edcs), 1)
|
|
||||||
edc := edcs[0]
|
|
||||||
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
|
|
||||||
streamChannel := edc.Items()
|
|
||||||
count := 0
|
|
||||||
for stream := range streamChannel {
|
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
read, err := buf.ReadFrom(stream.ToReader())
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.NotZero(t, read)
|
|
||||||
contact, err := support.CreateContactFromBytes(buf.Bytes())
|
|
||||||
assert.NotNil(t, contact)
|
|
||||||
assert.NoError(t, err, "error on converting contact bytes: "+string(buf.Bytes()))
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
assert.NotZero(t, count)
|
|
||||||
|
|
||||||
status := connector.AwaitStatus()
|
|
||||||
suite.NotNil(status)
|
|
||||||
suite.Equal(status.ObjectCount, status.Successful)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestEventsSerializationRegression ensures functionality of createCollections
|
|
||||||
// to be able to successfully query, download and restore event objects
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestEventsSerializationRegression() {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
connector := loadConnector(ctx, suite.T())
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
name, expected string
|
|
||||||
getCollection func(t *testing.T) []*exchange.Collection
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "Default Event Calendar",
|
|
||||||
expected: exchange.DefaultCalendar,
|
|
||||||
getCollection: func(t *testing.T) []*exchange.Collection {
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
|
|
||||||
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return collections
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Birthday Calendar",
|
|
||||||
expected: "Birthdays",
|
|
||||||
getCollection: func(t *testing.T) []*exchange.Collection {
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
|
|
||||||
collections, err := connector.createCollections(ctx, sel.Scopes()[0])
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return collections
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
|
||||||
collections := test.getCollection(t)
|
|
||||||
require.Equal(t, len(collections), 1)
|
|
||||||
edc := collections[0]
|
|
||||||
assert.Equal(t, edc.FullPath().Folder(), test.expected)
|
|
||||||
streamChannel := edc.Items()
|
|
||||||
|
|
||||||
for stream := range streamChannel {
|
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
read, err := buf.ReadFrom(stream.ToReader())
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.NotZero(t, read)
|
|
||||||
event, err := support.CreateEventFromBytes(buf.Bytes())
|
|
||||||
assert.NotNil(t, event)
|
|
||||||
assert.NoError(t, err, "experienced error parsing event bytes: "+string(buf.Bytes()))
|
|
||||||
}
|
|
||||||
|
|
||||||
status := connector.AwaitStatus()
|
|
||||||
suite.NotNil(status)
|
|
||||||
suite.Equal(status.ObjectCount, status.Successful)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestAccessOfInboxAllUsers verifies that GraphConnector can
|
|
||||||
// support `--users *` for backup operations. Selector.DiscreteScopes
|
|
||||||
// returns all of the users within one scope. Only users who have
|
|
||||||
// messages in their inbox will have a collection returned.
|
|
||||||
// The final test insures that more than a 75% of the user collections are
|
|
||||||
// returned. If an error was experienced, the test will fail overall
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestAccessOfInboxAllUsers() {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
t := suite.T()
|
|
||||||
connector := loadConnector(ctx, t)
|
|
||||||
sel := selectors.NewExchangeBackup()
|
|
||||||
sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
|
|
||||||
scopes := sel.DiscreteScopes(connector.GetUsers())
|
|
||||||
|
|
||||||
for _, scope := range scopes {
|
|
||||||
users := scope.Get(selectors.ExchangeUser)
|
|
||||||
standard := (len(users) / 4) * 3
|
|
||||||
collections, err := connector.createCollections(ctx, scope)
|
|
||||||
require.NoError(t, err)
|
|
||||||
suite.Greater(len(collections), standard)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestMailFetch() {
|
func (suite *GraphConnectorIntegrationSuite) TestMailFetch() {
|
||||||
ctx, flush := tester.NewContext()
|
ctx, flush := tester.NewContext()
|
||||||
defer flush()
|
defer flush()
|
||||||
@ -408,63 +117,9 @@ func (suite *GraphConnectorIntegrationSuite) TestMailFetch() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///------------------------------------------------------------
|
//-------------------------------------------------------------
|
||||||
// Exchange Functions
|
// Exchange Functions
|
||||||
//-------------------------------------------------------
|
//-------------------------------------------------------------
|
||||||
|
|
||||||
func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() {
|
|
||||||
dest := tester.DefaultTestRestoreDestination()
|
|
||||||
table := []struct {
|
|
||||||
name string
|
|
||||||
col []data.Collection
|
|
||||||
sel selectors.Selector
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "ExchangeNil",
|
|
||||||
col: nil,
|
|
||||||
sel: selectors.Selector{
|
|
||||||
Service: selectors.ServiceExchange,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "ExchangeEmpty",
|
|
||||||
col: []data.Collection{},
|
|
||||||
sel: selectors.Selector{
|
|
||||||
Service: selectors.ServiceExchange,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "OneDriveNil",
|
|
||||||
col: nil,
|
|
||||||
sel: selectors.Selector{
|
|
||||||
Service: selectors.ServiceOneDrive,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "OneDriveEmpty",
|
|
||||||
col: []data.Collection{},
|
|
||||||
sel: selectors.Selector{
|
|
||||||
Service: selectors.ServiceOneDrive,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range table {
|
|
||||||
suite.T().Run(test.name, func(t *testing.T) {
|
|
||||||
ctx, flush := tester.NewContext()
|
|
||||||
defer flush()
|
|
||||||
|
|
||||||
deets, err := suite.connector.RestoreDataCollections(ctx, test.sel, dest, test.col)
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.NotNil(t, deets)
|
|
||||||
|
|
||||||
stats := suite.connector.AwaitStatus()
|
|
||||||
assert.Zero(t, stats.ObjectCount)
|
|
||||||
assert.Zero(t, stats.FolderCount)
|
|
||||||
assert.Zero(t, stats.Successful)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func runRestoreBackupTest(
|
func runRestoreBackupTest(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
|
|||||||
@ -9,21 +9,23 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
CorsoLoadTests = "CORSO_LOAD_TESTS"
|
CorsoLoadTests = "CORSO_LOAD_TESTS"
|
||||||
CorsoCITests = "CORSO_CI_TESTS"
|
CorsoCITests = "CORSO_CI_TESTS"
|
||||||
CorsoCLIBackupTests = "CORSO_COMMAND_LINE_BACKUP_TESTS"
|
CorsoCLIBackupTests = "CORSO_COMMAND_LINE_BACKUP_TESTS"
|
||||||
CorsoCLIConfigTests = "CORSO_COMMAND_LINE_CONFIG_TESTS"
|
CorsoCLIConfigTests = "CORSO_COMMAND_LINE_CONFIG_TESTS"
|
||||||
CorsoCLIRepoTests = "CORSO_COMMAND_LINE_REPO_TESTS"
|
CorsoCLIRepoTests = "CORSO_COMMAND_LINE_REPO_TESTS"
|
||||||
CorsoCLIRestoreTests = "CORSO_COMMAND_LINE_RESTORE_TESTS"
|
CorsoCLIRestoreTests = "CORSO_COMMAND_LINE_RESTORE_TESTS"
|
||||||
CorsoCLITests = "CORSO_COMMAND_LINE_TESTS"
|
CorsoCLITests = "CORSO_COMMAND_LINE_TESTS"
|
||||||
CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS"
|
CorsoConnectorCreateCollectionTests = "CORSO_CONNECTOR_CREATE_COLLECTION_TESTS"
|
||||||
CorsoGraphConnectorExchangeTests = "CORSO_GRAPH_CONNECTOR_EXCHANGE_TESTS"
|
CorsoConnectorDataCollectionTests = "CORSO_CONNECTOR_DATA_COLLECTION_TESTS"
|
||||||
CorsoGraphConnectorOneDriveTests = "CORSO_GRAPH_CONNECTOR_ONE_DRIVE_TESTS"
|
CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS"
|
||||||
CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS"
|
CorsoGraphConnectorExchangeTests = "CORSO_GRAPH_CONNECTOR_EXCHANGE_TESTS"
|
||||||
CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS"
|
CorsoGraphConnectorOneDriveTests = "CORSO_GRAPH_CONNECTOR_ONE_DRIVE_TESTS"
|
||||||
CorsoOneDriveTests = "CORSO_ONE_DRIVE_TESTS"
|
CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS"
|
||||||
CorsoOperationTests = "CORSO_OPERATION_TESTS"
|
CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS"
|
||||||
CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS"
|
CorsoOneDriveTests = "CORSO_ONE_DRIVE_TESTS"
|
||||||
|
CorsoOperationTests = "CORSO_OPERATION_TESTS"
|
||||||
|
CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS"
|
||||||
)
|
)
|
||||||
|
|
||||||
// File needs to be a single message .json
|
// File needs to be a single message .json
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user