move exchange data collections to exchange (#1818)

## Description

Moves the DataCollections producer out of
collections and into exchange, along with its
integration tests. The only changes are
code shuffles, passing down required values,
and unexporting funcs that were only
exported for the old design.

## Does this PR need a docs update or release note?

- [x]  No 

## Type of change

- [x] 🐹 Trivial/Minor

## Issue(s)

* #1727

## Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2022-12-20 15:46:06 -07:00 committed by GitHub
parent f202007843
commit ce7fb30a7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 575 additions and 485 deletions

View File

@ -5,17 +5,14 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/onedrive"
"github.com/alcionai/corso/src/internal/connector/sharepoint" "github.com/alcionai/corso/src/internal/connector/sharepoint"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
D "github.com/alcionai/corso/src/internal/diagnostics" D "github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
@ -46,9 +43,28 @@ func (gc *GraphConnector) DataCollections(
switch sels.Service { switch sels.Service {
case selectors.ServiceExchange: case selectors.ServiceExchange:
return gc.ExchangeDataCollection(ctx, sels, metadata, ctrlOpts) colls, err := exchange.DataCollections(
ctx,
sels,
metadata,
gc.GetUsers(),
gc.credentials,
// gc.Service,
gc.UpdateStatus,
ctrlOpts)
if err != nil {
return nil, err
}
for range colls {
gc.incrementAwaitingMessages()
}
return colls, nil
case selectors.ServiceOneDrive: case selectors.ServiceOneDrive:
return gc.OneDriveDataCollections(ctx, sels, ctrlOpts) return gc.OneDriveDataCollections(ctx, sels, ctrlOpts)
case selectors.ServiceSharePoint: case selectors.ServiceSharePoint:
colls, err := sharepoint.DataCollections( colls, err := sharepoint.DataCollections(
ctx, ctx,
@ -67,6 +83,7 @@ func (gc *GraphConnector) DataCollections(
} }
return colls, nil return colls, nil
default: default:
return nil, errors.Errorf("service %s not supported", sels.Service.String()) return nil, errors.Errorf("service %s not supported", sels.Service.String())
} }
@ -116,112 +133,6 @@ func verifyBackupInputs(sels selectors.Selector, userPNs, siteIDs []string) erro
return nil return nil
} }
// ---------------------------------------------------------------------------
// Exchange
// ---------------------------------------------------------------------------
// createExchangeCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved.
func (gc *GraphConnector) createExchangeCollections(
ctx context.Context,
scope selectors.ExchangeScope,
dps exchange.DeltaPaths,
ctrlOpts control.Options,
) ([]data.Collection, error) {
var (
errs *multierror.Error
users = scope.Get(selectors.ExchangeUser)
allCollections = make([]data.Collection, 0)
)
// Create collection of ExchangeDataCollection
for _, user := range users {
collections := make(map[string]data.Collection)
qp := graph.QueryParams{
Category: scope.Category().PathType(),
ResourceOwner: user,
Credentials: gc.credentials,
}
foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", qp.Category, user))
defer closer()
defer close(foldersComplete)
resolver, err := exchange.PopulateExchangeContainerResolver(ctx, qp)
if err != nil {
return nil, errors.Wrap(err, "getting folder cache")
}
err = exchange.FilterContainersAndFillCollections(
ctx,
qp,
collections,
gc.UpdateStatus,
resolver,
scope,
dps,
ctrlOpts)
if err != nil {
return nil, errors.Wrap(err, "filling collections")
}
foldersComplete <- struct{}{}
for _, collection := range collections {
gc.incrementAwaitingMessages()
allCollections = append(allCollections, collection)
}
}
return allCollections, errs.ErrorOrNil()
}
// ExchangeDataCollections returns a DataCollection which the caller can
// use to read mailbox data out for the specified user
// Assumption: User exists
//
// Add iota to this call -> mail, contacts, calendar, etc.
func (gc *GraphConnector) ExchangeDataCollection(
ctx context.Context,
selector selectors.Selector,
metadata []data.Collection,
ctrlOpts control.Options,
) ([]data.Collection, error) {
eb, err := selector.ToExchangeBackup()
if err != nil {
return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector")
}
var (
scopes = eb.DiscreteScopes(gc.GetUsers())
collections = []data.Collection{}
errs error
)
cdps, err := exchange.ParseMetadataCollections(ctx, metadata)
if err != nil {
return nil, err
}
for _, scope := range scopes {
dps := cdps[scope.Category().PathType()]
dcs, err := gc.createExchangeCollections(ctx, scope, dps, control.Options{})
if err != nil {
user := scope.Get(selectors.ExchangeUser)
return nil, support.WrapAndAppend(user[0], err, errs)
}
collections = append(collections, dcs...)
}
return collections, errs
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// OneDrive // OneDrive
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -10,11 +10,8 @@ import (
"github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/sharepoint" "github.com/alcionai/corso/src/internal/connector/sharepoint"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
) )
@ -105,14 +102,26 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
for _, test := range tests { for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
collection, err := connector.ExchangeDataCollection(ctx, test.getSelector(t), nil, control.Options{}) collections, err := exchange.DataCollections(
ctx,
test.getSelector(t),
nil,
[]string{suite.user},
connector.credentials,
connector.UpdateStatus,
control.Options{})
require.NoError(t, err) require.NoError(t, err)
for range collections {
connector.incrementAwaitingMessages()
}
// Categories with delta endpoints will produce a collection for metadata // Categories with delta endpoints will produce a collection for metadata
// as well as the actual data pulled. // as well as the actual data pulled.
assert.GreaterOrEqual(t, len(collection), 1, "expected 1 <= num collections <= 2") assert.GreaterOrEqual(t, len(collections), 1, "expected 1 <= num collections <= 2")
assert.GreaterOrEqual(t, 2, len(collection), "expected 1 <= num collections <= 2") assert.GreaterOrEqual(t, 2, len(collections), "expected 1 <= num collections <= 2")
for _, col := range collection { for _, col := range collections {
for object := range col.Items() { for object := range col.Items() {
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader()) _, err := buf.ReadFrom(object.ToReader())
@ -202,7 +211,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
for _, test := range tests { for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
collection, err := sharepoint.DataCollections( collections, err := sharepoint.DataCollections(
ctx, ctx,
test.getSelector(), test.getSelector(),
[]string{suite.site}, []string{suite.site},
@ -212,375 +221,29 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
control.Options{}) control.Options{})
require.NoError(t, err) require.NoError(t, err)
for range collections {
connector.incrementAwaitingMessages()
}
// we don't know an exact count of drives this will produce, // we don't know an exact count of drives this will produce,
// but it should be more than one. // but it should be more than one.
assert.Less(t, test.expected, len(collection)) assert.Less(t, test.expected, len(collections))
// the test only reads the firstt collection for _, coll := range collections {
connector.incrementAwaitingMessages() for object := range coll.Items() {
buf := &bytes.Buffer{}
for object := range collection[0].Items() { _, err := buf.ReadFrom(object.ToReader())
buf := &bytes.Buffer{} assert.NoError(t, err, "reading item")
_, err := buf.ReadFrom(object.ToReader()) }
assert.NoError(t, err, "received a buf.Read error")
} }
status := connector.AwaitStatus() status := connector.AwaitStatus()
assert.NotZero(t, status.Successful) assert.NotZero(t, status.Successful)
t.Log(status.String()) t.Log(status.String())
}) })
} }
} }
// ---------------------------------------------------------------------------
// CreateExchangeCollection tests
// ---------------------------------------------------------------------------
type ConnectorCreateExchangeCollectionIntegrationSuite struct {
suite.Suite
connector *GraphConnector
user string
site string
}
func TestConnectorCreateExchangeCollectionIntegrationSuite(t *testing.T) {
if err := tester.RunOnAny(
tester.CorsoCITests,
tester.CorsoConnectorCreateExchangeCollectionTests,
); err != nil {
t.Skip(err)
}
suite.Run(t, new(ConnectorCreateExchangeCollectionIntegrationSuite))
}
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) SetupSuite() {
ctx, flush := tester.NewContext()
defer flush()
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
require.NoError(suite.T(), err)
suite.connector = loadConnector(ctx, suite.T(), Users)
suite.user = tester.M365UserID(suite.T())
suite.site = tester.M365SiteID(suite.T())
tester.LogTimeOfTest(suite.T())
}
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailFetch() {
ctx, flush := tester.NewContext()
defer flush()
var (
t = suite.T()
userID = tester.M365UserID(t)
)
tests := []struct {
name string
scope selectors.ExchangeScope
folderNames map[string]struct{}
}{
{
name: "Folder Iterative Check Mail",
scope: selectors.NewExchangeBackup().MailFolders(
[]string{userID},
[]string{exchange.DefaultMailFolder},
selectors.PrefixMatch(),
)[0],
folderNames: map[string]struct{}{
exchange.DefaultMailFolder: {},
},
},
}
gc := loadConnector(ctx, t, Users)
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{})
require.NoError(t, err)
for _, c := range collections {
if c.FullPath().Service() == path.ExchangeMetadataService {
continue
}
require.NotEmpty(t, c.FullPath().Folder())
folder := c.FullPath().Folder()
delete(test.folderNames, folder)
}
assert.Empty(t, test.folderNames)
})
}
}
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestDelta() {
ctx, flush := tester.NewContext()
defer flush()
var (
userID = tester.M365UserID(suite.T())
gc = loadConnector(ctx, suite.T(), Users)
)
tests := []struct {
name string
scope selectors.ExchangeScope
}{
{
name: "Mail",
scope: selectors.NewExchangeBackup().MailFolders(
[]string{userID},
[]string{exchange.DefaultMailFolder},
selectors.PrefixMatch(),
)[0],
},
{
name: "Contacts",
scope: selectors.NewExchangeBackup().ContactFolders(
[]string{userID},
[]string{exchange.DefaultContactFolder},
selectors.PrefixMatch(),
)[0],
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
// get collections without providing any delta history (ie: full backup)
collections, err := gc.createExchangeCollections(ctx, test.scope, exchange.DeltaPaths{}, control.Options{})
require.NoError(t, err)
assert.Less(t, 1, len(collections), "retrieved metadata and data collections")
var metadata data.Collection
for _, coll := range collections {
if coll.FullPath().Service() == path.ExchangeMetadataService {
metadata = coll
}
}
require.NotNil(t, metadata, "collections contains a metadata collection")
cdps, err := exchange.ParseMetadataCollections(ctx, []data.Collection{metadata})
require.NoError(t, err)
dps := cdps[test.scope.Category().PathType()]
// now do another backup with the previous delta tokens,
// which should only contain the difference.
collections, err = gc.createExchangeCollections(ctx, test.scope, dps, control.Options{})
require.NoError(t, err)
// TODO(keepers): this isn't a very useful test at the moment. It needs to
// investigate the items in the original and delta collections to at least
// assert some minimum assumptions, such as "deltas should retrieve fewer items".
// Delta usage is commented out at the moment, anyway. So this is currently
// a sanity check that the minimum behavior won't break.
for _, coll := range collections {
if coll.FullPath().Service() != path.ExchangeMetadataService {
ec, ok := coll.(*exchange.Collection)
require.True(t, ok, "collection is *exchange.Collection")
assert.NotNil(t, ec)
}
}
})
}
}
// TestMailSerializationRegression verifies that all mail data stored in the
// test account can be successfully downloaded into bytes and restored into
// M365 mail objects
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestMailSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
var (
t = suite.T()
connector = loadConnector(ctx, t, Users)
sel = selectors.NewExchangeBackup()
)
sel.Include(sel.MailFolders([]string{suite.user}, []string{exchange.DefaultMailFolder}, selectors.PrefixMatch()))
collection, err := connector.createExchangeCollections(ctx, sel.Scopes()[0], exchange.DeltaPaths{}, control.Options{})
require.NoError(t, err)
for _, edc := range collection {
suite.T().Run(edc.FullPath().String(), func(t *testing.T) {
streamChannel := edc.Items()
// Verify that each message can be restored
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
message, err := support.CreateMessageFromBytes(buf.Bytes())
assert.NotNil(t, message)
assert.NoError(t, err)
}
})
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
}
// TestContactSerializationRegression verifies ability to query contact items
// and to store contact within Collection. Downloaded contacts are run through
// a regression test to ensure that downloaded items can be uploaded.
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestContactSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T(), Users)
tests := []struct {
name string
getCollection func(t *testing.T) []data.Collection
}{
{
name: "Default Contact Folder",
getCollection: func(t *testing.T) []data.Collection {
scope := selectors.
NewExchangeBackup().
ContactFolders([]string{suite.user}, []string{exchange.DefaultContactFolder}, selectors.PrefixMatch())[0]
collections, err := connector.createExchangeCollections(ctx, scope, exchange.DeltaPaths{}, control.Options{})
require.NoError(t, err)
return collections
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
edcs := test.getCollection(t)
require.GreaterOrEqual(t, len(edcs), 1, "expected 1 <= num collections <= 2")
require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2")
for _, edc := range edcs {
isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
count := 0
for stream := range edc.Items() {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
if isMetadata {
continue
}
contact, err := support.CreateContactFromBytes(buf.Bytes())
assert.NotNil(t, contact)
assert.NoError(t, err, "error on converting contact bytes: "+buf.String())
count++
}
if isMetadata {
continue
}
assert.Equal(t, edc.FullPath().Folder(), exchange.DefaultContactFolder)
assert.NotZero(t, count)
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
})
}
}
// TestEventsSerializationRegression ensures functionality of createCollections
// to be able to successfully query, download and restore event objects
func (suite *ConnectorCreateExchangeCollectionIntegrationSuite) TestEventsSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
connector := loadConnector(ctx, suite.T(), Users)
tests := []struct {
name, expected string
getCollection func(t *testing.T) []data.Collection
}{
{
name: "Default Event Calendar",
expected: exchange.DefaultCalendar,
getCollection: func(t *testing.T) []data.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{exchange.DefaultCalendar}, selectors.PrefixMatch()))
collections, err := connector.createExchangeCollections(
ctx,
sel.Scopes()[0],
exchange.DeltaPaths{},
control.Options{})
require.NoError(t, err)
return collections
},
},
{
name: "Birthday Calendar",
expected: "Birthdays",
getCollection: func(t *testing.T) []data.Collection {
sel := selectors.NewExchangeBackup()
sel.Include(sel.EventCalendars([]string{suite.user}, []string{"Birthdays"}, selectors.PrefixMatch()))
collections, err := connector.createExchangeCollections(
ctx,
sel.Scopes()[0],
exchange.DeltaPaths{},
control.Options{})
require.NoError(t, err)
return collections
},
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections := test.getCollection(t)
require.Equal(t, len(collections), 2)
for _, edc := range collections {
if edc.FullPath().Service() != path.ExchangeMetadataService {
assert.Equal(t, test.expected, edc.FullPath().Folder())
} else {
assert.Equal(t, "", edc.FullPath().Folder())
}
streamChannel := edc.Items()
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
event, err := support.CreateEventFromBytes(buf.Bytes())
assert.NotNil(t, event)
assert.NoError(t, err, "creating event from bytes: "+buf.String())
}
}
status := connector.AwaitStatus()
suite.NotNil(status)
suite.Equal(status.ObjectCount, status.Successful)
})
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// CreateSharePointCollection tests // CreateSharePointCollection tests
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -3,12 +3,19 @@ package exchange
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
) )
// MetadataFileNames produces the category-specific set of filenames used to // MetadataFileNames produces the category-specific set of filenames used to
@ -53,7 +60,7 @@ type DeltaPath struct {
// ParseMetadataCollections produces a map of structs holding delta // ParseMetadataCollections produces a map of structs holding delta
// and path lookup maps. // and path lookup maps.
func ParseMetadataCollections( func parseMetadataCollections(
ctx context.Context, ctx context.Context,
colls []data.Collection, colls []data.Collection,
) (CatDeltaPaths, error) { ) (CatDeltaPaths, error) {
@ -146,3 +153,114 @@ func ParseMetadataCollections(
return cdp, nil return cdp, nil
} }
// DataCollections returns the data.Collections that the caller can use to
// read exchange data out for the users matched by the selector.
// Assumption: the users exist.
//
// Parameters:
//   - selector: must be convertible with ToExchangeBackup, otherwise an
//     error is returned.
//   - metadata: collections from a prior backup; they are parsed into
//     per-category delta/path lookups.
//   - userPNs: the users that the selector's scopes are discretized against.
//   - acct: credentials forwarded into every graph query.
//   - su: status updater handed to each produced collection.
//   - ctrlOpts: control options forwarded to collection creation.
//
// The first scope that fails aborts the call; the returned error is wrapped
// with that scope's first user.
//
// TODO: add iota to this call -> mail, contacts, calendar, etc.
func DataCollections(
	ctx context.Context,
	selector selectors.Selector,
	metadata []data.Collection,
	userPNs []string,
	acct account.M365Config,
	su support.StatusUpdater,
	ctrlOpts control.Options,
) ([]data.Collection, error) {
	eb, err := selector.ToExchangeBackup()
	if err != nil {
		return nil, errors.Wrap(err, "exchangeDataCollection: parsing selector")
	}

	var (
		scopes      = eb.DiscreteScopes(userPNs)
		collections = []data.Collection{}
		// errs is never assigned in this function; it is only threaded
		// through WrapAndAppend and returned, so the success path
		// always returns a nil error.
		errs error
	)

	cdps, err := parseMetadataCollections(ctx, metadata)
	if err != nil {
		return nil, err
	}

	for _, scope := range scopes {
		dps := cdps[scope.Category().PathType()]

		// Forward the caller's options. Previously a zero-value
		// control.Options{} was passed here, silently discarding them.
		dcs, err := createCollections(
			ctx,
			acct,
			scope,
			dps,
			ctrlOpts,
			su)
		if err != nil {
			user := scope.Get(selectors.ExchangeUser)
			return nil, support.WrapAndAppend(user[0], err, errs)
		}

		collections = append(collections, dcs...)
	}

	return collections, errs
}
// createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved.
//
// One set of collections is built for every user in the scope. The first
// user that fails aborts the call and its (wrapped) error is returned.
func createCollections(
	ctx context.Context,
	acct account.M365Config,
	scope selectors.ExchangeScope,
	dps DeltaPaths,
	ctrlOpts control.Options,
	su support.StatusUpdater,
) ([]data.Collection, error) {
	var (
		// errs is never appended to; ErrorOrNil therefore yields nil.
		errs           *multierror.Error
		users          = scope.Get(selectors.ExchangeUser)
		allCollections = make([]data.Collection, 0)
	)

	for _, user := range users {
		colls, err := createUserCollections(ctx, acct, scope, dps, ctrlOpts, su, user)
		if err != nil {
			return nil, err
		}

		allCollections = append(allCollections, colls...)
	}

	return allCollections, errs.ErrorOrNil()
}

// createUserCollections builds the collections for a single user. It lives
// in its own function so the deferred progress-message cleanup runs as soon
// as that user is done, instead of piling up until every user in the scope
// has been processed.
func createUserCollections(
	ctx context.Context,
	acct account.M365Config,
	scope selectors.ExchangeScope,
	dps DeltaPaths,
	ctrlOpts control.Options,
	su support.StatusUpdater,
	user string,
) ([]data.Collection, error) {
	collections := make(map[string]data.Collection)

	qp := graph.QueryParams{
		Category:      scope.Category().PathType(),
		ResourceOwner: user,
		Credentials:   acct,
	}

	foldersComplete, closer := observe.MessageWithCompletion(fmt.Sprintf("∙ %s - %s:", qp.Category, user))
	// Deferred calls run last-in-first-out: the channel is closed first,
	// then closer is invoked.
	defer closer()
	defer close(foldersComplete)

	resolver, err := populateExchangeContainerResolver(ctx, qp)
	if err != nil {
		return nil, errors.Wrap(err, "getting folder cache")
	}

	err = filterContainersAndFillCollections(
		ctx,
		qp,
		collections,
		su,
		resolver,
		scope,
		dps,
		ctrlOpts)
	if err != nil {
		return nil, errors.Wrap(err, "filling collections")
	}

	// Signal completion only once the collections were filled successfully.
	foldersComplete <- struct{}{}

	colls := make([]data.Collection, 0, len(collections))
	for _, coll := range collections {
		colls = append(colls, coll)
	}

	return colls, nil
}

View File

@ -1,6 +1,8 @@
package exchange package exchange
import ( import (
"bytes"
"sync"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -11,7 +13,9 @@ import (
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
) )
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -170,7 +174,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
) )
require.NoError(t, err) require.NoError(t, err)
cdps, err := ParseMetadataCollections(ctx, []data.Collection{coll}) cdps, err := parseMetadataCollections(ctx, []data.Collection{coll})
test.expectError(t, err) test.expectError(t, err)
emails := cdps[path.EmailCategory] emails := cdps[path.EmailCategory]
@ -184,3 +188,397 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
}) })
} }
} }
// ---------------------------------------------------------------------------
// Integration tests
// ---------------------------------------------------------------------------
// newStatusUpdater builds a status callback for tests. Every invocation
// asserts that the reported status carries no errors and then marks one
// unit of work as done on wg.
func newStatusUpdater(t *testing.T, wg *sync.WaitGroup) func(status *support.ConnectorOperationStatus) {
	return func(status *support.ConnectorOperationStatus) {
		// Done is deferred so the wait group is released after the assertion.
		defer wg.Done()

		assert.Zero(t, status.ErrorCount)
	}
}
// DataCollectionsIntegrationSuite holds the integration tests for exchange
// collection creation. Its fields are populated in SetupSuite.
type DataCollectionsIntegrationSuite struct {
suite.Suite
// user is the value returned by tester.M365UserID.
user string
// site is the value returned by tester.M365SiteID.
site string
}
// TestDataCollectionsIntegrationSuite runs DataCollectionsIntegrationSuite.
// The suite is skipped when tester.RunOnAny returns an error for the
// CorsoCITests / CorsoConnectorCreateExchangeCollectionTests flags.
func TestDataCollectionsIntegrationSuite(t *testing.T) {
if err := tester.RunOnAny(
tester.CorsoCITests,
tester.CorsoConnectorCreateExchangeCollectionTests,
); err != nil {
t.Skip(err)
}
suite.Run(t, new(DataCollectionsIntegrationSuite))
}
// SetupSuite fails the suite if the required M365 account credential env
// vars cannot be retrieved, then records the test user and site IDs on the
// suite and logs the time of the test.
func (suite *DataCollectionsIntegrationSuite) SetupSuite() {
_, err := tester.GetRequiredEnvVars(tester.M365AcctCredEnvs...)
require.NoError(suite.T(), err)
suite.user = tester.M365UserID(suite.T())
suite.site = tester.M365SiteID(suite.T())
tester.LogTimeOfTest(suite.T())
}
// TestMailFetch creates collections for the default mail folder and checks
// that every expected folder name shows up among the non-metadata
// collections that were produced.
func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
ctx, flush := tester.NewContext()
defer flush()
var (
userID = tester.M365UserID(suite.T())
acct, err = tester.NewM365Account(suite.T()).M365Config()
)
require.NoError(suite.T(), err)
tests := []struct {
name string
scope selectors.ExchangeScope
folderNames map[string]struct{}
}{
{
name: "Folder Iterative Check Mail",
scope: selectors.NewExchangeBackup().MailFolders(
[]string{userID},
[]string{DefaultMailFolder},
selectors.PrefixMatch(),
)[0],
folderNames: map[string]struct{}{
DefaultMailFolder: {},
},
},
}
// No GraphConnector is needed: createCollections is called directly with
// the account config and a no-op status updater.
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
collections, err := createCollections(
ctx,
acct,
test.scope,
DeltaPaths{},
control.Options{},
func(status *support.ConnectorOperationStatus) {})
require.NoError(t, err)
for _, c := range collections {
// metadata collections are not checked for a folder name
if c.FullPath().Service() == path.ExchangeMetadataService {
continue
}
require.NotEmpty(t, c.FullPath().Folder())
folder := c.FullPath().Folder()
// each folder that was seen is crossed off the expected set
delete(test.folderNames, folder)
}
// anything left over is an expected folder that was never produced
assert.Empty(t, test.folderNames)
})
}
}
// TestDelta creates collections twice per category: first with empty
// DeltaPaths, then with the DeltaPaths parsed out of the metadata collection
// that the first run produced. The second run is only checked for
// succeeding and for yielding *Collection values.
func (suite *DataCollectionsIntegrationSuite) TestDelta() {
ctx, flush := tester.NewContext()
defer flush()
var (
userID = tester.M365UserID(suite.T())
acct, err = tester.NewM365Account(suite.T()).M365Config()
)
require.NoError(suite.T(), err)
tests := []struct {
name string
scope selectors.ExchangeScope
}{
{
name: "Mail",
scope: selectors.NewExchangeBackup().MailFolders(
[]string{userID},
[]string{DefaultMailFolder},
selectors.PrefixMatch(),
)[0],
},
{
name: "Contacts",
scope: selectors.NewExchangeBackup().ContactFolders(
[]string{userID},
[]string{DefaultContactFolder},
selectors.PrefixMatch(),
)[0],
},
}
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
// get collections without providing any delta history (ie: full backup)
collections, err := createCollections(
ctx,
acct,
test.scope,
DeltaPaths{},
control.Options{},
func(status *support.ConnectorOperationStatus) {})
require.NoError(t, err)
assert.Less(t, 1, len(collections), "retrieved metadata and data collections")
// pick out the metadata collection; if several exist, the last one wins
var metadata data.Collection
for _, coll := range collections {
if coll.FullPath().Service() == path.ExchangeMetadataService {
metadata = coll
}
}
require.NotNil(t, metadata, "collections contains a metadata collection")
cdps, err := parseMetadataCollections(ctx, []data.Collection{metadata})
require.NoError(t, err)
dps := cdps[test.scope.Category().PathType()]
// now do another backup with the previous delta tokens,
// which should only contain the difference.
collections, err = createCollections(
ctx,
acct,
test.scope,
dps,
control.Options{},
func(status *support.ConnectorOperationStatus) {})
require.NoError(t, err)
// TODO(keepers): this isn't a very useful test at the moment. It needs to
// investigate the items in the original and delta collections to at least
// assert some minimum assumptions, such as "deltas should retrieve fewer items".
// Delta usage is commented out at the moment, anyway. So this is currently
// a sanity check that the minimum behavior won't break.
for _, coll := range collections {
if coll.FullPath().Service() != path.ExchangeMetadataService {
ec, ok := coll.(*Collection)
require.True(t, ok, "collection is *Collection")
assert.NotNil(t, ec)
}
}
})
}
}
// TestMailSerializationRegression verifies that all mail data stored in the
// test account can be successfully downloaded into bytes and restored into
// M365 mail objects
func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression() {
ctx, flush := tester.NewContext()
defer flush()
var (
t = suite.T()
wg sync.WaitGroup
)
acct, err := tester.NewM365Account(t).M365Config()
require.NoError(t, err)
sel := selectors.NewExchangeBackup()
sel.Include(sel.MailFolders([]string{suite.user}, []string{DefaultMailFolder}, selectors.PrefixMatch()))
collections, err := createCollections(
ctx,
acct,
sel.Scopes()[0],
DeltaPaths{},
control.Options{},
newStatusUpdater(t, &wg))
require.NoError(t, err)
// One Done call is expected per collection.
// NOTE(review): presumably each collection invokes the status updater
// exactly once after its items are streamed - confirm.
wg.Add(len(collections))
for _, edc := range collections {
t.Run(edc.FullPath().String(), func(t *testing.T) {
isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
streamChannel := edc.Items()
// Verify that each message can be restored
for stream := range streamChannel {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
assert.NotZero(t, read)
// metadata items are only checked for being readable and non-empty
if isMetadata {
continue
}
message, err := support.CreateMessageFromBytes(buf.Bytes())
assert.NotNil(t, message)
assert.NoError(t, err)
}
})
}
// block until the status updater has been called for every collection
wg.Wait()
}
// TestContactSerializationRegression verifies ability to query contact items
// and to store contact within Collection. Downloaded contacts are run through
// a regression test to ensure that downloaded items can be uploaded.
func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression() {
	ctx, flush := tester.NewContext()
	defer flush()

	acct, err := tester.NewM365Account(suite.T()).M365Config()
	require.NoError(suite.T(), err)

	tests := []struct {
		name  string
		scope selectors.ExchangeScope
	}{
		{
			name: "Default Contact Folder",
			scope: selectors.NewExchangeBackup().ContactFolders(
				[]string{suite.user},
				[]string{DefaultContactFolder},
				selectors.PrefixMatch())[0],
		},
	}

	for _, test := range tests {
		suite.T().Run(test.name, func(t *testing.T) {
			var wg sync.WaitGroup

			edcs, err := createCollections(
				ctx,
				acct,
				test.scope,
				DeltaPaths{},
				control.Options{},
				newStatusUpdater(t, &wg))
			require.NoError(t, err)

			// Validate the collection count before registering any
			// wait-group work, so a failing guard leaves nothing pending.
			require.GreaterOrEqual(t, len(edcs), 1, "expected 1 <= num collections <= 2")
			require.GreaterOrEqual(t, 2, len(edcs), "expected 1 <= num collections <= 2")

			// the status updater calls Done once per invocation
			wg.Add(len(edcs))

			for _, edc := range edcs {
				isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
				count := 0

				for stream := range edc.Items() {
					buf := &bytes.Buffer{}
					read, err := buf.ReadFrom(stream.ToReader())
					assert.NoError(t, err)
					assert.NotZero(t, read)

					// metadata items only need to be readable and non-empty
					if isMetadata {
						continue
					}

					contact, err := support.CreateContactFromBytes(buf.Bytes())
					assert.NotNil(t, contact)
					assert.NoError(t, err, "error on converting contact bytes: "+buf.String())

					count++
				}

				if isMetadata {
					continue
				}

				// expected value first, actual second, so failure output
				// labels the two sides correctly
				assert.Equal(t, DefaultContactFolder, edc.FullPath().Folder())
				assert.NotZero(t, count)
			}

			wg.Wait()
		})
	}
}
// TestEventsSerializationRegression ensures functionality of createCollections
// to be able to successfully query, download and restore event objects.
//
// Exactly two collections are expected per scope. A collection whose path
// service is path.ExchangeMetadataService must have an empty folder and its
// items are only checked for readability; any other collection must sit in
// the expected calendar folder and each of its items must deserialize into
// an event.
func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression() {
	ctx, flush := tester.NewContext()
	defer flush()

	acct, err := tester.NewM365Account(suite.T()).M365Config()
	require.NoError(suite.T(), err)

	tests := []struct {
		name, expected string
		scope          selectors.ExchangeScope
	}{
		{
			name:     "Default Event Calendar",
			expected: DefaultCalendar,
			scope: selectors.NewExchangeBackup().EventCalendars(
				[]string{suite.user},
				[]string{DefaultCalendar},
				selectors.PrefixMatch())[0],
		},
		{
			name:     "Birthday Calendar",
			expected: "Birthdays",
			scope: selectors.NewExchangeBackup().EventCalendars(
				[]string{suite.user},
				[]string{"Birthdays"},
				selectors.PrefixMatch())[0],
		},
	}

	for _, test := range tests {
		suite.T().Run(test.name, func(t *testing.T) {
			var wg sync.WaitGroup

			collections, err := createCollections(
				ctx,
				acct,
				test.scope,
				DeltaPaths{},
				control.Options{},
				newStatusUpdater(t, &wg))
			require.NoError(t, err)
			// testify expects (expected, actual).
			require.Equal(t, 2, len(collections))

			// One wait-group slot per collection, registered before any
			// collection's items are consumed below.
			wg.Add(len(collections))

			for _, edc := range collections {
				// The flag must be true for the metadata collection. The
				// previous inverted check skipped deserialization of every
				// real event and instead parsed metadata items as events.
				isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService

				if isMetadata {
					assert.Equal(t, "", edc.FullPath().Folder())
				} else {
					assert.Equal(t, test.expected, edc.FullPath().Folder())
				}

				for item := range edc.Items() {
					buf := &bytes.Buffer{}
					read, err := buf.ReadFrom(item.ToReader())
					assert.NoError(t, err)
					assert.NotZero(t, read)

					// Metadata items are not events; reading them is enough.
					if isMetadata {
						continue
					}

					event, err := support.CreateEventFromBytes(buf.Bytes())
					assert.NotNil(t, event)
					assert.NoError(t, err, "creating event from bytes: "+buf.String())
				}
			}

			wg.Wait()
		})
	}
}

