Allow selective mail backup and change how mail is fetched from Graph (#1123)

* Move interfaces to common location

Upcoming PRs are using these interfaces across packages. Move them to a
common location so multiple packages can use them without import cycles
etc.

* Allow adding newly created folders to the cache (#1131)

* New function to add folders to cache

Allow adding new folders to the cache. Automatically cache the paths for
the new folders. Also add the new function to the interface.

* Reuse the AddToCache function during population

* Wire up ability to back up a single subfolder of mail (#1132)

* Expand cache to return items in it

Required to allow matching an item's path to a selector as the selector
will not provide which paths it matches on easily.

* Function to get collections from cached folders

Returned collections match any matchers given for the folders

* Thread resolver through iterator functions

Allow the folder resolver to be used in all iterator functions. The
resolver will be tied to the current category and user.

* Choose between using resolver and making queries

Allow either using the resolver to get folders with matching names or
using queries to get them.

* Wire up resolver at entry point

Create a resolver instance for each user/category of data being
backedup.

* Preparation for changing how mail enumeration is done (#1157)

* Step towards redoing mail fetching

Pull out old way to get data into a new function and setup some helper
functions etc.

* Switch to pulling mail items folder by folder (#1158)

* Function to pull mail items given collections

Given a set of collections and IDs for those collections pull the mail
items for each collection.

* Create helper function to fetch mail

New helper function to fetch mail items. This goes through each folder
and gets the items for them individually.

* Wire up new way to fetch mail

Leaves fetch logic for other data types undisturbed.

* Tests for new mail fetching logic

Remove tests that were previously in iterators_test.go and move them to
graph_connector_test.go. These tests only had to do with mail logic.
Tests that handled all data types in iterators_test.go have been updated
to skip mail now.
This commit is contained in:
ashmrtn 2022-10-12 16:44:24 -07:00 committed by GitHub
parent 5f0f013458
commit 41319c117f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 504 additions and 175 deletions

View File

@ -5,7 +5,7 @@ import (
) )
// CalendarDisplayable is a transformative struct that aligns // CalendarDisplayable is a transformative struct that aligns
// models.Calendarable interface with the displayable interface. // models.Calendarable interface with the Displayable interface.
type CalendarDisplayable struct { type CalendarDisplayable struct {
models.Calendarable models.Calendarable
} }

View File

@ -240,7 +240,13 @@ func (suite *ExchangeServiceSuite) TestOptionsForContacts() {
func (suite *ExchangeServiceSuite) TestSetupExchangeCollection() { func (suite *ExchangeServiceSuite) TestSetupExchangeCollection() {
userID := tester.M365UserID(suite.T()) userID := tester.M365UserID(suite.T())
sel := selectors.NewExchangeBackup() sel := selectors.NewExchangeBackup()
sel.Include(sel.Users([]string{userID})) // Exchange mail uses a different system to fetch items. Right now the old
// function for it will return an error so we know if it gets called.
sel.Include(
sel.ContactFolders([]string{userID}, selectors.Any()),
sel.EventCalendars([]string{userID}, selectors.Any()),
)
eb, err := sel.ToExchangeBackup() eb, err := sel.ToExchangeBackup()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)

View File

@ -44,23 +44,3 @@ const (
// nextDataLink definition https://docs.microsoft.com/en-us/graph/paging // nextDataLink definition https://docs.microsoft.com/en-us/graph/paging
nextDataLink = "@odata.nextLink" nextDataLink = "@odata.nextLink"
) )
// descendable represents objects that implement msgraph-sdk-go/models.entityable
// and have the concept of a "parent folder".
type descendable interface {
GetId() *string
GetParentFolderId() *string
}
// displayable represents objects that implement msgraph-sdk-fo/models.entityable
// and have the concept of a display name.
type displayable interface {
GetId() *string
GetDisplayName() *string
}
// container is an interface that implements both the descendable and displayble interface.
type container interface {
descendable
displayable
}

View File

@ -38,7 +38,7 @@ func (suite *ExchangeIteratorSuite) TestDisplayable() {
contact, err := support.CreateContactFromBytes(bytes) contact, err := support.CreateContactFromBytes(bytes)
require.NoError(t, err) require.NoError(t, err)
aDisplayable, ok := contact.(displayable) aDisplayable, ok := contact.(graph.Displayable)
assert.True(t, ok) assert.True(t, ok)
assert.NotNil(t, aDisplayable.GetId()) assert.NotNil(t, aDisplayable.GetId())
assert.NotNil(t, aDisplayable.GetDisplayName()) assert.NotNil(t, aDisplayable.GetDisplayName())
@ -50,7 +50,7 @@ func (suite *ExchangeIteratorSuite) TestDescendable() {
message, err := support.CreateMessageFromBytes(bytes) message, err := support.CreateMessageFromBytes(bytes)
require.NoError(t, err) require.NoError(t, err)
aDescendable, ok := message.(descendable) aDescendable, ok := message.(graph.Descendable)
assert.True(t, ok) assert.True(t, ok)
assert.NotNil(t, aDescendable.GetId()) assert.NotNil(t, aDescendable.GetId())
assert.NotNil(t, aDescendable.GetParentFolderId()) assert.NotNil(t, aDescendable.GetParentFolderId())
@ -68,7 +68,8 @@ func loadService(t *testing.T) *exchangeService {
} }
// TestIterativeFunctions verifies that GraphQuery to Iterate // TestIterativeFunctions verifies that GraphQuery to Iterate
// functions are valid for current versioning of msgraph-go-sdk // functions are valid for current versioning of msgraph-go-sdk.
// Tests for mail have been moved to graph_connector_test.go.
func (suite *ExchangeIteratorSuite) TestIterativeFunctions() { func (suite *ExchangeIteratorSuite) TestIterativeFunctions() {
ctx, flush := tester.NewContext() ctx, flush := tester.NewContext()
defer flush() defer flush()
@ -98,16 +99,6 @@ func (suite *ExchangeIteratorSuite) TestIterativeFunctions() {
folderNames map[string]struct{} folderNames map[string]struct{}
}{ }{
{ {
name: "Mail Iterative Check",
queryFunction: GetAllMessagesForUser,
iterativeFunction: IterateSelectAllDescendablesForCollections,
scope: mailScope[0],
transformer: models.CreateMessageCollectionResponseFromDiscriminatorValue,
folderNames: map[string]struct{}{
DefaultMailFolder: {},
"Sent Items": {},
},
}, {
name: "Contacts Iterative Check", name: "Contacts Iterative Check",
queryFunction: GetAllContactFolderNamesForUser, queryFunction: GetAllContactFolderNamesForUser,
iterativeFunction: IterateSelectAllContactsForCollections, iterativeFunction: IterateSelectAllContactsForCollections,
@ -125,15 +116,6 @@ func (suite *ExchangeIteratorSuite) TestIterativeFunctions() {
iterativeFunction: IterateSelectAllEventsFromCalendars, iterativeFunction: IterateSelectAllEventsFromCalendars,
scope: eventScope[0], scope: eventScope[0],
transformer: models.CreateCalendarCollectionResponseFromDiscriminatorValue, transformer: models.CreateCalendarCollectionResponseFromDiscriminatorValue,
}, {
name: "Folder Iterative Check Mail",
queryFunction: GetAllFolderNamesForUser,
iterativeFunction: IterateFilterContainersForCollections,
scope: mailScope[0],
transformer: models.CreateMailFolderCollectionResponseFromDiscriminatorValue,
folderNames: map[string]struct{}{
DefaultMailFolder: {},
},
}, { }, {
name: "Folder Iterative Check Contacts", name: "Folder Iterative Check Contacts",
queryFunction: GetAllContactFolderNamesForUser, queryFunction: GetAllContactFolderNamesForUser,
@ -172,30 +154,13 @@ func (suite *ExchangeIteratorSuite) TestIterativeFunctions() {
qp, qp,
errUpdater, errUpdater,
collections, collections,
nil) nil,
nil,
)
iterateError := pageIterator.Iterate(ctx, callbackFunc) iterateError := pageIterator.Iterate(ctx, callbackFunc)
assert.NoError(t, iterateError) assert.NoError(t, iterateError)
assert.NoError(t, errs) assert.NoError(t, errs)
// TODO(ashmrtn): Only check Exchange Mail folder names right now because
// other resolvers aren't implemented. Once they are we can expand these
// checks, potentially by breaking things out into separate tests per
// category.
if !test.scope.IncludesCategory(selectors.ExchangeMail) {
return
}
for _, c := range collections {
require.NotEmpty(t, c.FullPath().Folder())
folder := c.FullPath().Folder()
if _, ok := test.folderNames[folder]; ok {
delete(test.folderNames, folder)
}
}
assert.Empty(t, test.folderNames)
}) })
} }
} }

View File

@ -11,20 +11,11 @@ import (
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
) )
var _ cachedContainer = &mailFolder{} var _ graph.CachedContainer = &mailFolder{}
// cachedContainer is used for local unit tests but also makes it so that this // mailFolder structure that implements the graph.CachedContainer interface
// code can be broken into generic- and service-specific chunks later on to
// reuse logic in IDToPath.
type cachedContainer interface {
container
Path() *path.Builder
SetPath(*path.Builder)
}
// mailFolder structure that implements the cachedContainer interface
type mailFolder struct { type mailFolder struct {
folder container folder graph.Container
p *path.Builder p *path.Builder
} }
@ -58,7 +49,7 @@ func (mf *mailFolder) GetParentFolderId() *string {
// cache map of cachedContainers where the key = M365ID // cache map of cachedContainers where the key = M365ID
// nameLookup map: Key: DisplayName Value: ID // nameLookup map: Key: DisplayName Value: ID
type mailFolderCache struct { type mailFolderCache struct {
cache map[string]cachedContainer cache map[string]graph.CachedContainer
gs graph.Service gs graph.Service
userID, rootID string userID, rootID string
} }
@ -106,7 +97,7 @@ func (mc *mailFolderCache) populateMailRoot(ctx context.Context, directoryID str
// checkRequiredValues is a helper function to ensure that // checkRequiredValues is a helper function to ensure that
// all the pointers are set prior to being called. // all the pointers are set prior to being called.
func checkRequiredValues(c container) error { func checkRequiredValues(c graph.Container) error {
idPtr := c.GetId() idPtr := c.GetId()
if idPtr == nil || len(*idPtr) == 0 { if idPtr == nil || len(*idPtr) == 0 {
return errors.New("folder without ID") return errors.New("folder without ID")
@ -157,14 +148,10 @@ func (mc *mailFolderCache) Populate(ctx context.Context, baseID string) error {
} }
for _, f := range resp.GetValue() { for _, f := range resp.GetValue() {
if err := checkRequiredValues(f); err != nil { if err := mc.AddToCache(ctx, f); err != nil {
errs = multierror.Append(errs, err) errs = multierror.Append(errs, err)
continue continue
} }
mc.cache[*f.GetId()] = &mailFolder{
folder: f,
}
} }
r := resp.GetAdditionalData() r := resp.GetAdditionalData()
@ -211,8 +198,39 @@ func (mc *mailFolderCache) IDToPath(
// [mc.cache, mc.rootID] // [mc.cache, mc.rootID]
func (mc *mailFolderCache) Init(ctx context.Context, baseNode string) error { func (mc *mailFolderCache) Init(ctx context.Context, baseNode string) error {
if mc.cache == nil { if mc.cache == nil {
mc.cache = map[string]cachedContainer{} mc.cache = map[string]graph.CachedContainer{}
} }
return mc.populateMailRoot(ctx, baseNode) return mc.populateMailRoot(ctx, baseNode)
} }
func (mc *mailFolderCache) AddToCache(ctx context.Context, f graph.Container) error {
if err := checkRequiredValues(f); err != nil {
return errors.Wrap(err, "adding cache entry")
}
if _, ok := mc.cache[*f.GetId()]; ok {
return nil
}
mc.cache[*f.GetId()] = &mailFolder{
folder: f,
}
_, err := mc.IDToPath(ctx, *f.GetId())
if err != nil {
return errors.Wrap(err, "updating adding cache entry")
}
return nil
}
func (mc *mailFolderCache) Items() []graph.CachedContainer {
res := make([]graph.CachedContainer, 0, len(mc.cache))
for _, c := range mc.cache {
res = append(res, c)
}
return res
}

View File

@ -207,7 +207,7 @@ func (suite *ConfiguredMailFolderCacheUnitSuite) SetupTest() {
) )
} }
suite.mc = mailFolderCache{cache: map[string]cachedContainer{}} suite.mc = mailFolderCache{cache: map[string]graph.CachedContainer{}}
for _, c := range suite.allContainers { for _, c := range suite.allContainers {
suite.mc.cache[c.id] = c suite.mc.cache[c.id] = c
@ -276,6 +276,26 @@ func (suite *ConfiguredMailFolderCacheUnitSuite) TestLookupCachedFolderErrorsNot
assert.Error(t, err) assert.Error(t, err)
} }
func (suite *ConfiguredMailFolderCacheUnitSuite) TestAddToCache() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
last := suite.allContainers[len(suite.allContainers)-1]
m := newMockCachedContainer("testAddFolder")
m.parentID = last.id
m.expectedPath = stdpath.Join(last.expectedPath, m.displayName)
require.NoError(t, suite.mc.AddToCache(ctx, m))
p, err := suite.mc.IDToPath(ctx, m.id)
require.NoError(t, err)
assert.Equal(t, m.expectedPath, p.String())
}
type MailFolderCacheIntegrationSuite struct { type MailFolderCacheIntegrationSuite struct {
suite.Suite suite.Suite
gs graph.Service gs graph.Service

View File

@ -11,6 +11,7 @@ import (
msevents "github.com/microsoftgraph/msgraph-sdk-go/users/item/events" msevents "github.com/microsoftgraph/msgraph-sdk-go/users/item/events"
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders" msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
msfolderitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders/item" msfolderitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders/item"
msmfmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders/item/messages"
msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages" msmessage "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages"
msitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item" msitem "github.com/microsoftgraph/msgraph-sdk-go/users/item/messages/item"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -136,6 +137,22 @@ func scopeToOptionIdentifier(selector selectors.ExchangeScope) optionIdentifier
// which reduces the overall latency of complex calls // which reduces the overall latency of complex calls
//---------------------------------------------------------------- //----------------------------------------------------------------
func optionsForFolderMessages(moreOps []string) (*msmfmessage.MessagesRequestBuilderGetRequestConfiguration, error) {
selecting, err := buildOptions(moreOps, messages)
if err != nil {
return nil, err
}
requestParameters := &msmfmessage.MessagesRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &msmfmessage.MessagesRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
}
return options, nil
}
// optionsForMessages - used to select allowable options for exchange.Mail types // optionsForMessages - used to select allowable options for exchange.Mail types
// @param moreOps is []string of options(e.g. "parentFolderId, subject") // @param moreOps is []string of options(e.g. "parentFolderId, subject")
// @return is first call in Messages().GetWithRequestConfigurationAndResponseHandler // @return is first call in Messages().GetWithRequestConfigurationAndResponseHandler

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/go-multierror"
absser "github.com/microsoft/kiota-abstractions-go/serialization" absser "github.com/microsoft/kiota-abstractions-go/serialization"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
@ -354,17 +355,7 @@ func SetupExchangeCollectionVars(scope selectors.ExchangeScope) (
error, error,
) { ) {
if scope.IncludesCategory(selectors.ExchangeMail) { if scope.IncludesCategory(selectors.ExchangeMail) {
if scope.IsAny(selectors.ExchangeMailFolder) { return nil, nil, nil, errors.New("mail no longer supported this way")
return models.CreateMessageCollectionResponseFromDiscriminatorValue,
GetAllMessagesForUser,
IterateSelectAllDescendablesForCollections,
nil
}
return models.CreateMessageCollectionResponseFromDiscriminatorValue,
GetAllMessagesForUser,
IterateAndFilterDescendablesForCollections,
nil
} }
if scope.IncludesCategory(selectors.ExchangeContact) { if scope.IncludesCategory(selectors.ExchangeContact) {
@ -384,11 +375,11 @@ func SetupExchangeCollectionVars(scope selectors.ExchangeScope) (
return nil, nil, nil, errors.New("exchange scope option not supported") return nil, nil, nil, errors.New("exchange scope option not supported")
} }
// maybeGetAndPopulateFolderResolver gets a folder resolver if one is available for // MaybeGetAndPopulateFolderResolver gets a folder resolver if one is available for
// this category of data. If one is not available, returns nil so that other // this category of data. If one is not available, returns nil so that other
// logic in the caller can complete as long as they check if the resolver is not // logic in the caller can complete as long as they check if the resolver is not
// nil. If an error occurs populating the resolver, returns an error. // nil. If an error occurs populating the resolver, returns an error.
func maybeGetAndPopulateFolderResolver( func MaybeGetAndPopulateFolderResolver(
ctx context.Context, ctx context.Context,
qp graph.QueryParams, qp graph.QueryParams,
category path.CategoryType, category path.CategoryType,
@ -483,3 +474,77 @@ func getCollectionPath(
err1, err1,
) )
} }
func AddItemsToCollection(
ctx context.Context,
gs graph.Service,
userID string,
folderID string,
collection *Collection,
) error {
// TODO(ashmrtn): This can be removed when:
// 1. other data types have caching support
// 2. we have a good way to switch between the query for items for each data
// type.
// 3. the below is updated to handle different data categories
//
// The alternative would be to eventually just have collections fetch items as
// they are read. This would allow for streaming all items instead of pulling
// the IDs and then later fetching all the item data.
if collection.FullPath().Category() != path.EmailCategory {
return errors.Errorf(
"unsupported data type %s",
collection.FullPath().Category().String(),
)
}
options, err := optionsForFolderMessages([]string{"id"})
if err != nil {
return errors.Wrap(err, "getting query options")
}
messageResp, err := gs.Client().UsersById(userID).MailFoldersById(folderID).Messages().Get(ctx, options)
if err != nil {
return errors.Wrap(
errors.Wrap(err, support.ConnectorStackErrorTrace(err)),
"initial folder query",
)
}
pageIter, err := msgraphgocore.NewPageIterator(
messageResp,
gs.Adapter(),
models.CreateMessageCollectionResponseFromDiscriminatorValue,
)
if err != nil {
return errors.Wrap(err, "creating graph iterator")
}
var errs *multierror.Error
err = pageIter.Iterate(ctx, func(got any) bool {
item, ok := got.(graph.Idable)
if !ok {
errs = multierror.Append(errs, errors.New("item without ID function"))
return true
}
if item.GetId() == nil {
errs = multierror.Append(errs, errors.New("item with nil ID"))
return true
}
collection.AddJob(*item.GetId())
return true
})
if err != nil {
errs = multierror.Append(errs, errors.Wrap(
errors.Wrap(err, support.ConnectorStackErrorTrace(err)),
"getting folder messages",
))
}
return errs.ErrorOrNil()
}

View File

@ -242,7 +242,7 @@ func (suite *ServiceFunctionsIntegrationSuite) TestCollectContainers() {
Credentials: credentials, Credentials: credentials,
} }
collections := make(map[string]*Collection) collections := make(map[string]*Collection)
err := CollectFolders(ctx, qp, collections, nil) err := CollectFolders(ctx, qp, collections, nil, nil)
assert.NoError(t, err) assert.NoError(t, err)
test.expectedCount(t, len(collections), containerCount) test.expectedCount(t, len(collections), containerCount)

View File

@ -24,6 +24,7 @@ type GraphIterateFunc func(
errUpdater func(string, error), errUpdater func(string, error),
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) func(any) bool ) func(any) bool
// IterateSelectAllDescendablesForCollection utility function for // IterateSelectAllDescendablesForCollection utility function for
@ -36,12 +37,12 @@ func IterateSelectAllDescendablesForCollections(
errUpdater func(string, error), errUpdater func(string, error),
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) func(any) bool { ) func(any) bool {
var ( var (
isCategorySet bool isCategorySet bool
collectionType optionIdentifier collectionType optionIdentifier
category path.CategoryType category path.CategoryType
resolver graph.ContainerResolver
dirPath path.Path dirPath path.Path
err error err error
) )
@ -59,18 +60,12 @@ func IterateSelectAllDescendablesForCollections(
category = path.ContactsCategory category = path.ContactsCategory
} }
if r, err := maybeGetAndPopulateFolderResolver(ctx, qp, category); err != nil {
errUpdater("getting folder resolver for category "+category.String(), err)
} else {
resolver = r
}
isCategorySet = true isCategorySet = true
} }
entry, ok := pageItem.(descendable) entry, ok := pageItem.(graph.Descendable)
if !ok { if !ok {
errUpdater(qp.User, errors.New("descendable conversion failure")) errUpdater(qp.User, errors.New("Descendable conversion failure"))
return true return true
} }
@ -127,6 +122,7 @@ func IterateSelectAllEventsFromCalendars(
errUpdater func(string, error), errUpdater func(string, error),
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) func(any) bool { ) func(any) bool {
var ( var (
isEnabled bool isEnabled bool
@ -136,7 +132,7 @@ func IterateSelectAllEventsFromCalendars(
return func(pageItem any) bool { return func(pageItem any) bool {
if !isEnabled { if !isEnabled {
// Create Collections based on qp.Scope // Create Collections based on qp.Scope
err := CollectFolders(ctx, qp, collections, statusUpdater) err := CollectFolders(ctx, qp, collections, statusUpdater, resolver)
if err != nil { if err != nil {
errUpdater( errUpdater(
qp.User, qp.User,
@ -157,7 +153,7 @@ func IterateSelectAllEventsFromCalendars(
pageItem = CreateCalendarDisplayable(pageItem) pageItem = CreateCalendarDisplayable(pageItem)
calendar, ok := pageItem.(displayable) calendar, ok := pageItem.(graph.Displayable)
if !ok { if !ok {
errUpdater( errUpdater(
qp.User, qp.User,
@ -189,6 +185,53 @@ func IterateSelectAllEventsFromCalendars(
} }
} }
// CollectionsFromResolver returns the set of collections that match the
// selector parameters.
func CollectionsFromResolver(
ctx context.Context,
qp graph.QueryParams,
resolver graph.ContainerResolver,
statusUpdater support.StatusUpdater,
collections map[string]*Collection,
) error {
option, category, notMatcher := getCategoryAndValidation(qp.Scope)
for _, item := range resolver.Items() {
pathString := item.Path().String()
// Skip the root folder for mail which has an empty path.
if len(pathString) == 0 || notMatcher(&pathString) {
continue
}
completePath, err := item.Path().ToDataLayerExchangePathForCategory(
qp.Credentials.TenantID,
qp.User,
category,
false,
)
if err != nil {
return errors.Wrap(err, "resolving collection item path")
}
service, err := createService(qp.Credentials, qp.FailFast)
if err != nil {
return errors.Wrap(err, "making service instance")
}
tmp := NewCollection(
qp.User,
completePath,
option,
service,
statusUpdater,
)
collections[*item.GetId()] = &tmp
}
return nil
}
// IterateAndFilterDescendablesForCollections is a filtering GraphIterateFunc // IterateAndFilterDescendablesForCollections is a filtering GraphIterateFunc
// that places exchange objectsids belonging to specific directories // that places exchange objectsids belonging to specific directories
// into a Collection. Messages outside of those directories are omitted. // into a Collection. Messages outside of those directories are omitted.
@ -198,39 +241,49 @@ func IterateAndFilterDescendablesForCollections(
errUpdater func(string, error), errUpdater func(string, error),
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) func(any) bool { ) func(any) bool {
var ( var (
isFilterSet bool isFilterSet bool
resolver graph.ContainerResolver
cache map[string]string cache map[string]string
) )
return func(descendItem any) bool { return func(descendItem any) bool {
if !isFilterSet { if !isFilterSet {
err := CollectFolders( if resolver != nil {
ctx, err := CollectionsFromResolver(
qp, ctx,
collections, qp,
statusUpdater, resolver,
) statusUpdater,
if err != nil { collections,
errUpdater(qp.User, err) )
return false if err != nil {
errUpdater(qp.User, err)
return false
}
} else {
err := CollectFolders(
ctx,
qp,
collections,
statusUpdater,
resolver,
)
if err != nil {
errUpdater(qp.User, err)
return false
}
} }
// Caches folder directories // Caches folder directories
cache = make(map[string]string, 0) cache = make(map[string]string, 0)
resolver, err = maybeGetAndPopulateFolderResolver(ctx, qp, path.EmailCategory)
if err != nil {
errUpdater("getting folder resolver for category "+path.EmailCategory.String(), err)
}
isFilterSet = true isFilterSet = true
} }
message, ok := descendItem.(descendable) message, ok := descendItem.(graph.Descendable)
if !ok { if !ok {
errUpdater(qp.User, errors.New("casting messageItem to descendable")) errUpdater(qp.User, errors.New("casting messageItem to Descendable"))
return true return true
} }
// Saving only messages for the created directories // Saving only messages for the created directories
@ -322,12 +375,11 @@ func IterateFilterContainersForCollections(
errUpdater func(string, error), errUpdater func(string, error),
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) func(any) bool { ) func(any) bool {
var ( var (
resolver graph.ContainerResolver
isSet bool isSet bool
collectPath string collectPath string
err error
option optionIdentifier option optionIdentifier
category path.CategoryType category path.CategoryType
validate func(*string) bool validate func(*string) bool
@ -337,11 +389,6 @@ func IterateFilterContainersForCollections(
if !isSet { if !isSet {
option, category, validate = getCategoryAndValidation(qp.Scope) option, category, validate = getCategoryAndValidation(qp.Scope)
resolver, err = maybeGetAndPopulateFolderResolver(ctx, qp, category)
if err != nil {
errUpdater("getting folder resolver for category "+category.String(), err)
}
isSet = true isSet = true
} }
@ -349,7 +396,7 @@ func IterateFilterContainersForCollections(
folderItem = CreateCalendarDisplayable(folderItem) folderItem = CreateCalendarDisplayable(folderItem)
} }
folder, ok := folderItem.(displayable) folder, ok := folderItem.(graph.Displayable)
if !ok { if !ok {
errUpdater(qp.User, errUpdater(qp.User,
fmt.Errorf("unable to convert input of %T for category: %s", folderItem, category.String()), fmt.Errorf("unable to convert input of %T for category: %s", folderItem, category.String()),
@ -412,6 +459,7 @@ func IterateSelectAllContactsForCollections(
errUpdater func(string, error), errUpdater func(string, error),
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) func(any) bool { ) func(any) bool {
var ( var (
isPrimarySet bool isPrimarySet bool
@ -433,6 +481,7 @@ func IterateSelectAllContactsForCollections(
qp, qp,
collections, collections,
statusUpdater, statusUpdater,
resolver,
) )
if err != nil { if err != nil {
errUpdater(qp.User, err) errUpdater(qp.User, err)
@ -520,7 +569,7 @@ func IterateSelectAllContactsForCollections(
// iterateFindContainerID is a utility function that supports finding // iterateFindContainerID is a utility function that supports finding
// M365 folders objects that matches the folderName. Iterator callback function // M365 folders objects that matches the folderName. Iterator callback function
// will work on folderCollection responses whose objects implement // will work on folderCollection responses whose objects implement
// the displayable interface. If folder exists, the function updates the // the Displayable interface. If folder exists, the function updates the
// containerID memory address that was passed in. // containerID memory address that was passed in.
// @param containerName is the string representation of the folder, directory or calendar holds // @param containerName is the string representation of the folder, directory or calendar holds
// the underlying M365 objects // the underlying M365 objects
@ -536,16 +585,16 @@ func iterateFindContainerID(
} }
// True when pagination needs more time to get additional responses or // True when pagination needs more time to get additional responses or
// when entry is not able to be converted into a displayable // when entry is not able to be converted into a Displayable
if entry == nil { if entry == nil {
return true return true
} }
folder, ok := entry.(displayable) folder, ok := entry.(graph.Displayable)
if !ok { if !ok {
errUpdater( errUpdater(
errorIdentifier, errorIdentifier,
errors.New("struct does not implement displayable"), errors.New("struct does not implement Displayable"),
) )
return true return true

View File

@ -124,11 +124,16 @@ func RetrieveMessageDataForUser(ctx context.Context, gs graph.Service, user, m36
// CollectFolders is a utility function for creating Collections based off parameters found // CollectFolders is a utility function for creating Collections based off parameters found
// in the ExchangeScope found in the graph.QueryParams // in the ExchangeScope found in the graph.QueryParams
// TODO(ashmrtn): This may not need to do the query if we decide the cache
// should always:
// 1. be passed in
// 2. be populated with all folders for the user
func CollectFolders( func CollectFolders(
ctx context.Context, ctx context.Context,
qp graph.QueryParams, qp graph.QueryParams,
collections map[string]*Collection, collections map[string]*Collection,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver,
) error { ) error {
var ( var (
query GraphQuery query GraphQuery
@ -187,6 +192,7 @@ func CollectFolders(
errUpdater, errUpdater,
collections, collections,
statusUpdater, statusUpdater,
resolver,
) )
iterateFailure := pageIterator.Iterate(ctx, callbackFunc) iterateFailure := pageIterator.Iterate(ctx, callbackFunc)

View File

@ -28,6 +28,40 @@ type Service interface {
ErrPolicy() bool ErrPolicy() bool
} }
// Idable represents objects that implement msgraph-sdk-go/models.entityable
// and have the concept of an ID.
type Idable interface {
GetId() *string
}
// Descendable represents objects that implement msgraph-sdk-go/models.entityable
// and have the concept of a "parent folder".
type Descendable interface {
Idable
GetParentFolderId() *string
}
// Displayable represents objects that implement msgraph-sdk-go/models.entityable
// and have the concept of a display name.
type Displayable interface {
Idable
GetDisplayName() *string
}
type Container interface {
Descendable
Displayable
}
// CachedContainer is used for local unit tests but also makes it so that this
// code can be broken into generic- and service-specific chunks later on to
// reuse logic in IDToPath.
type CachedContainer interface {
Container
Path() *path.Builder
SetPath(*path.Builder)
}
// ContainerResolver houses functions for getting information about containers // ContainerResolver houses functions for getting information about containers
// from remote APIs (i.e. resolve folder paths with Graph API). Resolvers may // from remote APIs (i.e. resolve folder paths with Graph API). Resolvers may
// cache information about containers. // cache information about containers.
@ -41,4 +75,7 @@ type ContainerResolver interface {
// @param baseFolderID represents the M365ID base that the resolver will // @param baseFolderID represents the M365ID base that the resolver will
// conclude its search. Default input is "". // conclude its search. Default input is "".
Populate(ctx context.Context, baseFolderID string) error Populate(ctx context.Context, baseFolderID string) error
AddToCache(ctx context.Context, m365Container Container) error
// Items returns the containers in the cache.
Items() []CachedContainer
} }

View File

@ -8,6 +8,7 @@ import (
"runtime/trace" "runtime/trace"
"sync" "sync"
"github.com/hashicorp/go-multierror"
msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go" msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
@ -22,6 +23,7 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
) )
@ -272,6 +274,105 @@ func (gc *GraphConnector) RestoreDataCollections(
return deets, err return deets, err
} }
func scopeToPathCategory(scope selectors.ExchangeScope) path.CategoryType {
if scope.IncludesCategory(selectors.ExchangeMail) {
return path.EmailCategory
}
if scope.IncludesCategory(selectors.ExchangeContact) {
return path.ContactsCategory
}
if scope.IncludesCategory(selectors.ExchangeEvent) {
return path.EventsCategory
}
return path.UnknownCategory
}
func (gc *GraphConnector) fetchItemsByFolder(
ctx context.Context,
qp graph.QueryParams,
resolver graph.ContainerResolver,
) (map[string]*exchange.Collection, error) {
var errs *multierror.Error
collections := map[string]*exchange.Collection{}
// This gets the collections, but does not get the items in the
// collection.
err := exchange.CollectionsFromResolver(
ctx,
qp,
resolver,
gc.UpdateStatus,
collections,
)
if err != nil {
return nil, errors.Wrap(err, "getting target collections")
}
for id, col := range collections {
// Fetch items for said collection.
err := exchange.AddItemsToCollection(ctx, gc.Service(), qp.User, id, col)
if err != nil {
errs = multierror.Append(errs, errors.Wrapf(
err,
"fetching items for collection %s with ID %s",
col.FullPath().String(),
id,
))
}
}
return collections, errs.ErrorOrNil()
}
func (gc *GraphConnector) legacyFetchItems(
ctx context.Context,
scope selectors.ExchangeScope,
qp graph.QueryParams,
resolver graph.ContainerResolver,
) (map[string]*exchange.Collection, error) {
var (
errs error
collections = map[string]*exchange.Collection{}
)
transformer, query, gIter, err := exchange.SetupExchangeCollectionVars(scope)
if err != nil {
return nil, support.WrapAndAppend(gc.Service().Adapter().GetBaseUrl(), err, nil)
}
response, err := query(ctx, &gc.graphService, qp.User)
if err != nil {
return nil, errors.Wrapf(
err,
"user %s M365 query: %s",
qp.User, support.ConnectorStackErrorTrace(err))
}
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, transformer)
if err != nil {
return nil, err
}
errUpdater := func(id string, err error) {
errs = support.WrapAndAppend(id, err, errs)
}
// callbackFunc iterates through all M365 object target and fills exchange.Collection.jobs[]
// with corresponding item M365IDs. New collections are created for each directory.
// Each directory used the M365 Identifier. The use of ID stops collisions betweens users
callbackFunc := gIter(ctx, qp, errUpdater, collections, gc.UpdateStatus, resolver)
iterateError := pageIterator.Iterate(ctx, callbackFunc)
if iterateError != nil {
errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs)
}
return collections, errs
}
// createCollection - utility function that retrieves M365 // createCollection - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope // IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are stored. // determines the type of collections that are stored.
@ -280,56 +381,46 @@ func (gc *GraphConnector) createCollections(
ctx context.Context, ctx context.Context,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
) ([]*exchange.Collection, error) { ) ([]*exchange.Collection, error) {
var ( var errs *multierror.Error
errs error
transformer, query, gIter, err = exchange.SetupExchangeCollectionVars(scope)
)
if err != nil {
return nil, support.WrapAndAppend(gc.Service().Adapter().GetBaseUrl(), err, nil)
}
users := scope.Get(selectors.ExchangeUser) users := scope.Get(selectors.ExchangeUser)
allCollections := make([]*exchange.Collection, 0) allCollections := make([]*exchange.Collection, 0)
// Create collection of ExchangeDataCollection // Create collection of ExchangeDataCollection
for _, user := range users { for _, user := range users {
var collections map[string]*exchange.Collection
qp := graph.QueryParams{ qp := graph.QueryParams{
User: user, User: user,
Scope: scope, Scope: scope,
FailFast: gc.failFast, FailFast: gc.failFast,
Credentials: gc.credentials, Credentials: gc.credentials,
} }
collections := make(map[string]*exchange.Collection)
response, err := query(ctx, &gc.graphService, qp.User) // Currently only mail has a folder cache implemented.
resolver, err := exchange.MaybeGetAndPopulateFolderResolver(
ctx,
qp,
scopeToPathCategory(scope),
)
if err != nil { if err != nil {
return nil, errors.Wrapf( return nil, errors.Wrap(err, "getting folder cache")
err,
"user %s M365 query: %s",
qp.User, support.ConnectorStackErrorTrace(err))
} }
pageIterator, err := msgraphgocore.NewPageIterator(response, &gc.graphService.adapter, transformer) if scopeToPathCategory(scope) == path.EmailCategory {
if err != nil { if resolver == nil {
return nil, err return nil, errors.New("unable to create mail folder resolver")
} }
errUpdater := func(id string, err error) { collections, err = gc.fetchItemsByFolder(ctx, qp, resolver)
errs = support.WrapAndAppend(id, err, errs) if err != nil {
} errs = multierror.Append(errs, err)
}
// callbackFunc iterates through all M365 object target and fills exchange.Collection.jobs[] } else {
// with corresponding item M365IDs. New collections are created for each directory. collections, err = gc.legacyFetchItems(ctx, scope, qp, resolver)
// Each directory used the M365 Identifier. The use of ID stops collisions betweens users // Preserving previous behavior.
callbackFunc := gIter(ctx, qp, errUpdater, collections, gc.UpdateStatus) if err != nil {
iterateError := pageIterator.Iterate(ctx, callbackFunc) return nil, err // return error if snapshot is incomplete
}
if iterateError != nil {
errs = support.WrapAndAppend(gc.graphService.adapter.GetBaseUrl(), iterateError, errs)
}
if errs != nil {
return nil, errs // return error if snapshot is incomplete
} }
for _, collection := range collections { for _, collection := range collections {
@ -339,7 +430,7 @@ func (gc *GraphConnector) createCollections(
} }
} }
return allCollections, errs return allCollections, errs.ErrorOrNil()
} }
// AwaitStatus waits for all gc tasks to complete and then returns status // AwaitStatus waits for all gc tasks to complete and then returns status

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
@ -316,6 +317,80 @@ func (suite *GraphConnectorIntegrationSuite) TestAccessOfInboxAllUsers() {
} }
} }
func (suite *GraphConnectorIntegrationSuite) TestMailFetch() {
ctx, flush := tester.NewContext()
defer flush()
var (
t = suite.T()
userID = tester.M365UserID(t)
sel = selectors.NewExchangeBackup()
)
tests := []struct {
name string
scope selectors.ExchangeScope
folderNames map[string]struct{}
}{
{
name: "Mail Iterative Check",
scope: sel.MailFolders([]string{userID}, selectors.Any())[0],
folderNames: map[string]struct{}{
exchange.DefaultMailFolder: {},
"Sent Items": {},
},
},
{
name: "Folder Iterative Check Mail",
scope: sel.MailFolders(
[]string{userID},
[]string{exchange.DefaultMailFolder},
)[0],
folderNames: map[string]struct{}{
exchange.DefaultMailFolder: {},
},
},
}
gc := loadConnector(ctx, t)
for _, test := range tests {
suite.T().Run(test.name, func(t *testing.T) {
qp := graph.QueryParams{
User: userID,
Scope: test.scope,
Credentials: gc.credentials,
FailFast: false,
}
resolver, err := exchange.MaybeGetAndPopulateFolderResolver(
ctx,
qp,
scopeToPathCategory(qp.Scope),
)
require.NoError(t, err)
collections, err := gc.fetchItemsByFolder(
ctx,
qp,
resolver,
)
require.NoError(t, err)
for _, c := range collections {
require.NotEmpty(t, c.FullPath().Folder())
folder := c.FullPath().Folder()
if _, ok := test.folderNames[folder]; ok {
delete(test.folderNames, folder)
}
}
assert.Empty(t, test.folderNames)
})
}
}
///------------------------------------------------------------ ///------------------------------------------------------------
// Exchange Functions // Exchange Functions
//------------------------------------------------------- //-------------------------------------------------------