diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index 49bf67f1b..9b7230edf 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -5,17 +5,14 @@ import ( "fmt" "strings" - "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector/exchange" - "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/sharepoint" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" D "github.com/alcionai/corso/src/internal/diagnostics" - "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/selectors" @@ -46,9 +43,28 @@ func (gc *GraphConnector) DataCollections( switch sels.Service { case selectors.ServiceExchange: - return gc.ExchangeDataCollection(ctx, sels, metadata, ctrlOpts) + colls, err := exchange.DataCollections( + ctx, + sels, + metadata, + gc.GetUsers(), + gc.credentials, + // gc.Service, + gc.UpdateStatus, + ctrlOpts) + if err != nil { + return nil, err + } + + for range colls { + gc.incrementAwaitingMessages() + } + + return colls, nil + case selectors.ServiceOneDrive: return gc.OneDriveDataCollections(ctx, sels, ctrlOpts) + case selectors.ServiceSharePoint: colls, err := sharepoint.DataCollections( ctx, @@ -67,6 +83,7 @@ func (gc *GraphConnector) DataCollections( } return colls, nil + default: return nil, errors.Errorf("service %s not supported", sels.Service.String()) } @@ -116,112 +133,6 @@ func verifyBackupInputs(sels selectors.Selector, userPNs, siteIDs []string) erro return nil } -// --------------------------------------------------------------------------- -// Exchange -// --------------------------------------------------------------------------- - -// createExchangeCollections - utility function that retrieves M365 -// IDs through Microsoft Graph API. The selectors.ExchangeScope -// determines the type of collections that are retrieved. -func (gc *GraphConnector) createExchangeCollections( - ctx context.Context, - scope selectors.ExchangeScope, - dps exchange.DeltaPaths, - ctrlOpts control.Options, -) ([]data.Collection, error) { - var ( - errs *multierror.Error - users = scope.Get(selectors.ExchangeUser) - allCollections = make([]data.Collection, 0) - ) - - // Create collection of ExchangeDataCollection - for _, user := range users { - collections := make(map[string]data.Collection) - - qp := graph.QueryParams{ - Category: scope.Category().PathType(), - ResourceOwner: user, - Credentials: gc.credentials, - } - - foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", qp.Category, user)) - defer closer() - defer close(foldersComplete) - - resolver, err := exchange.PopulateExchangeContainerResolver(ctx, qp) - if err != nil { - return nil, errors.Wrap(err, "getting folder cache") - } - - err = exchange.FilterContainersAndFillCollections( - ctx, - qp, - collections, - gc.UpdateStatus, - resolver, - scope, - dps, - ctrlOpts) - - if err != nil { - return nil, errors.Wrap(err, "filling collections") - } - - foldersComplete <- struct{}{} - - for _, collection := range collections { - gc.incrementAwaitingMessages() - - allCollections = append(allCollections, collection) - } - } - - return allCollections, errs.ErrorOrNil() -} - -// ExchangeDataCollections returns a DataCollection which the caller can -// use to read mailbox data out for the specified user -// Assumption: User exists -// -// Add iota to this call -> mail, contacts, calendar, etc. -func (gc *GraphConnector) ExchangeDataCollection( - ctx context.Context, - selector selectors.Selector, - metadata []data.Collection, - ctrlOpts control.Options, -) ([]data.Collection, error) { - eb, err := selector.ToExchangeBackup() - if err != nil { - return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector") - } - - var ( - scopes = eb.DiscreteScopes(gc.GetUsers()) - collections = []data.Collection{} - errs error - ) - - cdps, err := exchange.ParseMetadataCollections(ctx, metadata) - if err != nil { - return nil, err - } - - for _, scope := range scopes { - dps := cdps[scope.Category().PathType()] - - dcs, err := gc.createExchangeCollections(ctx, scope, dps, control.Options{}) - if err != nil { - user := scope.Get(selectors.ExchangeUser) - return nil, support.WrapAndAppend(user[0], err, errs) - } - - collections = append(collections, dcs...) - } - - return collections, errs -} - // --------------------------------------------------------------------------- // OneDrive // --------------------------------------------------------------------------- diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index d3614214f..58a6a3a84 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -10,11 +10,8 @@ import ( "github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/sharepoint" - "github.com/alcionai/corso/src/internal/connector/support" - "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/control" - "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -105,14 +102,26 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { - collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t), nil, control.Options{}) + collections, err := exchange.DataCollections( + ctx, + test.getSelector(t), + nil, + []string{suite.user}, + connector.credentials, + connector.UpdateStatus, + control.Options{}) require.NoError(t, err) + + for range collections { + connector.incrementAwaitingMessages() + } + // Categories with delta endpoints will produce a collection for metadata // as well as the actual data pulled. - assert.GreaterOrEqual(t, len(collection), 1, "expected 1 <= num collections <= 2") - assert.GreaterOrEqual(t, 2, len(collection), "expected 1 <= num collections <= 2") + assert.GreaterOrEqual(t, len(collections), 1, "expected 1 <= num collections <= 2") + assert.GreaterOrEqual(t, 2, len(collections), "expected 1 <= num collections <= 2") - for _, col := range collection { + for _, col := range collections { for object := range col.Items() { buf := &bytes.Buffer{} _, err := buf.ReadFrom(object.ToReader()) @@ -202,7 +211,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti for _, test := range tests { suite.T().Run(test.name, func(t *testing.T) { - collection, err := sharepoint.DataCollections( + collections, err := sharepoint.DataCollections( ctx, test.getSelector(), []string{suite.site}, @@ -212,375 +221,29 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti control.Options{}) require.NoError(t, err) + for range collections { + connector.incrementAwaitingMessages() + } + // we don't know an exact count of drives this will produce, // but it should be more than one. - assert.Less(t, test.expected, len(collection)) + assert.Less(t, test.expected, len(collections)) - // the test only reads the firstt collection - connector.incrementAwaitingMessages() - - for object := range collection[0].Items() { - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(object.ToReader()) - assert.NoError(t, err, "received a buf.Read error") + for _, coll := range collections { + for object := range coll.Items() { + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(object.ToReader()) + assert.NoError(t, err, "reading item") + } } status := connector.AwaitStatus() assert.NotZero(t, status.Successful) - t.Log(status.String()) }) } } -// --------------------------------------------------------------------------- -// CreateExchangeCollection tests -// --------------------------------------------------------------------------- - -type ConnectorCreateExchangeCollectionIntegrationSuite struct { - suite.Suite - connector *GraphConnector - user string - site string -} - -func TestConnectorCreateExchangeCollectionIntegrationSuite(t *testing.T) { - if err := tester.RunOnAny( - tester.CorsoCITests, - tester.CorsoConnectorCreateExchangeCollectionTests, - ); err != nil { - t.Skip(err) - } - - suite.Run(t, new(ConnectorCreateExchangeCollectionIntegrationSuite)) -} - -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) SetupSuite() { - ctx, flush := tester.NewContext() - defer flush() - - _, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...) - require.NoError(suite.T(), err) - suite.connector = loadConnector(ctx, suite.T(), Users) - suite.user = tester.M365UserID(suite.T()) - suite.site = tester.M365SiteID(suite.T()) - tester.LogTimeOfTest(suite.T()) -} - -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch() { - ctx, flush := tester.NewContext() - defer flush() - - var ( - t = suite.T() - userID = tester.M365UserID(t) - ) - - tests := []struct { - name string - scope selectors.ExchangeScope - folderNames map[string]struct{} - }{ - { - name: "Folder Iterative Check Mail", - scope: selectors.NewExchangeBackup().MailFolders( - []string{userID}, - []string{exchange.DefaultMailFolder}, - selectors.PrefixMatch(), - )[0], - folderNames: map[string]struct{}{ - exchange.DefaultMailFolder: {}, - }, - }, - } - - gc := loadConnector(ctx, t, Users) - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{}) - require.NoError(t, err) - - for _, c := range collections { - if c.FullPath().Service() == path.ExchangeMetadataService { - continue - } - - require.NotEmpty(t, c.FullPath().Folder()) - folder := c.FullPath().Folder() - - delete(test.folderNames, folder) - } - - assert.Empty(t, test.folderNames) - }) - } -} - -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() { - ctx, flush := tester.NewContext() - defer flush() - - var ( - userID = tester.M365UserID(suite.T()) - gc = loadConnector(ctx, suite.T(), Users) - ) - - tests := []struct { - name string - scope selectors.ExchangeScope - }{ - { - name: "Mail", - scope: selectors.NewExchangeBackup().MailFolders( - []string{userID}, - []string{exchange.DefaultMailFolder}, - selectors.PrefixMatch(), - )[0], - }, - { - name: "Contacts", - scope: selectors.NewExchangeBackup().ContactFolders( - []string{userID}, - []string{exchange.DefaultContactFolder}, - selectors.PrefixMatch(), - )[0], - }, - } - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - // get collections without providing any delta history (ie: full backup) - collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{}) - require.NoError(t, err) - assert.Less(t, 1, len(collections), "retrieved metadata and data collections") - - var metadata data.Collection - - for _, coll := range collections { - if coll.FullPath().Service() == path.ExchangeMetadataService { - metadata = coll - } - } - - require.NotNil(t, metadata, "collections contains a metadata collection") - - cdps, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata}) - require.NoError(t, err) - - dps := cdps[test.scope.Category().PathType()] - - // now do another backup with the previous delta tokens, - // which should only contain the difference. - collections, err = gc.createExchangeCollections(ctx, test.scope, dps, control.Options{}) - require.NoError(t, err) - - // TODO(keepers): this isn't a very useful test at the moment. It needs to - // investigate the items in the original and delta collections to at least - // assert some minimum assumptions, such as "deltas should retrieve fewer items". - // Delta usage is commented out at the moment, anyway. So this is currently - // a sanity check that the minimum behavior won't break. - for _, coll := range collections { - if coll.FullPath().Service() != path.ExchangeMetadataService { - ec, ok := coll.(*exchange.Collection) - require.True(t, ok, "collection is *exchange.Collection") - assert.NotNil(t, ec) - } - } - }) - } -} - -// TestMailSerializationRegression verifies that all mail data stored in the -// test account can be successfully downloaded into bytes and restored into -// M365 mail objects -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializationRegression() { - ctx, flush := tester.NewContext() - defer flush() - - var ( - t = suite.T() - connector = loadConnector(ctx, t, Users) - sel = selectors.NewExchangeBackup() - ) - - sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch())) - - collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], exchange.DeltaPaths{}, control.Options{}) - require.NoError(t, err) - - for _, edc := range collection { - suite.T().Run(edc.FullPath().String(), func(t *testing.T) { - streamChannel := edc.Items() - // Verify that each message can be restored - for stream := range streamChannel { - buf := &bytes.Buffer{} - - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - - message, err := support.CreateMessageFromBytes(buf.Bytes()) - assert.NotNil(t, message) - assert.NoError(t, err) - } - }) - } - - status := connector.AwaitStatus() - suite.NotNil(status) - suite.Equal(status.ObjectCount, status.Successful) -} - -// TestContactSerializationRegression verifies ability to query contact items -// and to store contact within Collection. Downloaded contacts are run through -// a regression test to ensure that downloaded items can be uploaded. -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSerializationRegression() { - ctx, flush := tester.NewContext() - defer flush() - - connector := loadConnector(ctx, suite.T(), Users) - - tests := []struct { - name string - getCollection func(t *testing.T) []data.Collection - }{ - { - name: "Default Contact Folder", - getCollection: func(t *testing.T) []data.Collection { - scope := selectors. - NewExchangeBackup(). - ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0] - collections, err := connector.createExchangeCollections(ctx, scope, exchange.DeltaPaths{}, control.Options{}) - require.NoError(t, err) - - return collections - }, - }, - } - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - edcs := test.getCollection(t) - require.GreaterOrEqual(t, len(edcs), 1, "expected 1 <= num collections <= 2") - require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2") - - for _, edc := range edcs { - isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService - count := 0 - - for stream := range edc.Items() { - buf := &bytes.Buffer{} - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - - if isMetadata { - continue - } - - contact, err := support.CreateContactFromBytes(buf.Bytes()) - assert.NotNil(t, contact) - assert.NoError(t, err, "error on converting contact bytes: "+buf.String()) - count++ - } - - if isMetadata { - continue - } - - assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder) - assert.NotZero(t, count) - } - - status := connector.AwaitStatus() - suite.NotNil(status) - suite.Equal(status.ObjectCount, status.Successful) - }) - } -} - -// TestEventsSerializationRegression ensures functionality of createCollections -// to be able to successfully query, download and restore event objects -func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerializationRegression() { - ctx, flush := tester.NewContext() - defer flush() - - connector := loadConnector(ctx, suite.T(), Users) - - tests := []struct { - name, expected string - getCollection func(t *testing.T) []data.Collection - }{ - { - name: "Default Event Calendar", - expected: exchange.DefaultCalendar, - getCollection: func(t *testing.T) []data.Collection { - sel := selectors.NewExchangeBackup() - sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch())) - - collections, err := connector.createExchangeCollections( - ctx, - sel.Scopes()[0], - exchange.DeltaPaths{}, - control.Options{}) - require.NoError(t, err) - - return collections - }, - }, - { - name: "Birthday Calendar", - expected: "Birthdays", - getCollection: func(t *testing.T) []data.Collection { - sel := selectors.NewExchangeBackup() - sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch())) - - collections, err := connector.createExchangeCollections( - ctx, - sel.Scopes()[0], - exchange.DeltaPaths{}, - control.Options{}) - require.NoError(t, err) - - return collections - }, - }, - } - - for _, test := range tests { - suite.T().Run(test.name, func(t *testing.T) { - collections := test.getCollection(t) - require.Equal(t, len(collections), 2) - - for _, edc := range collections { - if edc.FullPath().Service() != path.ExchangeMetadataService { - assert.Equal(t, test.expected, edc.FullPath().Folder()) - } else { - assert.Equal(t, "", edc.FullPath().Folder()) - } - - streamChannel := edc.Items() - - for stream := range streamChannel { - buf := &bytes.Buffer{} - - read, err := buf.ReadFrom(stream.ToReader()) - assert.NoError(t, err) - assert.NotZero(t, read) - - event, err := support.CreateEventFromBytes(buf.Bytes()) - assert.NotNil(t, event) - assert.NoError(t, err, "creating event from bytes: "+buf.String()) - } - } - - status := connector.AwaitStatus() - suite.NotNil(status) - suite.Equal(status.ObjectCount, status.Successful) - }) - } -} - // --------------------------------------------------------------------------- // CreateSharePointCollection tests // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index 87838024e..92bc9e531 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -3,12 +3,19 @@ package exchange import ( "context" "encoding/json" + "fmt" + "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" ) // MetadataFileNames produces the category-specific set of filenames used to @@ -53,7 +60,7 @@ type DeltaPath struct { // ParseMetadataCollections produces a map of structs holding delta // and path lookup maps. -func ParseMetadataCollections( +func parseMetadataCollections( ctx context.Context, colls []data.Collection, ) (CatDeltaPaths, error) { @@ -146,3 +153,114 @@ func ParseMetadataCollections( return cdp, nil } + +// DataCollections returns a DataCollection which the caller can +// use to read mailbox data out for the specified user +// Assumption: User exists +// +// Add iota to this call -> mail, contacts, calendar, etc. +func DataCollections( + ctx context.Context, + selector selectors.Selector, + metadata []data.Collection, + userPNs []string, + acct account.M365Config, + su support.StatusUpdater, + ctrlOpts control.Options, +) ([]data.Collection, error) { + eb, err := selector.ToExchangeBackup() + if err != nil { + return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector") + } + + var ( + scopes = eb.DiscreteScopes(userPNs) + collections = []data.Collection{} + errs error + ) + + cdps, err := parseMetadataCollections(ctx, metadata) + if err != nil { + return nil, err + } + + for _, scope := range scopes { + dps := cdps[scope.Category().PathType()] + + dcs, err := createCollections( + ctx, + acct, + scope, + dps, + control.Options{}, + su) + if err != nil { + user := scope.Get(selectors.ExchangeUser) + return nil, support.WrapAndAppend(user[0], err, errs) + } + + collections = append(collections, dcs...) + } + + return collections, errs +} + +// createCollections - utility function that retrieves M365 +// IDs through Microsoft Graph API. The selectors.ExchangeScope +// determines the type of collections that are retrieved. +func createCollections( + ctx context.Context, + acct account.M365Config, + scope selectors.ExchangeScope, + dps DeltaPaths, + ctrlOpts control.Options, + su support.StatusUpdater, +) ([]data.Collection, error) { + var ( + errs *multierror.Error + users = scope.Get(selectors.ExchangeUser) + allCollections = make([]data.Collection, 0) + ) + + // Create collection of ExchangeDataCollection + for _, user := range users { + collections := make(map[string]data.Collection) + + qp := graph.QueryParams{ + Category: scope.Category().PathType(), + ResourceOwner: user, + Credentials: acct, + } + + foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", qp.Category, user)) + defer closer() + defer close(foldersComplete) + + resolver, err := populateExchangeContainerResolver(ctx, qp) + if err != nil { + return nil, errors.Wrap(err, "getting folder cache") + } + + err = filterContainersAndFillCollections( + ctx, + qp, + collections, + su, + resolver, + scope, + dps, + ctrlOpts) + + if err != nil { + return nil, errors.Wrap(err, "filling collections") + } + + foldersComplete <- struct{}{} + + for _, coll := range collections { + allCollections = append(allCollections, coll) + } + } + + return allCollections, errs.ErrorOrNil() +} diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 54d295dd6..8e26d4adc 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -1,6 +1,8 @@ package exchange import ( + "bytes" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -11,7 +13,9 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" ) // --------------------------------------------------------------------------- @@ -170,7 +174,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { ) require.NoError(t, err) - cdps, err := ParseMetadataCollections(ctx, []data.Collection{coll}) + cdps, err := parseMetadataCollections(ctx, []data.Collection{coll}) test.expectError(t, err) emails := cdps[path.EmailCategory] @@ -184,3 +188,397 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { }) } } + +// --------------------------------------------------------------------------- +// Integration tests +// --------------------------------------------------------------------------- + +func newStatusUpdater(t *testing.T, wg *sync.WaitGroup) func(status *support.ConnectorOperationStatus) { + updater := func(status *support.ConnectorOperationStatus) { + defer wg.Done() + assert.Zero(t, status.ErrorCount) + } + + return updater +} + +type DataCollectionsIntegrationSuite struct { + suite.Suite + user string + site string +} + +func TestDataCollectionsIntegrationSuite(t *testing.T) { + if err := tester.RunOnAny( + tester.CorsoCITests, + tester.CorsoConnectorCreateExchangeCollectionTests, + ); err != nil { + t.Skip(err) + } + + suite.Run(t, new(DataCollectionsIntegrationSuite)) +} + +func (suite *DataCollectionsIntegrationSuite) SetupSuite() { + _, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...) + require.NoError(suite.T(), err) + + suite.user = tester.M365UserID(suite.T()) + suite.site = tester.M365SiteID(suite.T()) + + tester.LogTimeOfTest(suite.T()) +} + +func (suite *DataCollectionsIntegrationSuite) TestMailFetch() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + userID = tester.M365UserID(suite.T()) + acct, err = tester.NewM365Account(suite.T()).M365Config() + ) + + require.NoError(suite.T(), err) + + tests := []struct { + name string + scope selectors.ExchangeScope + folderNames map[string]struct{} + }{ + { + name: "Folder Iterative Check Mail", + scope: selectors.NewExchangeBackup().MailFolders( + []string{userID}, + []string{DefaultMailFolder}, + selectors.PrefixMatch(), + )[0], + folderNames: map[string]struct{}{ + DefaultMailFolder: {}, + }, + }, + } + + // gc := loadConnector(ctx, t, Users) + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + collections, err := createCollections( + ctx, + acct, + test.scope, + DeltaPaths{}, + control.Options{}, + func(status *support.ConnectorOperationStatus) {}) + require.NoError(t, err) + + for _, c := range collections { + if c.FullPath().Service() == path.ExchangeMetadataService { + continue + } + + require.NotEmpty(t, c.FullPath().Folder()) + folder := c.FullPath().Folder() + + delete(test.folderNames, folder) + } + + assert.Empty(t, test.folderNames) + }) + } +} + +func (suite *DataCollectionsIntegrationSuite) TestDelta() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + userID = tester.M365UserID(suite.T()) + acct, err = tester.NewM365Account(suite.T()).M365Config() + ) + + require.NoError(suite.T(), err) + + tests := []struct { + name string + scope selectors.ExchangeScope + }{ + { + name: "Mail", + scope: selectors.NewExchangeBackup().MailFolders( + []string{userID}, + []string{DefaultMailFolder}, + selectors.PrefixMatch(), + )[0], + }, + { + name: "Contacts", + scope: selectors.NewExchangeBackup().ContactFolders( + []string{userID}, + []string{DefaultContactFolder}, + selectors.PrefixMatch(), + )[0], + }, + } + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + // get collections without providing any delta history (ie: full backup) + collections, err := createCollections( + ctx, + acct, + test.scope, + DeltaPaths{}, + control.Options{}, + func(status *support.ConnectorOperationStatus) {}) + require.NoError(t, err) + assert.Less(t, 1, len(collections), "retrieved metadata and data collections") + + var metadata data.Collection + + for _, coll := range collections { + if coll.FullPath().Service() == path.ExchangeMetadataService { + metadata = coll + } + } + + require.NotNil(t, metadata, "collections contains a metadata collection") + + cdps, err := parseMetadataCollections(ctx, []data.Collection{metadata}) + require.NoError(t, err) + + dps := cdps[test.scope.Category().PathType()] + + // now do another backup with the previous delta tokens, + // which should only contain the difference. + collections, err = createCollections( + ctx, + acct, + test.scope, + dps, + control.Options{}, + func(status *support.ConnectorOperationStatus) {}) + require.NoError(t, err) + + // TODO(keepers): this isn't a very useful test at the moment. It needs to + // investigate the items in the original and delta collections to at least + // assert some minimum assumptions, such as "deltas should retrieve fewer items". + // Delta usage is commented out at the moment, anyway. So this is currently + // a sanity check that the minimum behavior won't break. + for _, coll := range collections { + if coll.FullPath().Service() != path.ExchangeMetadataService { + ec, ok := coll.(*Collection) + require.True(t, ok, "collection is *Collection") + assert.NotNil(t, ec) + } + } + }) + } +} + +// TestMailSerializationRegression verifies that all mail data stored in the +// test account can be successfully downloaded into bytes and restored into +// M365 mail objects +func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + wg sync.WaitGroup + ) + + acct, err := tester.NewM365Account(t).M365Config() + require.NoError(t, err) + + sel := selectors.NewExchangeBackup() + sel.Include(sel.MailFolders([]string{suite.user}, []string{DefaultMailFolder}, selectors.PrefixMatch())) + + collections, err := createCollections( + ctx, + acct, + sel.Scopes()[0], + DeltaPaths{}, + control.Options{}, + newStatusUpdater(t, &wg)) + require.NoError(t, err) + + wg.Add(len(collections)) + + for _, edc := range collections { + t.Run(edc.FullPath().String(), func(t *testing.T) { + isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService + streamChannel := edc.Items() + + // Verify that each message can be restored + for stream := range streamChannel { + buf := &bytes.Buffer{} + + read, err := buf.ReadFrom(stream.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + + if isMetadata { + continue + } + + message, err := support.CreateMessageFromBytes(buf.Bytes()) + assert.NotNil(t, message) + assert.NoError(t, err) + } + }) + } + + wg.Wait() +} + +// TestContactSerializationRegression verifies ability to query contact items +// and to store contact within Collection. Downloaded contacts are run through +// a regression test to ensure that downloaded items can be uploaded. +func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression() { + ctx, flush := tester.NewContext() + defer flush() + + acct, err := tester.NewM365Account(suite.T()).M365Config() + require.NoError(suite.T(), err) + + tests := []struct { + name string + scope selectors.ExchangeScope + }{ + { + name: "Default Contact Folder", + scope: selectors.NewExchangeBackup().ContactFolders( + []string{suite.user}, + []string{DefaultContactFolder}, + selectors.PrefixMatch())[0], + }, + } + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + var wg sync.WaitGroup + + edcs, err := createCollections( + ctx, + acct, + test.scope, + DeltaPaths{}, + control.Options{}, + newStatusUpdater(t, &wg)) + require.NoError(t, err) + + wg.Add(len(edcs)) + + require.GreaterOrEqual(t, len(edcs), 1, "expected 1 <= num collections <= 2") + require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2") + + for _, edc := range edcs { + isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService + count := 0 + + for stream := range edc.Items() { + buf := &bytes.Buffer{} + read, err := buf.ReadFrom(stream.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + + if isMetadata { + continue + } + + contact, err := support.CreateContactFromBytes(buf.Bytes()) + assert.NotNil(t, contact) + assert.NoError(t, err, "error on converting contact bytes: "+buf.String()) + count++ + } + + if isMetadata { + continue + } + + assert.Equal(t, edc.FullPath().Folder(), DefaultContactFolder) + assert.NotZero(t, count) + } + + wg.Wait() + }) + } +} + +// TestEventsSerializationRegression ensures functionality of createCollections +// to be able to successfully query, download and restore event objects +func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression() { + ctx, flush := tester.NewContext() + defer flush() + + acct, err := tester.NewM365Account(suite.T()).M365Config() + require.NoError(suite.T(), err) + + tests := []struct { + name, expected string + scope selectors.ExchangeScope + }{ + { + name: "Default Event Calendar", + expected: DefaultCalendar, + scope: selectors.NewExchangeBackup().EventCalendars( + []string{suite.user}, + []string{DefaultCalendar}, + selectors.PrefixMatch())[0], + }, + { + name: "Birthday Calendar", + expected: "Birthdays", + scope: selectors.NewExchangeBackup().EventCalendars( + []string{suite.user}, + []string{"Birthdays"}, + selectors.PrefixMatch())[0], + }, + } + + for _, test := range tests { + suite.T().Run(test.name, func(t *testing.T) { + var wg sync.WaitGroup + + collections, err := createCollections( + ctx, + acct, + test.scope, + DeltaPaths{}, + control.Options{}, + newStatusUpdater(t, &wg)) + require.NoError(t, err) + require.Equal(t, len(collections), 2) + + wg.Add(len(collections)) + + for _, edc := range collections { + var isMetadata bool + + if edc.FullPath().Service() != path.ExchangeMetadataService { + isMetadata = true + assert.Equal(t, test.expected, edc.FullPath().Folder()) + } else { + assert.Equal(t, "", edc.FullPath().Folder()) + } + + for item := range edc.Items() { + buf := &bytes.Buffer{} + + read, err := buf.ReadFrom(item.ToReader()) + assert.NoError(t, err) + assert.NotZero(t, read) + + if isMetadata { + continue + } + + event, err := support.CreateEventFromBytes(buf.Bytes()) + assert.NotNil(t, event) + assert.NoError(t, err, "creating event from bytes: "+buf.String()) + } + } + + wg.Wait() + }) + } +} diff --git a/src/internal/connector/exchange/service_functions.go b/src/internal/connector/exchange/service_functions.go index 7a920630e..99be26ed6 100644 --- a/src/internal/connector/exchange/service_functions.go +++ b/src/internal/connector/exchange/service_functions.go @@ -125,11 +125,11 @@ func DeleteContactFolder(ctx context.Context, gs graph.Servicer, user, folderID return gs.Client().UsersById(user).ContactFoldersById(folderID).Delete(ctx, nil) } -// PopulateExchangeContainerResolver gets a folder resolver if one is available for +// populateExchangeContainerResolver gets a folder resolver if one is available for // this category of data. If one is not available, returns nil so that other // logic in the caller can complete as long as they check if the resolver is not // nil. If an error occurs populating the resolver, returns an error. -func PopulateExchangeContainerResolver( +func populateExchangeContainerResolver( ctx context.Context, qp graph.QueryParams, ) (graph.ContainerResolver, error) { diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 40a5ae3fe..25607d574 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -39,12 +39,12 @@ func hasErrorCode(err error, code string) bool { *oDataError.GetError().GetCode() == code } -// FilterContainersAndFillCollections is a utility function +// filterContainersAndFillCollections is a utility function // that places the M365 object ids belonging to specific directories // into a Collection. Messages outside of those directories are omitted. // @param collection is filled with during this function. // Supports all exchange applications: Contacts, Events, and Mail -func FilterContainersAndFillCollections( +func filterContainersAndFillCollections( ctx context.Context, qp graph.QueryParams, collections map[string]data.Collection, diff --git a/src/internal/connector/support/status.go b/src/internal/connector/support/status.go index 6209bd01a..fb08ea1c4 100644 --- a/src/internal/connector/support/status.go +++ b/src/internal/connector/support/status.go @@ -20,7 +20,7 @@ type ConnectorOperationStatus struct { ObjectCount int FolderCount int Successful int - errorCount int + ErrorCount int incomplete bool incompleteReason string additionalDetails string @@ -69,14 +69,14 @@ func CreateStatus( ObjectCount: cm.Objects, FolderCount: folders, Successful: cm.Successes, - errorCount: numErr, + ErrorCount: numErr, incomplete: hasErrors, incompleteReason: reason, bytes: cm.TotalBytes, additionalDetails: details, } - if status.ObjectCount != status.errorCount+status.Successful { + if status.ObjectCount != status.ErrorCount+status.Successful { logger.Ctx(ctx).Errorw( "status object count does not match errors + successes", "objects", cm.Objects, @@ -114,7 +114,7 @@ func MergeStatus(one, two ConnectorOperationStatus) ConnectorOperationStatus { ObjectCount: one.ObjectCount + two.ObjectCount, FolderCount: one.FolderCount + two.FolderCount, Successful: one.Successful + two.Successful, - errorCount: one.errorCount + two.errorCount, + ErrorCount: one.ErrorCount + two.ErrorCount, bytes: one.bytes + two.bytes, incomplete: hasErrors, incompleteReason: one.incompleteReason + ", " + two.incompleteReason,