View File

@ -125,11 +125,11 @@ func DeleteContactFolder(ctx context.Context, gs graph.Servicer, user, folderID
return gs.Client().UsersById(user).ContactFoldersById(folderID).Delete(ctx, nil) return gs.Client().UsersById(user).ContactFoldersById(folderID).Delete(ctx, nil)
} }
// PopulateExchangeContainerResolver gets a folder resolver if one is available for // populateExchangeContainerResolver gets a folder resolver if one is available for
// this category of data. If one is not available, returns nil so that other // this category of data. If one is not available, returns nil so that other
// logic in the caller can complete as long as they check if the resolver is not // logic in the caller can complete as long as they check if the resolver is not
// nil. If an error occurs populating the resolver, returns an error. // nil. If an error occurs populating the resolver, returns an error.
func PopulateExchangeContainerResolver( func populateExchangeContainerResolver(
ctx context.Context, ctx context.Context,
qp graph.QueryParams, qp graph.QueryParams,
) (graph.ContainerResolver, error) { ) (graph.ContainerResolver, error) {

View File

@ -39,12 +39,12 @@ func hasErrorCode(err error, code string) bool {
*oDataError.GetError().GetCode() == code *oDataError.GetError().GetCode() == code
} }
// FilterContainersAndFillCollections is a utility function // filterContainersAndFillCollections is a utility function
// that places the M365 object ids belonging to specific directories // that places the M365 object ids belonging to specific directories
// into a Collection. Messages outside of those directories are omitted. // into a Collection. Messages outside of those directories are omitted.
// @param collection is filled with during this function. // @param collection is filled with during this function.
// Supports all exchange applications: Contacts, Events, and Mail // Supports all exchange applications: Contacts, Events, and Mail
func FilterContainersAndFillCollections( func filterContainersAndFillCollections(
ctx context.Context, ctx context.Context,
qp graph.QueryParams, qp graph.QueryParams,
collections map[string]data.Collection, collections map[string]data.Collection,

View File

@ -20,7 +20,7 @@ type ConnectorOperationStatus struct {
ObjectCount int ObjectCount int
FolderCount int FolderCount int
Successful int Successful int
errorCount int ErrorCount int
incomplete bool incomplete bool
incompleteReason string incompleteReason string
additionalDetails string additionalDetails string
@ -69,14 +69,14 @@ func CreateStatus(
ObjectCount: cm.Objects, ObjectCount: cm.Objects,
FolderCount: folders, FolderCount: folders,
Successful: cm.Successes, Successful: cm.Successes,
errorCount: numErr, ErrorCount: numErr,
incomplete: hasErrors, incomplete: hasErrors,
incompleteReason: reason, incompleteReason: reason,
bytes: cm.TotalBytes, bytes: cm.TotalBytes,
additionalDetails: details, additionalDetails: details,
} }
if status.ObjectCount != status.errorCount+status.Successful { if status.ObjectCount != status.ErrorCount+status.Successful {
logger.Ctx(ctx).Errorw( logger.Ctx(ctx).Errorw(
"status object count does not match errors + successes", "status object count does not match errors + successes",
"objects", cm.Objects, "objects", cm.Objects,
@ -114,7 +114,7 @@ func MergeStatus(one, two ConnectorOperationStatus) ConnectorOperationStatus {
ObjectCount: one.ObjectCount + two.ObjectCount, ObjectCount: one.ObjectCount + two.ObjectCount,
FolderCount: one.FolderCount + two.FolderCount, FolderCount: one.FolderCount + two.FolderCount,
Successful: one.Successful + two.Successful, Successful: one.Successful + two.Successful,
errorCount: one.errorCount + two.errorCount, ErrorCount: one.ErrorCount + two.ErrorCount,
bytes: one.bytes + two.bytes, bytes: one.bytes + two.bytes,
incomplete: hasErrors, incomplete: hasErrors,
incompleteReason: one.incompleteReason + ", " + two.incompleteReason, incompleteReason: one.incompleteReason + ", " + two.incompleteReason,