diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go new file mode 100644 index 000000000..ef6a6c923 --- /dev/null +++ b/src/internal/connector/data_collections.go @@ -0,0 +1,231 @@ +package connector + +import ( + "context" + "fmt" + "strings" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + + "github.com/alcionai/corso/src/internal/connector/exchange" + "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/onedrive" + "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/data" + D "github.com/alcionai/corso/src/internal/diagnostics" + "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/selectors" +) + +// --------------------------------------------------------------------------- +// Data Collections +// --------------------------------------------------------------------------- + +// DataCollections utility function to launch backup operations for exchange and onedrive +func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) { + ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String())) + defer end() + + err := verifyBackupInputs(sels, gc.Users) + if err != nil { + return nil, err + } + + switch sels.Service { + case selectors.ServiceExchange: + return gc.ExchangeDataCollection(ctx, sels) + case selectors.ServiceOneDrive: + return gc.OneDriveDataCollections(ctx, sels) + default: + return nil, errors.Errorf("service %s not supported", sels) + } +} + +func verifyBackupInputs(sel selectors.Selector, mapOfUsers map[string]string) error { + var personnel []string + + // retrieve users from selectors + switch sel.Service { + case selectors.ServiceExchange: + backup, err := sel.ToExchangeBackup() + if err != nil { + return err + } + + for _, scope := range backup.Scopes() { + temp := scope.Get(selectors.ExchangeUser) + personnel = append(personnel, temp...) + } + case selectors.ServiceOneDrive: + backup, err := sel.ToOneDriveBackup() + if err != nil { + return err + } + + for _, user := range backup.Scopes() { + temp := user.Get(selectors.OneDriveUser) + personnel = append(personnel, temp...) + } + + default: + return errors.New("service %s not supported") + } + + // verify personnel + normUsers := map[string]struct{}{} + + for k := range mapOfUsers { + normUsers[strings.ToLower(k)] = struct{}{} + } + + for _, user := range personnel { + if _, ok := normUsers[strings.ToLower(user)]; !ok { + return fmt.Errorf("%s user not found within tenant", user) + } + } + + return nil +} + +// createCollections - utility function that retrieves M365 +// IDs through Microsoft Graph API. The selectors.ExchangeScope +// determines the type of collections that are stored. +// to the GraphConnector struct. +func (gc *GraphConnector) createCollections( + ctx context.Context, + scope selectors.ExchangeScope, +) ([]*exchange.Collection, error) { + var errs *multierror.Error + + users := scope.Get(selectors.ExchangeUser) + allCollections := make([]*exchange.Collection, 0) + // Create collection of ExchangeDataCollection + for _, user := range users { + collections := make(map[string]*exchange.Collection) + + qp := graph.QueryParams{ + User: user, + Scope: scope, + FailFast: gc.failFast, + Credentials: gc.credentials, + } + + itemCategory := qp.Scope.Category().PathType() + + foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user)) + defer closer() + defer close(foldersComplete) + + resolver, err := exchange.PopulateExchangeContainerResolver( + ctx, + qp, + qp.Scope.Category().PathType(), + ) + if err != nil { + return nil, errors.Wrap(err, "getting folder cache") + } + + err = exchange.FilterContainersAndFillCollections( + ctx, + qp, + collections, + gc.UpdateStatus, + resolver) + + if err != nil { + return nil, errors.Wrap(err, "filling collections") + } + + foldersComplete <- struct{}{} + + for _, collection := range collections { + gc.incrementAwaitingMessages() + + allCollections = append(allCollections, collection) + } + } + + return allCollections, errs.ErrorOrNil() +} + +// ExchangeDataCollections returns a DataCollection which the caller can +// use to read mailbox data out for the specified user +// Assumption: User exists +// +// Add iota to this call -> mail, contacts, calendar, etc. +func (gc *GraphConnector) ExchangeDataCollection( + ctx context.Context, + selector selectors.Selector, +) ([]data.Collection, error) { + eb, err := selector.ToExchangeBackup() + if err != nil { + return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector") + } + + var ( + scopes = eb.DiscreteScopes(gc.GetUsers()) + collections = []data.Collection{} + errs error + ) + + for _, scope := range scopes { + // Creates a map of collections based on scope + dcs, err := gc.createCollections(ctx, scope) + if err != nil { + user := scope.Get(selectors.ExchangeUser) + return nil, support.WrapAndAppend(user[0], err, errs) + } + + for _, collection := range dcs { + collections = append(collections, collection) + } + } + + return collections, errs +} + +// OneDriveDataCollections returns a set of DataCollection which represents the OneDrive data +// for the specified user +func (gc *GraphConnector) OneDriveDataCollections( + ctx context.Context, + selector selectors.Selector, +) ([]data.Collection, error) { + odb, err := selector.ToOneDriveBackup() + if err != nil { + return nil, errors.Wrap(err, "oneDriveDataCollection: parsing selector") + } + + collections := []data.Collection{} + + scopes := odb.DiscreteScopes(gc.GetUsers()) + + var errs error + + // for each scope that includes oneDrive items, get all + for _, scope := range scopes { + for _, user := range scope.Get(selectors.OneDriveUser) { + logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections") + + odcs, err := onedrive.NewCollections( + gc.credentials.AzureTenantID, + user, + scope, + &gc.graphService, + gc.UpdateStatus, + ).Get(ctx) + if err != nil { + return nil, support.WrapAndAppend(user, err, errs) + } + + collections = append(collections, odcs...) + } + } + + for range collections { + gc.incrementAwaitingMessages() + } + + return collections, errs +} diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go new file mode 100644 index 000000000..9d82c4c3b --- /dev/null +++ b/src/internal/connector/data_collections_test.go @@ -0,0 +1,414 @@ +package connector + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/connector/exchange" + "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/selectors" +) + +// --------------------------------------------------------------------------- +// DataCollection tests +// --------------------------------------------------------------------------- + +type ConnectorDataCollectionIntegrationSuite struct { + suite.Suite + connector *GraphConnector + user string +} + +func TestConnectorDataCollectionIntegrationSuite(t *testing.T) { + if err := tester.RunOnAny( + tester.CorsoCITests, + tester.CorsoConnectorDataCollectionTests, + ); err != nil { + t.Skip(err) + } + + suite.Run(t, new(ConnectorDataCollectionIntegrationSuite)) +} + +func (suite *ConnectorDataCollectionIntegrationSuite) SetupSuite() { + ctx, flush := tester.NewContext() + defer flush() + + _, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...) + require.NoError(suite.T(), err) + suite.connector = loadConnector(ctx, suite.T()) + suite.user = tester.M365UserID(suite.T()) + tester.LogTimeOfTest(suite.T()) +} + +// TestExchangeDataCollection verifies interface between operation and +// GraphConnector remains stable to receive a non-zero amount of Collections +// for the Exchange Package. Enabled exchange applications: +// - mail +// - contacts +// - events +func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection() { + ctx, flush := tester.NewContext() + defer flush() + + connector := loadConnector(ctx, suite.T()) + tests := []struct { + name string + getSelector func(t *testing.T) selectors.Selector + }{ + { + name: suite.user + " Email", + getSelector: func(t *testing.T) selectors.Selector { + sel := selectors.NewExchangeBackup() + sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) + + return sel.Selector + }, + }, + { + name: suite.user + " Contacts", + getSelector: func(t *testing.T) selectors.Selector { + sel := selectors.NewExchangeBackup() + sel.Include(sel.ContactFolders( + []string{suite.user}, + []string{exchange.DefaultContactFolder}, + selectors.PrefixMatch())) + + return sel.Selector + }, + }, + { + name: suite.user + " Events", + getSelector: func(t *testing.T) selectors.Selector { + sel := selectors.NewExchangeBackup() + sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) + + return sel.Selector + }, + }, + } + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t)) + require.NoError(t, err) + assert.Equal(t, len(collection), 1) + channel := collection[0].Items() + for object := range channel { + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(object.ToReader()) + assert.NoError(t, err, "received a buf.Read error") + } + status := connector.AwaitStatus() + assert.NotZero(t, status.Successful) + t.Log(status.String()) + }) + } +} + +// TestInvalidUserForDataCollections ensures verification process for users +func (suite *ConnectorDataCollectionIntegrationSuite) TestInvalidUserForDataCollections() { + ctx, flush := tester.NewContext() + defer flush() + + invalidUser := "foo@example.com" + connector := loadConnector(ctx, suite.T()) + tests := []struct { + name string + getSelector func(t *testing.T) selectors.Selector + }{ + { + name: "invalid exchange backup user", + getSelector: func(t *testing.T) selectors.Selector { + sel := selectors.NewExchangeBackup() + sel.Include(sel.MailFolders([]string{invalidUser}, selectors.Any())) + return sel.Selector + }, + }, + { + name: "Invalid onedrive backup user", + getSelector: func(t *testing.T) selectors.Selector { + sel := selectors.NewOneDriveBackup() + sel.Include(sel.Folders([]string{invalidUser}, selectors.Any())) + return sel.Selector + }, + }, + } + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + collections, err := connector.DataCollections(ctx, test.getSelector(t)) + assert.Error(t, err) + assert.Empty(t, collections) + }) + } +} + +func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() { + dest := tester.DefaultTestRestoreDestination() + table := []struct { + name string + col []data.Collection + sel selectors.Selector + }{ + { + name: "ExchangeNil", + col: nil, + sel: selectors.Selector{ + Service: selectors.ServiceExchange, + }, + }, + { + name: "ExchangeEmpty", + col: []data.Collection{}, + sel: selectors.Selector{ + Service: selectors.ServiceExchange, + }, + }, + { + name: "OneDriveNil", + col: nil, + sel: selectors.Selector{ + Service: selectors.ServiceOneDrive, + }, + }, + { + name: "OneDriveEmpty", + col: []data.Collection{}, + sel: selectors.Selector{ + Service: selectors.ServiceOneDrive, + }, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + + deets, err := suite.connector.RestoreDataCollections(ctx, test.sel, dest, test.col) + require.NoError(t, err) + assert.NotNil(t, deets) + + stats := suite.connector.AwaitStatus() + assert.Zero(t, stats.ObjectCount) + assert.Zero(t, stats.FolderCount) + assert.Zero(t, stats.Successful) + }) + } +} + +// --------------------------------------------------------------------------- +// CreateCollection tests +// --------------------------------------------------------------------------- + +type ConnectorCreateCollectionIntegrationSuite struct { + suite.Suite + connector *GraphConnector + user string +} + +func TestConnectorCreateCollectionIntegrationSuite(t *testing.T) { + if err := tester.RunOnAny( + tester.CorsoCITests, + tester.CorsoConnectorCreateCollectionTests, + ); err != nil { + t.Skip(err) + } + + suite.Run(t, new(ConnectorCreateCollectionIntegrationSuite)) +} + +func (suite *ConnectorCreateCollectionIntegrationSuite) SetupSuite() { + ctx, flush := tester.NewContext() + defer flush() + + _, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...) + require.NoError(suite.T(), err) + suite.connector = loadConnector(ctx, suite.T()) + suite.user = tester.M365UserID(suite.T()) + tester.LogTimeOfTest(suite.T()) +} + +// TestMailSerializationRegression verifies that all mail data stored in the +// test account can be successfully downloaded into bytes and restored into +// M365 mail objects +func (suite *ConnectorCreateCollectionIntegrationSuite) TestMailSerializationRegression() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + connector := loadConnector(ctx, t) + sel := selectors.NewExchangeBackup() + sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) + collection, err := connector.createCollections(ctx, sel.Scopes()[0]) + require.NoError(t, err) + + for _, edc := range collection { + suite.T().Run(edc.FullPath().String(), func(t *testing.T) { + streamChannel := edc.Items() + // Verify that each message can be restored + for stream := range streamChannel { + buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + message, err := support.CreateMessageFromBytes(buf.Bytes()) + assert.NotNil(t, message) + assert.NoError(t, err) + } + }) + } + + status := connector.AwaitStatus() + suite.NotNil(status) + suite.Equal(status.ObjectCount, status.Successful) +} + +// TestContactSerializationRegression verifies ability to query contact items +// and to store contact within Collection. Downloaded contacts are run through +// a regression test to ensure that downloaded items can be uploaded. +func (suite *ConnectorCreateCollectionIntegrationSuite) TestContactSerializationRegression() { + ctx, flush := tester.NewContext() + defer flush() + + connector := loadConnector(ctx, suite.T()) + + tests := []struct { + name string + getCollection func(t *testing.T) []*exchange.Collection + }{ + { + name: "Default Contact Folder", + getCollection: func(t *testing.T) []*exchange.Collection { + scope := selectors. + NewExchangeBackup(). + ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0] + collections, err := connector.createCollections(ctx, scope) + require.NoError(t, err) + + return collections + }, + }, + } + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + edcs := test.getCollection(t) + require.Equal(t, len(edcs), 1) + edc := edcs[0] + assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder) + streamChannel := edc.Items() + count := 0 + for stream := range streamChannel { + buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + contact, err := support.CreateContactFromBytes(buf.Bytes()) + assert.NotNil(t, contact) + assert.NoError(t, err, "error on converting contact bytes: "+string(buf.Bytes())) + count++ + } + assert.NotZero(t, count) + + status := connector.AwaitStatus() + suite.NotNil(status) + suite.Equal(status.ObjectCount, status.Successful) + }) + } +} + +// TestEventsSerializationRegression ensures functionality of createCollections +// to be able to successfully query, download and restore event objects +func (suite *ConnectorCreateCollectionIntegrationSuite) TestEventsSerializationRegression() { + ctx, flush := tester.NewContext() + defer flush() + + connector := loadConnector(ctx, suite.T()) + + tests := []struct { + name, expected string + getCollection func(t *testing.T) []*exchange.Collection + }{ + { + name: "Default Event Calendar", + expected: exchange.DefaultCalendar, + getCollection: func(t *testing.T) []*exchange.Collection { + sel := selectors.NewExchangeBackup() + sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) + collections, err := connector.createCollections(ctx, sel.Scopes()[0]) + require.NoError(t, err) + + return collections + }, + }, + { + name: "Birthday Calendar", + expected: "Birthdays", + getCollection: func(t *testing.T) []*exchange.Collection { + sel := selectors.NewExchangeBackup() + sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch())) + collections, err := connector.createCollections(ctx, sel.Scopes()[0]) + require.NoError(t, err) + + return collections + }, + }, + } + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + collections := test.getCollection(t) + require.Equal(t, len(collections), 1) + edc := collections[0] + assert.Equal(t, edc.FullPath().Folder(), test.expected) + streamChannel := edc.Items() + + for stream := range streamChannel { + buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + event, err := support.CreateEventFromBytes(buf.Bytes()) + assert.NotNil(t, event) + assert.NoError(t, err, "experienced error parsing event bytes: "+string(buf.Bytes())) + } + + status := connector.AwaitStatus() + suite.NotNil(status) + suite.Equal(status.ObjectCount, status.Successful) + }) + } +} + +// TestAccessOfInboxAllUsers verifies that GraphConnector can +// support `--users *` for backup operations. Selector.DiscreteScopes +// returns all of the users within one scope. Only users who have +// messages in their inbox will have a collection returned. +// The final test insures that more than a 75% of the user collections are +// returned. If an error was experienced, the test will fail overall +func (suite *ConnectorCreateCollectionIntegrationSuite) TestAccessOfInboxAllUsers() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + connector := loadConnector(ctx, t) + sel := selectors.NewExchangeBackup() + sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) + scopes := sel.DiscreteScopes(connector.GetUsers()) + + for _, scope := range scopes { + users := scope.Get(selectors.ExchangeUser) + standard := (len(users) / 4) * 3 + collections, err := connector.createCollections(ctx, scope) + require.NoError(t, err) + suite.Greater(len(collections), standard) + } +} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index cc98e13b0..528006bd1 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -6,10 +6,8 @@ import ( "context" "fmt" "runtime/trace" - "strings" "sync" - "github.com/hashicorp/go-multierror" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -21,14 +19,16 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" D "github.com/alcionai/corso/src/internal/diagnostics" - "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" - "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/selectors" ) +// --------------------------------------------------------------------------- +// Graph Connector +// --------------------------------------------------------------------------- + // GraphConnector is a struct used to wrap the GraphServiceClient and // GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for // bookkeeping and interfacing with other component. @@ -209,42 +209,6 @@ func buildFromMap(isKey bool, mapping map[string]string) []string { return returnString } -// ExchangeDataStream returns a DataCollection which the caller can -// use to read mailbox data out for the specified user -// Assumption: User exists -// -// Add iota to this call -> mail, contacts, calendar, etc. -func (gc *GraphConnector) ExchangeDataCollection( - ctx context.Context, - selector selectors.Selector, -) ([]data.Collection, error) { - eb, err := selector.ToExchangeBackup() - if err != nil { - return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector") - } - - var ( - scopes = eb.DiscreteScopes(gc.GetUsers()) - collections = []data.Collection{} - errs error - ) - - for _, scope := range scopes { - // Creates a map of collections based on scope - dcs, err := gc.createCollections(ctx, scope) - if err != nil { - user := scope.Get(selectors.ExchangeUser) - return nil, support.WrapAndAppend(user[0], err, errs) - } - - for _, collection := range dcs { - collections = append(collections, collection) - } - } - - return collections, errs -} - // RestoreDataCollections restores data from the specified collections // into M365 using the GraphAPI. // SideEffect: gc.status is updated at the completion of operation @@ -278,67 +242,6 @@ func (gc *GraphConnector) RestoreDataCollections( return deets, err } -// createCollections - utility function that retrieves M365 -// IDs through Microsoft Graph API. The selectors.ExchangeScope -// determines the type of collections that are stored. -// to the GraphConnector struct. -func (gc *GraphConnector) createCollections( - ctx context.Context, - scope selectors.ExchangeScope, -) ([]*exchange.Collection, error) { - var errs *multierror.Error - - users := scope.Get(selectors.ExchangeUser) - allCollections := make([]*exchange.Collection, 0) - // Create collection of ExchangeDataCollection - for _, user := range users { - collections := make(map[string]*exchange.Collection) - - qp := graph.QueryParams{ - User: user, - Scope: scope, - FailFast: gc.failFast, - Credentials: gc.credentials, - } - - itemCategory := qp.Scope.Category().PathType() - - foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", itemCategory.String(), user)) - defer closer() - defer close(foldersComplete) - - resolver, err := exchange.PopulateExchangeContainerResolver( - ctx, - qp, - qp.Scope.Category().PathType(), - ) - if err != nil { - return nil, errors.Wrap(err, "getting folder cache") - } - - err = exchange.FilterContainersAndFillCollections( - ctx, - qp, - collections, - gc.UpdateStatus, - resolver) - - if err != nil { - return nil, errors.Wrap(err, "filling collections") - } - - foldersComplete <- struct{}{} - - for _, collection := range collections { - gc.incrementAwaitingMessages() - - allCollections = append(allCollections, collection) - } - } - - return allCollections, errs.ErrorOrNil() -} - // AwaitStatus waits for all gc tasks to complete and then returns status func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus { defer func() { @@ -378,6 +281,10 @@ func (gc *GraphConnector) incrementAwaitingMessages() { gc.wg.Add(1) } +// --------------------------------------------------------------------------- +// Helper Funcs +// --------------------------------------------------------------------------- + // IsRecoverableError returns true iff error is a RecoverableGCEerror func IsRecoverableError(e error) bool { var recoverable support.RecoverableGCError @@ -389,113 +296,3 @@ func IsNonRecoverableError(e error) bool { var nonRecoverable support.NonRecoverableGCError return errors.As(e, &nonRecoverable) } - -// DataCollections utility function to launch backup operations for exchange and onedrive -func (gc *GraphConnector) DataCollections(ctx context.Context, sels selectors.Selector) ([]data.Collection, error) { - ctx, end := D.Span(ctx, "gc:dataCollections", D.Index("service", sels.Service.String())) - defer end() - - err := verifyBackupInputs(sels, gc.Users) - if err != nil { - return nil, err - } - - switch sels.Service { - case selectors.ServiceExchange: - return gc.ExchangeDataCollection(ctx, sels) - case selectors.ServiceOneDrive: - return gc.OneDriveDataCollections(ctx, sels) - default: - return nil, errors.Errorf("service %s not supported", sels) - } -} - -// OneDriveDataCollections returns a set of DataCollection which represents the OneDrive data -// for the specified user -func (gc *GraphConnector) OneDriveDataCollections( - ctx context.Context, - selector selectors.Selector, -) ([]data.Collection, error) { - odb, err := selector.ToOneDriveBackup() - if err != nil { - return nil, errors.Wrap(err, "oneDriveDataCollection: parsing selector") - } - - collections := []data.Collection{} - - scopes := odb.DiscreteScopes(gc.GetUsers()) - - var errs error - - // for each scope that includes oneDrive items, get all - for _, scope := range scopes { - for _, user := range scope.Get(selectors.OneDriveUser) { - logger.Ctx(ctx).With("user", user).Debug("Creating OneDrive collections") - - odcs, err := onedrive.NewCollections( - gc.credentials.AzureTenantID, - user, - scope, - &gc.graphService, - gc.UpdateStatus, - ).Get(ctx) - if err != nil { - return nil, support.WrapAndAppend(user, err, errs) - } - - collections = append(collections, odcs...) - } - } - - for range collections { - gc.incrementAwaitingMessages() - } - - return collections, errs -} - -func verifyBackupInputs(sel selectors.Selector, mapOfUsers map[string]string) error { - var personnel []string - - // retrieve users from selectors - switch sel.Service { - case selectors.ServiceExchange: - backup, err := sel.ToExchangeBackup() - if err != nil { - return err - } - - for _, scope := range backup.Scopes() { - temp := scope.Get(selectors.ExchangeUser) - personnel = append(personnel, temp...) - } - case selectors.ServiceOneDrive: - backup, err := sel.ToOneDriveBackup() - if err != nil { - return err - } - - for _, user := range backup.Scopes() { - temp := user.Get(selectors.OneDriveUser) - personnel = append(personnel, temp...) - } - - default: - return errors.New("service %s not supported") - } - - // verify personnel - normUsers := map[string]struct{}{} - - for k := range mapOfUsers { - normUsers[strings.ToLower(k)] = struct{}{} - } - - for _, user := range personnel { - if _, ok := normUsers[strings.ToLower(user)]; !ok { - return fmt.Errorf("%s user not found within tenant", user) - } - } - - return nil -} diff --git a/src/internal/connector/graph_connector_helper_test.go b/src/internal/connector/graph_connector_helper_test.go index bafecbd64..1228941e6 100644 --- a/src/internal/connector/graph_connector_helper_test.go +++ b/src/internal/connector/graph_connector_helper_test.go @@ -1,6 +1,7 @@ package connector import ( + "context" "io" "reflect" "testing" @@ -13,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" @@ -878,3 +880,11 @@ func getSelectorWith(service path.ServiceType) selectors.Selector { Service: s, } } + +func loadConnector(ctx context.Context, t *testing.T) *GraphConnector { + a := tester.NewM365Account(t) + connector, err := NewGraphConnector(ctx, a) + require.NoError(t, err) + + return connector +} diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 7e874a5e9..465c92e32 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -1,8 +1,6 @@ package connector import ( - "bytes" - "context" "testing" "time" @@ -12,7 +10,6 @@ import ( "github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/mockconnector" - "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" @@ -26,14 +23,6 @@ type GraphConnectorIntegrationSuite struct { user string } -func loadConnector(ctx context.Context, t *testing.T) *GraphConnector { - a := tester.NewM365Account(t) - connector, err := NewGraphConnector(ctx, a) - require.NoError(t, err) - - return connector -} - func TestGraphConnectorIntegrationSuite(t *testing.T) { if err := tester.RunOnAny( tester.CorsoCITests, @@ -80,286 +69,6 @@ func (suite *GraphConnectorIntegrationSuite) TestSetTenantUsers() { suite.Greater(len(newConnector.Users), 0) } -// TestInvalidUserForDataCollections ensures verification process for users -func (suite *GraphConnectorIntegrationSuite) TestInvalidUserForDataCollections() { - ctx, flush := tester.NewContext() - defer flush() - - invalidUser := "foo@example.com" - connector := loadConnector(ctx, suite.T()) - tests := []struct { - name string - getSelector func(t *testing.T) selectors.Selector - }{ - { - name: "invalid exchange backup user", - getSelector: func(t *testing.T) selectors.Selector { - sel := selectors.NewExchangeBackup() - sel.Include(sel.MailFolders([]string{invalidUser}, selectors.Any())) - return sel.Selector - }, - }, - { - name: "Invalid onedrive backup user", - getSelector: func(t *testing.T) selectors.Selector { - sel := selectors.NewOneDriveBackup() - sel.Include(sel.Folders([]string{invalidUser}, selectors.Any())) - return sel.Selector - }, - }, - } - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - collections, err := connector.DataCollections(ctx, test.getSelector(t)) - assert.Error(t, err) - assert.Empty(t, collections) - }) - } -} - -// TestExchangeDataCollection verifies interface between operation and -// GraphConnector remains stable to receive a non-zero amount of Collections -// for the Exchange Package. Enabled exchange applications: -// - mail -// - contacts -// - events -func (suite *GraphConnectorIntegrationSuite) TestExchangeDataCollection() { - ctx, flush := tester.NewContext() - defer flush() - - connector := loadConnector(ctx, suite.T()) - tests := []struct { - name string - getSelector func(t *testing.T) selectors.Selector - }{ - { - name: suite.user + " Email", - getSelector: func(t *testing.T) selectors.Selector { - sel := selectors.NewExchangeBackup() - sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - - return sel.Selector - }, - }, - { - name: suite.user + " Contacts", - getSelector: func(t *testing.T) selectors.Selector { - sel := selectors.NewExchangeBackup() - sel.Include(sel.ContactFolders( - []string{suite.user}, - []string{exchange.DefaultContactFolder}, - selectors.PrefixMatch())) - - return sel.Selector - }, - }, - { - name: suite.user + " Events", - getSelector: func(t *testing.T) selectors.Selector { - sel := selectors.NewExchangeBackup() - sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) - - return sel.Selector - }, - }, - } - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t)) - require.NoError(t, err) - assert.Equal(t, len(collection), 1) - channel := collection[0].Items() - for object := range channel { - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(object.ToReader()) - assert.NoError(t, err, "received a buf.Read error") - } - status := connector.AwaitStatus() - assert.NotZero(t, status.Successful) - t.Log(status.String()) - }) - } -} - -// TestMailSerializationRegression verifies that all mail data stored in the -// test account can be successfully downloaded into bytes and restored into -// M365 mail objects -func (suite *GraphConnectorIntegrationSuite) TestMailSerializationRegression() { - ctx, flush := tester.NewContext() - defer flush() - - t := suite.T() - connector := loadConnector(ctx, t) - sel := selectors.NewExchangeBackup() - sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - collection, err := connector.createCollections(ctx, sel.Scopes()[0]) - require.NoError(t, err) - - for _, edc := range collection { - suite.T().Run(edc.FullPath().String(), func(t *testing.T) { - streamChannel := edc.Items() - // Verify that each message can be restored - for stream := range streamChannel { - buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - message, err := support.CreateMessageFromBytes(buf.Bytes()) - assert.NotNil(t, message) - assert.NoError(t, err) - } - }) - } - - status := connector.AwaitStatus() - suite.NotNil(status) - suite.Equal(status.ObjectCount, status.Successful) -} - -// TestContactSerializationRegression verifies ability to query contact items -// and to store contact within Collection. Downloaded contacts are run through -// a regression test to ensure that downloaded items can be uploaded. -func (suite *GraphConnectorIntegrationSuite) TestContactSerializationRegression() { - ctx, flush := tester.NewContext() - defer flush() - - connector := loadConnector(ctx, suite.T()) - - tests := []struct { - name string - getCollection func(t *testing.T) []*exchange.Collection - }{ - { - name: "Default Contact Folder", - getCollection: func(t *testing.T) []*exchange.Collection { - scope := selectors. - NewExchangeBackup(). - ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0] - collections, err := connector.createCollections(ctx, scope) - require.NoError(t, err) - - return collections - }, - }, - } - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - edcs := test.getCollection(t) - require.Equal(t, len(edcs), 1) - edc := edcs[0] - assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder) - streamChannel := edc.Items() - count := 0 - for stream := range streamChannel { - buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - contact, err := support.CreateContactFromBytes(buf.Bytes()) - assert.NotNil(t, contact) - assert.NoError(t, err, "error on converting contact bytes: "+string(buf.Bytes())) - count++ - } - assert.NotZero(t, count) - - status := connector.AwaitStatus() - suite.NotNil(status) - suite.Equal(status.ObjectCount, status.Successful) - }) - } -} - -// TestEventsSerializationRegression ensures functionality of createCollections -// to be able to successfully query, download and restore event objects -func (suite *GraphConnectorIntegrationSuite) TestEventsSerializationRegression() { - ctx, flush := tester.NewContext() - defer flush() - - connector := loadConnector(ctx, suite.T()) - - tests := []struct { - name, expected string - getCollection func(t *testing.T) []*exchange.Collection - }{ - { - name: "Default Event Calendar", - expected: exchange.DefaultCalendar, - getCollection: func(t *testing.T) []*exchange.Collection { - sel := selectors.NewExchangeBackup() - sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) - collections, err := connector.createCollections(ctx, sel.Scopes()[0]) - require.NoError(t, err) - - return collections - }, - }, - { - name: "Birthday Calendar", - expected: "Birthdays", - getCollection: func(t *testing.T) []*exchange.Collection { - sel := selectors.NewExchangeBackup() - sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch())) - collections, err := connector.createCollections(ctx, sel.Scopes()[0]) - require.NoError(t, err) - - return collections - }, - }, - } - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - collections := test.getCollection(t) - require.Equal(t, len(collections), 1) - edc := collections[0] - assert.Equal(t, edc.FullPath().Folder(), test.expected) - streamChannel := edc.Items() - - for stream := range streamChannel { - buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - event, err := support.CreateEventFromBytes(buf.Bytes()) - assert.NotNil(t, event) - assert.NoError(t, err, "experienced error parsing event bytes: "+string(buf.Bytes())) - } - - status := connector.AwaitStatus() - suite.NotNil(status) - suite.Equal(status.ObjectCount, status.Successful) - }) - } -} - -// TestAccessOfInboxAllUsers verifies that GraphConnector can -// support `--users *` for backup operations. Selector.DiscreteScopes -// returns all of the users within one scope. Only users who have -// messages in their inbox will have a collection returned. -// The final test insures that more than a 75% of the user collections are -// returned. If an error was experienced, the test will fail overall -func (suite *GraphConnectorIntegrationSuite) TestAccessOfInboxAllUsers() { - ctx, flush := tester.NewContext() - defer flush() - - t := suite.T() - connector := loadConnector(ctx, t) - sel := selectors.NewExchangeBackup() - sel.Include(sel.MailFolders(selectors.Any(), []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - scopes := sel.DiscreteScopes(connector.GetUsers()) - - for _, scope := range scopes { - users := scope.Get(selectors.ExchangeUser) - standard := (len(users) / 4) * 3 - collections, err := connector.createCollections(ctx, scope) - require.NoError(t, err) - suite.Greater(len(collections), standard) - } -} - func (suite *GraphConnectorIntegrationSuite) TestMailFetch() { ctx, flush := tester.NewContext() defer flush() @@ -408,63 +117,9 @@ func (suite *GraphConnectorIntegrationSuite) TestMailFetch() { } } -///------------------------------------------------------------ +//------------------------------------------------------------- // Exchange Functions -//------------------------------------------------------- - -func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() { - dest := tester.DefaultTestRestoreDestination() - table := []struct { - name string - col []data.Collection - sel selectors.Selector - }{ - { - name: "ExchangeNil", - col: nil, - sel: selectors.Selector{ - Service: selectors.ServiceExchange, - }, - }, - { - name: "ExchangeEmpty", - col: []data.Collection{}, - sel: selectors.Selector{ - Service: selectors.ServiceExchange, - }, - }, - { - name: "OneDriveNil", - col: nil, - sel: selectors.Selector{ - Service: selectors.ServiceOneDrive, - }, - }, - { - name: "OneDriveEmpty", - col: []data.Collection{}, - sel: selectors.Selector{ - Service: selectors.ServiceOneDrive, - }, - }, - } - - for _, test := range table { - suite.T().Run(test.name, func(t *testing.T) { - ctx, flush := tester.NewContext() - defer flush() - - deets, err := suite.connector.RestoreDataCollections(ctx, test.sel, dest, test.col) - require.NoError(t, err) - assert.NotNil(t, deets) - - stats := suite.connector.AwaitStatus() - assert.Zero(t, stats.ObjectCount) - assert.Zero(t, stats.FolderCount) - assert.Zero(t, stats.Successful) - }) - } -} +//------------------------------------------------------------- func runRestoreBackupTest( t *testing.T, diff --git a/src/internal/tester/integration_runners.go b/src/internal/tester/integration_runners.go index d45e0afc1..c0b295b9d 100644 --- a/src/internal/tester/integration_runners.go +++ b/src/internal/tester/integration_runners.go @@ -9,21 +9,23 @@ import ( ) const ( - CorsoLoadTests = "CORSO_LOAD_TESTS" - CorsoCITests = "CORSO_CI_TESTS" - CorsoCLIBackupTests = "CORSO_COMMAND_LINE_BACKUP_TESTS" - CorsoCLIConfigTests = "CORSO_COMMAND_LINE_CONFIG_TESTS" - CorsoCLIRepoTests = "CORSO_COMMAND_LINE_REPO_TESTS" - CorsoCLIRestoreTests = "CORSO_COMMAND_LINE_RESTORE_TESTS" - CorsoCLITests = "CORSO_COMMAND_LINE_TESTS" - CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS" - CorsoGraphConnectorExchangeTests = "CORSO_GRAPH_CONNECTOR_EXCHANGE_TESTS" - CorsoGraphConnectorOneDriveTests = "CORSO_GRAPH_CONNECTOR_ONE_DRIVE_TESTS" - CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS" - CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS" - CorsoOneDriveTests = "CORSO_ONE_DRIVE_TESTS" - CorsoOperationTests = "CORSO_OPERATION_TESTS" - CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS" + CorsoLoadTests = "CORSO_LOAD_TESTS" + CorsoCITests = "CORSO_CI_TESTS" + CorsoCLIBackupTests = "CORSO_COMMAND_LINE_BACKUP_TESTS" + CorsoCLIConfigTests = "CORSO_COMMAND_LINE_CONFIG_TESTS" + CorsoCLIRepoTests = "CORSO_COMMAND_LINE_REPO_TESTS" + CorsoCLIRestoreTests = "CORSO_COMMAND_LINE_RESTORE_TESTS" + CorsoCLITests = "CORSO_COMMAND_LINE_TESTS" + CorsoConnectorCreateCollectionTests = "CORSO_CONNECTOR_CREATE_COLLECTION_TESTS" + CorsoConnectorDataCollectionTests = "CORSO_CONNECTOR_DATA_COLLECTION_TESTS" + CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS" + CorsoGraphConnectorExchangeTests = "CORSO_GRAPH_CONNECTOR_EXCHANGE_TESTS" + CorsoGraphConnectorOneDriveTests = "CORSO_GRAPH_CONNECTOR_ONE_DRIVE_TESTS" + CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS" + CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS" + CorsoOneDriveTests = "CORSO_ONE_DRIVE_TESTS" + CorsoOperationTests = "CORSO_OPERATION_TESTS" + CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS" ) // File needs to be a single message .json