refactor exchange restore to use interfaces (#3456)

refactors exchange restore from near-duplicate per-category functions and switch-based process trees with interfaces.

At the top of restoring all collections, each category creates a categoryRestoreHandler to supply the necessary restore behavior.  The appropriate handler gets passed in to the collection restore, and all restore code after that takes a single path using a common restore interface to switch between categorical behavior.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #1996

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-06-01 18:58:09 -06:00 committed by GitHub
parent 6b7502bcb8
commit cdf26b7988
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1325 additions and 1073 deletions

View File

@ -65,8 +65,7 @@ func (gc *GraphConnector) ProduceBackupCollections(
ctx, ctx,
gc.Discovery.Users(), gc.Discovery.Users(),
path.ServiceType(sels.Service), path.ServiceType(sels.Service),
sels.DiscreteOwner, sels.DiscreteOwner)
)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -90,10 +89,11 @@ func (gc *GraphConnector) ProduceBackupCollections(
case selectors.ServiceExchange: case selectors.ServiceExchange:
colls, ssmb, err = exchange.DataCollections( colls, ssmb, err = exchange.DataCollections(
ctx, ctx,
gc.Discovery,
sels, sels,
gc.credentials.AzureTenantID,
owner, owner,
metadata, metadata,
gc.credentials,
gc.UpdateStatus, gc.UpdateStatus,
ctrlOpts, ctrlOpts,
errs) errs)

View File

@ -21,6 +21,7 @@ import (
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
selTD "github.com/alcionai/corso/src/pkg/selectors/testdata" selTD "github.com/alcionai/corso/src/pkg/selectors/testdata"
"github.com/alcionai/corso/src/pkg/services/m365/api"
) )
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -29,8 +30,10 @@ import (
type DataCollectionIntgSuite struct { type DataCollectionIntgSuite struct {
tester.Suite tester.Suite
user string user string
site string site string
tenantID string
ac api.Client
} }
func TestDataCollectionIntgSuite(t *testing.T) { func TestDataCollectionIntgSuite(t *testing.T) {
@ -42,10 +45,19 @@ func TestDataCollectionIntgSuite(t *testing.T) {
} }
func (suite *DataCollectionIntgSuite) SetupSuite() { func (suite *DataCollectionIntgSuite) SetupSuite() {
suite.user = tester.M365UserID(suite.T()) t := suite.T()
suite.site = tester.M365SiteID(suite.T())
tester.LogTimeOfTest(suite.T()) suite.user = tester.M365UserID(t)
suite.site = tester.M365SiteID(t)
acct := tester.NewM365Account(t)
creds, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.tenantID = creds.AzureTenantID
suite.ac, err = api.NewClient(creds)
require.NoError(t, err, clues.ToCore(err))
} }
// TestExchangeDataCollection verifies interface between operation and // TestExchangeDataCollection verifies interface between operation and
@ -111,16 +123,18 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
defer flush() defer flush()
sel := test.getSelector(t) sel := test.getSelector(t)
uidn := inMock.NewProvider(sel.ID(), sel.Name())
ctrlOpts := control.Defaults() ctrlOpts := control.Defaults()
ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries
collections, excludes, err := exchange.DataCollections( collections, excludes, err := exchange.DataCollections(
ctx, ctx,
suite.ac,
sel, sel,
sel, suite.tenantID,
uidn,
nil, nil,
connector.credentials,
connector.UpdateStatus, connector.UpdateStatus,
ctrlOpts, ctrlOpts,
fault.New(true)) fault.New(true))
@ -133,7 +147,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
// Categories with delta endpoints will produce a collection for metadata // Categories with delta endpoints will produce a collection for metadata
// as well as the actual data pulled, and the "temp" root collection. // as well as the actual data pulled, and the "temp" root collection.
assert.GreaterOrEqual(t, len(collections), 1, "expected 1 <= num collections <= 2") assert.LessOrEqual(t, 1, len(collections), "expected 1 <= num collections <= 3")
assert.GreaterOrEqual(t, 3, len(collections), "expected 1 <= num collections <= 3") assert.GreaterOrEqual(t, 3, len(collections), "expected 1 <= num collections <= 3")
for _, col := range collections { for _, col := range collections {

View File

@ -71,9 +71,9 @@ func (cfc *contactFolderCache) Populate(
ctx context.Context, ctx context.Context,
errs *fault.Bus, errs *fault.Bus,
baseID string, baseID string,
baseContainerPather ...string, baseContainerPath ...string,
) error { ) error {
if err := cfc.init(ctx, baseID, baseContainerPather); err != nil { if err := cfc.init(ctx, baseID, baseContainerPath); err != nil {
return clues.Wrap(err, "initializing") return clues.Wrap(err, "initializing")
} }
@ -95,7 +95,7 @@ func (cfc *contactFolderCache) init(
baseContainerPath []string, baseContainerPath []string,
) error { ) error {
if len(baseNode) == 0 { if len(baseNode) == 0 {
return clues.New("m365 folderID required for base folder").WithClues(ctx) return clues.New("m365 folderID required for base contact folder").WithClues(ctx)
} }
if cfc.containerResolver == nil { if cfc.containerResolver == nil {

View File

@ -0,0 +1,40 @@
package exchange
import (
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ backupHandler = &contactBackupHandler{}
type contactBackupHandler struct {
ac api.Contacts
}
func newContactBackupHandler(
ac api.Client,
) contactBackupHandler {
acc := ac.Contacts()
return contactBackupHandler{
ac: acc,
}
}
func (h contactBackupHandler) itemEnumerator() addedAndRemovedItemGetter {
return h.ac
}
func (h contactBackupHandler) itemHandler() itemGetterSerializer {
return h.ac
}
func (h contactBackupHandler) NewContainerCache(
userID string,
) (string, graph.ContainerResolver) {
return DefaultContactFolder, &contactFolderCache{
userID: userID,
enumer: h.ac,
getter: h.ac,
}
}

View File

@ -0,0 +1,86 @@
package exchange
import (
"context"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ itemRestorer = &contactRestoreHandler{}
type contactRestoreHandler struct {
ac api.Contacts
ip itemPoster[models.Contactable]
}
func newContactRestoreHandler(
ac api.Client,
) contactRestoreHandler {
return contactRestoreHandler{
ac: ac.Contacts(),
ip: ac.Contacts(),
}
}
func (h contactRestoreHandler) newContainerCache(userID string) graph.ContainerResolver {
return &contactFolderCache{
userID: userID,
enumer: h.ac,
getter: h.ac,
}
}
func (h contactRestoreHandler) formatRestoreDestination(
destinationContainerName string,
_ path.Path, // contact folders cannot be nested
) *path.Builder {
return path.Builder{}.Append(destinationContainerName)
}
func (h contactRestoreHandler) CreateContainer(
ctx context.Context,
userID, containerName, _ string, // parent container not used
) (graph.Container, error) {
return h.ac.CreateContainer(ctx, userID, containerName, "")
}
func (h contactRestoreHandler) containerSearcher() containerByNamer {
return nil
}
// always returns the provided value
func (h contactRestoreHandler) orRootContainer(c string) string {
return c
}
func (h contactRestoreHandler) restore(
ctx context.Context,
body []byte,
userID, destinationID string,
errs *fault.Bus,
) (*details.ExchangeInfo, error) {
contact, err := api.BytesToContactable(body)
if err != nil {
return nil, graph.Wrap(ctx, err, "creating contact from bytes")
}
ctx = clues.Add(ctx, "item_id", ptr.Val(contact.GetId()))
item, err := h.ip.PostItem(ctx, userID, destinationID, contact)
if err != nil {
return nil, graph.Wrap(ctx, err, "restoring mail message")
}
info := api.ContactInfo(item)
info.Size = int64(len(body))
return info, nil
}

View File

@ -0,0 +1,57 @@
package exchange
import (
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type ContactsRestoreIntgSuite struct {
tester.Suite
creds account.M365Config
ac api.Client
userID string
}
func TestContactsRestoreIntgSuite(t *testing.T) {
suite.Run(t, &ContactsRestoreIntgSuite{
Suite: tester.NewIntegrationSuite(
t,
[][]string{tester.M365AcctCredEnvs}),
})
}
func (suite *ContactsRestoreIntgSuite) SetupSuite() {
t := suite.T()
a := tester.NewM365Account(t)
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.creds = creds
suite.ac, err = api.NewClient(creds)
require.NoError(t, err, clues.ToCore(err))
suite.userID = tester.M365UserID(t)
}
// Testing to ensure that cache system works for in multiple different environments
func (suite *ContactsRestoreIntgSuite) TestCreateContainerDestination() {
runCreateDestinationTest(
suite.T(),
newMailRestoreHandler(suite.ac),
path.EmailCategory,
suite.creds.AzureTenantID,
suite.userID,
tester.DefaultTestRestoreDestination("").ContainerName,
[]string{"Hufflepuff"},
[]string{"Ravenclaw"})
}

View File

@ -15,7 +15,6 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
) )
@ -676,181 +675,69 @@ func (suite *ConfiguredFolderCacheUnitSuite) TestAddToCache() {
// integration suite // integration suite
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type FolderCacheIntegrationSuite struct { func runCreateDestinationTest(
tester.Suite t *testing.T,
credentials account.M365Config handler restoreHandler,
gs graph.Servicer category path.CategoryType,
} tenantID, userID, destinationName string,
containerNames1 []string,
func TestFolderCacheIntegrationSuite(t *testing.T) { containerNames2 []string,
suite.Run(t, &FolderCacheIntegrationSuite{ ) {
Suite: tester.NewIntegrationSuite( ctx, flush := tester.NewContext(t)
t, defer flush()
[][]string{tester.M365AcctCredEnvs},
),
})
}
func (suite *FolderCacheIntegrationSuite) SetupSuite() {
t := suite.T()
a := tester.NewM365Account(t)
m365, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.credentials = m365
adpt, err := graph.CreateAdapter(
m365.AzureTenantID,
m365.AzureClientID,
m365.AzureClientSecret)
require.NoError(t, err, clues.ToCore(err))
suite.gs = graph.NewService(adpt)
}
// Testing to ensure that cache system works for in multiple different environments
func (suite *FolderCacheIntegrationSuite) TestCreateContainerDestination() {
a := tester.NewM365Account(suite.T())
m365, err := a.M365Config()
require.NoError(suite.T(), err, clues.ToCore(err))
var ( var (
user = tester.M365UserID(suite.T()) svc = path.ExchangeService
directoryCaches = make(map[path.CategoryType]graph.ContainerResolver) gcr = handler.newContainerCache(userID)
folderName = tester.DefaultTestRestoreDestination("").ContainerName
tests = []struct {
name string
pathFunc1 func(t *testing.T) path.Path
pathFunc2 func(t *testing.T) path.Path
category path.CategoryType
folderPrefix string
}{
{
name: "Mail Cache Test",
category: path.EmailCategory,
pathFunc1: func(t *testing.T) path.Path {
pth, err := path.Build(
suite.credentials.AzureTenantID,
user,
path.ExchangeService,
path.EmailCategory,
false,
"Griffindor", "Croix")
require.NoError(t, err, clues.ToCore(err))
return pth
},
pathFunc2: func(t *testing.T) path.Path {
pth, err := path.Build(
suite.credentials.AzureTenantID,
user,
path.ExchangeService,
path.EmailCategory,
false,
"Griffindor", "Felicius")
require.NoError(t, err, clues.ToCore(err))
return pth
},
},
{
name: "Contact Cache Test",
category: path.ContactsCategory,
pathFunc1: func(t *testing.T) path.Path {
pth, err := path.Build(
suite.credentials.AzureTenantID,
user,
path.ExchangeService,
path.ContactsCategory,
false,
"HufflePuff")
require.NoError(t, err, clues.ToCore(err))
return pth
},
pathFunc2: func(t *testing.T) path.Path {
pth, err := path.Build(
suite.credentials.AzureTenantID,
user,
path.ExchangeService,
path.ContactsCategory,
false,
"Ravenclaw")
require.NoError(t, err, clues.ToCore(err))
return pth
},
},
{
name: "Event Cache Test",
category: path.EventsCategory,
pathFunc1: func(t *testing.T) path.Path {
pth, err := path.Build(
suite.credentials.AzureTenantID,
user,
path.ExchangeService,
path.EventsCategory,
false,
"Durmstrang")
require.NoError(t, err, clues.ToCore(err))
return pth
},
pathFunc2: func(t *testing.T) path.Path {
pth, err := path.Build(
suite.credentials.AzureTenantID,
user,
path.ExchangeService,
path.EventsCategory,
false,
"Beauxbatons")
require.NoError(t, err, clues.ToCore(err))
return pth
},
},
}
) )
for _, test := range tests { path1, err := path.Build(
suite.Run(test.name, func() { tenantID,
t := suite.T() userID,
svc,
category,
false,
containerNames1...)
require.NoError(t, err, clues.ToCore(err))
ctx, flush := tester.NewContext(t) containerID, gcr, err := createDestination(
defer flush() ctx,
handler,
handler.formatRestoreDestination(destinationName, path1),
userID,
gcr,
true,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
folderID, err := CreateContainerDestination( _, _, err = gcr.IDToPath(ctx, containerID)
ctx, assert.NoError(t, err, clues.ToCore(err))
m365,
test.pathFunc1(t),
folderName,
directoryCaches,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
resolver := directoryCaches[test.category] path2, err := path.Build(
tenantID,
userID,
svc,
category,
false,
containerNames2...)
require.NoError(t, err, clues.ToCore(err))
_, _, err = resolver.IDToPath(ctx, folderID) containerID, gcr, err = createDestination(
assert.NoError(t, err, clues.ToCore(err)) ctx,
handler,
handler.formatRestoreDestination(destinationName, path2),
userID,
gcr,
false,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
secondID, err := CreateContainerDestination( p, l, err := gcr.IDToPath(ctx, containerID)
ctx, require.NoError(t, err, clues.ToCore(err))
m365,
test.pathFunc2(t),
folderName,
directoryCaches,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
p, l, err := resolver.IDToPath(ctx, secondID) _, ok := gcr.LocationInCache(l.String())
require.NoError(t, err, clues.ToCore(err)) require.True(t, ok, "looking for location in cache: %s", l)
_, ok := resolver.LocationInCache(l.String()) _, ok = gcr.PathInCache(p.String())
require.True(t, ok, "looking for location in cache: %s", l) require.True(t, ok, "looking for path in cache: %s", p)
_, ok = resolver.PathInCache(p.String())
require.True(t, ok, "looking for path in cache: %s", p)
})
}
} }

View File

@ -12,7 +12,6 @@ import (
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
@ -159,15 +158,13 @@ func parseMetadataCollections(
// DataCollections returns a DataCollection which the caller can // DataCollections returns a DataCollection which the caller can
// use to read mailbox data out for the specified user // use to read mailbox data out for the specified user
// Assumption: User exists
//
// Add iota to this call -> mail, contacts, calendar, etc.
func DataCollections( func DataCollections(
ctx context.Context, ctx context.Context,
ac api.Client,
selector selectors.Selector, selector selectors.Selector,
tenantID string,
user idname.Provider, user idname.Provider,
metadata []data.RestoreCollection, metadata []data.RestoreCollection,
acct account.M365Config,
su support.StatusUpdater, su support.StatusUpdater,
ctrlOpts control.Options, ctrlOpts control.Options,
errs *fault.Bus, errs *fault.Bus,
@ -181,6 +178,7 @@ func DataCollections(
collections = []data.BackupCollection{} collections = []data.BackupCollection{}
el = errs.Local() el = errs.Local()
categories = map[path.CategoryType]struct{}{} categories = map[path.CategoryType]struct{}{}
handlers = BackupHandlers(ac)
) )
// Turn on concurrency limiter middleware for exchange backups // Turn on concurrency limiter middleware for exchange backups
@ -201,7 +199,8 @@ func DataCollections(
dcs, err := createCollections( dcs, err := createCollections(
ctx, ctx,
acct, handlers,
tenantID,
user, user,
scope, scope,
cdps[scope.Category().PathType()], cdps[scope.Category().PathType()],
@ -222,7 +221,7 @@ func DataCollections(
baseCols, err := graph.BaseCollections( baseCols, err := graph.BaseCollections(
ctx, ctx,
collections, collections,
acct.AzureTenantID, tenantID,
user.ID(), user.ID(),
path.ExchangeService, path.ExchangeService,
categories, categories,
@ -238,25 +237,13 @@ func DataCollections(
return collections, nil, el.Failure() return collections, nil, el.Failure()
} }
func getterByType(ac api.Client, category path.CategoryType) (addedAndRemovedItemIDsGetter, error) {
switch category {
case path.EmailCategory:
return ac.Mail(), nil
case path.EventsCategory:
return ac.Events(), nil
case path.ContactsCategory:
return ac.Contacts(), nil
default:
return nil, clues.New("no api client registered for category")
}
}
// createCollections - utility function that retrieves M365 // createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope // IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved. // determines the type of collections that are retrieved.
func createCollections( func createCollections(
ctx context.Context, ctx context.Context,
creds account.M365Config, handlers map[path.CategoryType]backupHandler,
tenantID string,
user idname.Provider, user idname.Provider,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
dps DeltaPaths, dps DeltaPaths,
@ -264,27 +251,21 @@ func createCollections(
su support.StatusUpdater, su support.StatusUpdater,
errs *fault.Bus, errs *fault.Bus,
) ([]data.BackupCollection, error) { ) ([]data.BackupCollection, error) {
ctx = clues.Add(ctx, "category", scope.Category().PathType())
var ( var (
allCollections = make([]data.BackupCollection, 0) allCollections = make([]data.BackupCollection, 0)
category = scope.Category().PathType() category = scope.Category().PathType()
qp = graph.QueryParams{
Category: category,
ResourceOwner: user,
TenantID: tenantID,
}
) )
ac, err := api.NewClient(creds) handler, ok := handlers[category]
if err != nil { if !ok {
return nil, clues.Wrap(err, "getting api client").WithClues(ctx) return nil, clues.New("unsupported backup category type").WithClues(ctx)
}
ctx = clues.Add(ctx, "category", category)
getter, err := getterByType(ac, category)
if err != nil {
return nil, clues.Stack(err).WithClues(ctx)
}
qp := graph.QueryParams{
Category: category,
ResourceOwner: user,
Credentials: creds,
} }
foldersComplete, closer := observe.MessageWithCompletion( foldersComplete, closer := observe.MessageWithCompletion(
@ -293,17 +274,18 @@ func createCollections(
defer closer() defer closer()
defer close(foldersComplete) defer close(foldersComplete)
resolver, err := PopulateExchangeContainerResolver(ctx, qp, errs) rootFolder, cc := handler.NewContainerCache(user.ID())
if err != nil {
if err := cc.Populate(ctx, errs, rootFolder); err != nil {
return nil, clues.Wrap(err, "populating container cache") return nil, clues.Wrap(err, "populating container cache")
} }
collections, err := filterContainersAndFillCollections( collections, err := filterContainersAndFillCollections(
ctx, ctx,
qp, qp,
getter, handler,
su, su,
resolver, cc,
scope, scope,
dps, dps,
ctrlOpts, ctrlOpts,

View File

@ -222,8 +222,10 @@ func newStatusUpdater(t *testing.T, wg *sync.WaitGroup) func(status *support.Con
type DataCollectionsIntegrationSuite struct { type DataCollectionsIntegrationSuite struct {
tester.Suite tester.Suite
user string user string
site string site string
tenantID string
ac api.Client
} }
func TestDataCollectionsIntegrationSuite(t *testing.T) { func TestDataCollectionsIntegrationSuite(t *testing.T) {
@ -239,18 +241,25 @@ func (suite *DataCollectionsIntegrationSuite) SetupSuite() {
suite.user = tester.M365UserID(suite.T()) suite.user = tester.M365UserID(suite.T())
suite.site = tester.M365SiteID(suite.T()) suite.site = tester.M365SiteID(suite.T())
acct := tester.NewM365Account(suite.T())
creds, err := acct.M365Config()
require.NoError(suite.T(), err, clues.ToCore(err))
suite.ac, err = api.NewClient(creds)
require.NoError(suite.T(), err, clues.ToCore(err))
suite.tenantID = creds.AzureTenantID
tester.LogTimeOfTest(suite.T()) tester.LogTimeOfTest(suite.T())
} }
func (suite *DataCollectionsIntegrationSuite) TestMailFetch() { func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
var ( var (
userID = tester.M365UserID(suite.T()) userID = tester.M365UserID(suite.T())
users = []string{userID} users = []string{userID}
acct, err = tester.NewM365Account(suite.T()).M365Config() handlers = BackupHandlers(suite.ac)
) )
require.NoError(suite.T(), err, clues.ToCore(err))
tests := []struct { tests := []struct {
name string name string
scope selectors.ExchangeScope scope selectors.ExchangeScope
@ -293,7 +302,8 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
collections, err := createCollections( collections, err := createCollections(
ctx, ctx,
acct, handlers,
suite.tenantID,
inMock.NewProvider(userID, userID), inMock.NewProvider(userID, userID),
test.scope, test.scope,
DeltaPaths{}, DeltaPaths{},
@ -329,13 +339,11 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
func (suite *DataCollectionsIntegrationSuite) TestDelta() { func (suite *DataCollectionsIntegrationSuite) TestDelta() {
var ( var (
userID = tester.M365UserID(suite.T()) userID = tester.M365UserID(suite.T())
users = []string{userID} users = []string{userID}
acct, err = tester.NewM365Account(suite.T()).M365Config() handlers = BackupHandlers(suite.ac)
) )
require.NoError(suite.T(), err, clues.ToCore(err))
tests := []struct { tests := []struct {
name string name string
scope selectors.ExchangeScope scope selectors.ExchangeScope
@ -372,7 +380,8 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() {
// get collections without providing any delta history (ie: full backup) // get collections without providing any delta history (ie: full backup)
collections, err := createCollections( collections, err := createCollections(
ctx, ctx,
acct, handlers,
suite.tenantID,
inMock.NewProvider(userID, userID), inMock.NewProvider(userID, userID),
test.scope, test.scope,
DeltaPaths{}, DeltaPaths{},
@ -403,7 +412,8 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() {
// which should only contain the difference. // which should only contain the difference.
collections, err = createCollections( collections, err = createCollections(
ctx, ctx,
acct, handlers,
suite.tenantID,
inMock.NewProvider(userID, userID), inMock.NewProvider(userID, userID),
test.scope, test.scope,
dps, dps,
@ -438,19 +448,18 @@ func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression()
defer flush() defer flush()
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
users = []string{suite.user} users = []string{suite.user}
handlers = BackupHandlers(suite.ac)
) )
acct, err := tester.NewM365Account(t).M365Config()
require.NoError(t, err, clues.ToCore(err))
sel := selectors.NewExchangeBackup(users) sel := selectors.NewExchangeBackup(users)
sel.Include(sel.MailFolders([]string{DefaultMailFolder}, selectors.PrefixMatch())) sel.Include(sel.MailFolders([]string{DefaultMailFolder}, selectors.PrefixMatch()))
collections, err := createCollections( collections, err := createCollections(
ctx, ctx,
acct, handlers,
suite.tenantID,
inMock.NewProvider(suite.user, suite.user), inMock.NewProvider(suite.user, suite.user),
sel.Scopes()[0], sel.Scopes()[0],
DeltaPaths{}, DeltaPaths{},
@ -497,10 +506,10 @@ func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression()
// and to store contact within Collection. Downloaded contacts are run through // and to store contact within Collection. Downloaded contacts are run through
// a regression test to ensure that downloaded items can be uploaded. // a regression test to ensure that downloaded items can be uploaded.
func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression() { func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression() {
acct, err := tester.NewM365Account(suite.T()).M365Config() var (
require.NoError(suite.T(), err, clues.ToCore(err)) users = []string{suite.user}
handlers = BackupHandlers(suite.ac)
users := []string{suite.user} )
tests := []struct { tests := []struct {
name string name string
@ -525,7 +534,8 @@ func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression
edcs, err := createCollections( edcs, err := createCollections(
ctx, ctx,
acct, handlers,
suite.tenantID,
inMock.NewProvider(suite.user, suite.user), inMock.NewProvider(suite.user, suite.user),
test.scope, test.scope,
DeltaPaths{}, DeltaPaths{},
@ -589,17 +599,11 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression(
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
acct, err := tester.NewM365Account(t).M365Config()
require.NoError(t, err, clues.ToCore(err))
users := []string{suite.user}
ac, err := api.NewClient(acct)
require.NoError(t, err, "creating client", clues.ToCore(err))
var ( var (
calID string users = []string{suite.user}
bdayID string handlers = BackupHandlers(suite.ac)
calID string
bdayID string
) )
fn := func(gcf graph.CachedContainer) error { fn := func(gcf graph.CachedContainer) error {
@ -614,7 +618,7 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression(
return nil return nil
} }
err = ac.Events().EnumerateContainers(ctx, suite.user, DefaultCalendar, fn, fault.New(true)) err := suite.ac.Events().EnumerateContainers(ctx, suite.user, DefaultCalendar, fn, fault.New(true))
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
tests := []struct { tests := []struct {
@ -650,7 +654,8 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression(
collections, err := createCollections( collections, err := createCollections(
ctx, ctx,
acct, handlers,
suite.tenantID,
inMock.NewProvider(suite.user, suite.user), inMock.NewProvider(suite.user, suite.user),
test.scope, test.scope,
DeltaPaths{}, DeltaPaths{},

View File

@ -0,0 +1,40 @@
package exchange
import (
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ backupHandler = &eventBackupHandler{}
type eventBackupHandler struct {
ac api.Events
}
func newEventBackupHandler(
ac api.Client,
) eventBackupHandler {
ace := ac.Events()
return eventBackupHandler{
ac: ace,
}
}
func (h eventBackupHandler) itemEnumerator() addedAndRemovedItemGetter {
return h.ac
}
func (h eventBackupHandler) itemHandler() itemGetterSerializer {
return h.ac
}
func (h eventBackupHandler) NewContainerCache(
userID string,
) (string, graph.ContainerResolver) {
return DefaultCalendar, &eventCalendarCache{
userID: userID,
enumer: h.ac,
getter: h.ac,
}
}

View File

@ -0,0 +1,109 @@
package exchange
import (
"context"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ itemRestorer = &eventRestoreHandler{}
type eventRestoreHandler struct {
ac api.Events
ip itemPoster[models.Eventable]
}
func newEventRestoreHandler(
ac api.Client,
) eventRestoreHandler {
ace := ac.Events()
return eventRestoreHandler{
ac: ace,
ip: ace,
}
}
func (h eventRestoreHandler) newContainerCache(userID string) graph.ContainerResolver {
return &eventCalendarCache{
userID: userID,
enumer: h.ac,
getter: h.ac,
}
}
func (h eventRestoreHandler) formatRestoreDestination(
destinationContainerName string,
_ path.Path, // ignored because calendars cannot be nested
) *path.Builder {
return path.Builder{}.Append(destinationContainerName)
}
func (h eventRestoreHandler) CreateContainer(
ctx context.Context,
userID, containerName, _ string, // parent container not used
) (graph.Container, error) {
return h.ac.CreateContainer(ctx, userID, containerName, "")
}
func (h eventRestoreHandler) containerSearcher() containerByNamer {
return h.ac
}
// always returns the provided value
func (h eventRestoreHandler) orRootContainer(c string) string {
return c
}
func (h eventRestoreHandler) restore(
ctx context.Context,
body []byte,
userID, destinationID string,
errs *fault.Bus,
) (*details.ExchangeInfo, error) {
event, err := api.BytesToEventable(body)
if err != nil {
return nil, clues.Wrap(err, "creating event from bytes").WithClues(ctx)
}
ctx = clues.Add(ctx, "item_id", ptr.Val(event.GetId()))
event = toEventSimplified(event)
var attachments []models.Attachmentable
if ptr.Val(event.GetHasAttachments()) {
attachments = event.GetAttachments()
event.SetAttachments([]models.Attachmentable{})
}
item, err := h.ip.PostItem(ctx, userID, destinationID, event)
if err != nil {
return nil, graph.Wrap(ctx, err, "restoring mail message")
}
err = uploadAttachments(
ctx,
h.ac,
attachments,
userID,
destinationID,
ptr.Val(item.GetId()),
errs)
if err != nil {
return nil, clues.Stack(err)
}
info := api.EventInfo(event)
info.Size = int64(len(body))
return info, nil
}

View File

@ -0,0 +1,57 @@
package exchange
import (
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type EventsRestoreIntgSuite struct {
tester.Suite
creds account.M365Config
ac api.Client
userID string
}
func TestEventsRestoreIntgSuite(t *testing.T) {
suite.Run(t, &EventsRestoreIntgSuite{
Suite: tester.NewIntegrationSuite(
t,
[][]string{tester.M365AcctCredEnvs}),
})
}
func (suite *EventsRestoreIntgSuite) SetupSuite() {
t := suite.T()
a := tester.NewM365Account(t)
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.creds = creds
suite.ac, err = api.NewClient(creds)
require.NoError(t, err, clues.ToCore(err))
suite.userID = tester.M365UserID(t)
}
// Testing to ensure that cache system works for in multiple different environments
func (suite *EventsRestoreIntgSuite) TestCreateContainerDestination() {
runCreateDestinationTest(
suite.T(),
newMailRestoreHandler(suite.ac),
path.EmailCategory,
suite.creds.AzureTenantID,
suite.userID,
tester.DefaultTestRestoreDestination("").ContainerName,
[]string{"Durmstrang"},
[]string{"Beauxbatons"})
}

View File

@ -12,7 +12,6 @@ import (
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
@ -37,20 +36,6 @@ const (
numberOfRetries = 4 numberOfRetries = 4
) )
type itemer interface {
GetItem(
ctx context.Context,
user, itemID string,
immutableIDs bool,
errs *fault.Bus,
) (serialization.Parsable, *details.ExchangeInfo, error)
Serialize(
ctx context.Context,
item serialization.Parsable,
user, itemID string,
) ([]byte, error)
}
// Collection implements the interface from data.Collection // Collection implements the interface from data.Collection
// Structure holds data for an Exchange application for a single user // Structure holds data for an Exchange application for a single user
type Collection struct { type Collection struct {
@ -63,7 +48,7 @@ type Collection struct {
// removed is a list of item IDs that were deleted from, or moved out, of a container // removed is a list of item IDs that were deleted from, or moved out, of a container
removed map[string]struct{} removed map[string]struct{}
items itemer items itemGetterSerializer
category path.CategoryType category path.CategoryType
statusUpdater support.StatusUpdater statusUpdater support.StatusUpdater
@ -98,7 +83,7 @@ func NewCollection(
curr, prev path.Path, curr, prev path.Path,
location *path.Builder, location *path.Builder,
category path.CategoryType, category path.CategoryType,
items itemer, items itemGetterSerializer,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
ctrlOpts control.Options, ctrlOpts control.Options,
doNotMergeItems bool, doNotMergeItems bool,

View File

@ -0,0 +1,131 @@
package exchange
import (
"context"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
// ---------------------------------------------------------------------------
// backup
// ---------------------------------------------------------------------------
type backupHandler interface {
itemEnumerator() addedAndRemovedItemGetter
itemHandler() itemGetterSerializer
NewContainerCache(userID string) (string, graph.ContainerResolver)
}
type addedAndRemovedItemGetter interface {
GetAddedAndRemovedItemIDs(
ctx context.Context,
user, containerID, oldDeltaToken string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, api.DeltaUpdate, error)
}
type itemGetterSerializer interface {
GetItem(
ctx context.Context,
user, itemID string,
immutableIDs bool,
errs *fault.Bus,
) (serialization.Parsable, *details.ExchangeInfo, error)
Serialize(
ctx context.Context,
item serialization.Parsable,
user, itemID string,
) ([]byte, error)
}
func BackupHandlers(ac api.Client) map[path.CategoryType]backupHandler {
return map[path.CategoryType]backupHandler{
path.ContactsCategory: newContactBackupHandler(ac),
path.EmailCategory: newMailBackupHandler(ac),
path.EventsCategory: newEventBackupHandler(ac),
}
}
// ---------------------------------------------------------------------------
// restore
// ---------------------------------------------------------------------------
type restoreHandler interface {
itemRestorer
containerAPI
newContainerCache(userID string) graph.ContainerResolver
formatRestoreDestination(
destinationContainerName string,
collectionFullPath path.Path,
) *path.Builder
}
// runs the item restoration (ie: item creation) process
// for a single item, whose summary contents are held in
// the body property.
type itemRestorer interface {
restore(
ctx context.Context,
body []byte,
userID, destinationID string,
errs *fault.Bus,
) (*details.ExchangeInfo, error)
}
// runs the actual graph API post request.
type itemPoster[T any] interface {
PostItem(
ctx context.Context,
userID, dirID string,
body T,
) (T, error)
}
// produces structs that interface with the graph/cache_container
// CachedContainer interface.
type containerAPI interface {
// POSTs the creation of a new container
CreateContainer(
ctx context.Context,
userID, containerName, parentContainerID string,
) (graph.Container, error)
// GETs a container by name.
// if containerByNamer is nil, this functionality is not supported
// and should be skipped by the caller.
// normally, we'd alias the func directly. The indirection here
// is because not all types comply with GetContainerByName.
containerSearcher() containerByNamer
// returns either the provided value (assumed to be the root
// folder for that cache tree), or the default root container
// (if the category uses a root folder that exists above the
// restore location path).
orRootContainer(string) string
}
type containerByNamer interface {
// searches for a container by name.
GetContainerByName(
ctx context.Context,
userID, containerName string,
) (graph.Container, error)
}
// primary interface controller for all per-cateogry restoration behavior.
func restoreHandlers(
ac api.Client,
) map[path.CategoryType]restoreHandler {
return map[path.CategoryType]restoreHandler{
path.ContactsCategory: newContactRestoreHandler(ac),
path.EmailCategory: newMailRestoreHandler(ac),
path.EventsCategory: newEventRestoreHandler(ac),
}
}

View File

@ -0,0 +1,40 @@
package exchange
import (
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ backupHandler = &mailBackupHandler{}
type mailBackupHandler struct {
ac api.Mail
}
func newMailBackupHandler(
ac api.Client,
) mailBackupHandler {
acm := ac.Mail()
return mailBackupHandler{
ac: acm,
}
}
func (h mailBackupHandler) itemEnumerator() addedAndRemovedItemGetter {
return h.ac
}
func (h mailBackupHandler) itemHandler() itemGetterSerializer {
return h.ac
}
func (h mailBackupHandler) NewContainerCache(
userID string,
) (string, graph.ContainerResolver) {
return rootFolderAlias, &mailFolderCache{
userID: userID,
enumer: h.ac,
getter: h.ac,
}
}

View File

@ -0,0 +1,140 @@
package exchange
import (
"context"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ itemRestorer = &mailRestoreHandler{}
type mailRestoreHandler struct {
ac api.Mail
ip itemPoster[models.Messageable]
}
func newMailRestoreHandler(
ac api.Client,
) mailRestoreHandler {
acm := ac.Mail()
return mailRestoreHandler{
ac: acm,
ip: acm,
}
}
func (h mailRestoreHandler) newContainerCache(userID string) graph.ContainerResolver {
return &mailFolderCache{
userID: userID,
enumer: h.ac,
getter: h.ac,
}
}
func (h mailRestoreHandler) formatRestoreDestination(
destinationContainerName string,
collectionFullPath path.Path,
) *path.Builder {
return path.Builder{}.Append(destinationContainerName).Append(collectionFullPath.Folders()...)
}
func (h mailRestoreHandler) CreateContainer(
ctx context.Context,
userID, containerName, parentContainerID string,
) (graph.Container, error) {
if len(parentContainerID) == 0 {
parentContainerID = rootFolderAlias
}
return h.ac.CreateContainer(ctx, userID, containerName, parentContainerID)
}
func (h mailRestoreHandler) containerSearcher() containerByNamer {
return nil
}
// always returns rootFolderAlias
func (h mailRestoreHandler) orRootContainer(string) string {
return rootFolderAlias
}
func (h mailRestoreHandler) restore(
ctx context.Context,
body []byte,
userID, destinationID string,
errs *fault.Bus,
) (*details.ExchangeInfo, error) {
msg, err := api.BytesToMessageable(body)
if err != nil {
return nil, clues.Wrap(err, "creating mail from bytes").WithClues(ctx)
}
ctx = clues.Add(ctx, "item_id", ptr.Val(msg.GetId()))
msg = setMessageSVEPs(toMessage(msg))
attachments := msg.GetAttachments()
// Item.Attachments --> HasAttachments doesn't always have a value populated when deserialized
msg.SetAttachments([]models.Attachmentable{})
item, err := h.ip.PostItem(ctx, userID, destinationID, msg)
if err != nil {
return nil, graph.Wrap(ctx, err, "restoring mail message")
}
err = uploadAttachments(
ctx,
h.ac,
attachments,
userID,
destinationID,
ptr.Val(item.GetId()),
errs)
if err != nil {
return nil, clues.Stack(err)
}
return api.MailInfo(msg, int64(len(body))), nil
}
func setMessageSVEPs(msg models.Messageable) models.Messageable {
// Set Extended Properties:
svlep := make([]models.SingleValueLegacyExtendedPropertyable, 0)
// prevent "resending" of the mail in the graph api backstore
sv1 := models.NewSingleValueLegacyExtendedProperty()
sv1.SetId(ptr.To(MailRestorePropertyTag))
sv1.SetValue(ptr.To(RestoreCanonicalEnableValue))
svlep = append(svlep, sv1)
// establish the sent date
if msg.GetSentDateTime() != nil {
sv2 := models.NewSingleValueLegacyExtendedProperty()
sv2.SetId(ptr.To(MailSendDateTimeOverrideProperty))
sv2.SetValue(ptr.To(dttm.FormatToLegacy(ptr.Val(msg.GetSentDateTime()))))
svlep = append(svlep, sv2)
}
// establish the received Date
if msg.GetReceivedDateTime() != nil {
sv3 := models.NewSingleValueLegacyExtendedProperty()
sv3.SetId(ptr.To(MailReceiveDateTimeOverriveProperty))
sv3.SetValue(ptr.To(dttm.FormatToLegacy(ptr.Val(msg.GetReceivedDateTime()))))
svlep = append(svlep, sv3)
}
msg.SetSingleValueExtendedProperties(svlep)
return msg
}

View File

@ -0,0 +1,57 @@
package exchange
import (
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type MailRestoreIntgSuite struct {
tester.Suite
creds account.M365Config
ac api.Client
userID string
}
func TestMailRestoreIntgSuite(t *testing.T) {
suite.Run(t, &MailRestoreIntgSuite{
Suite: tester.NewIntegrationSuite(
t,
[][]string{tester.M365AcctCredEnvs}),
})
}
func (suite *MailRestoreIntgSuite) SetupSuite() {
t := suite.T()
a := tester.NewM365Account(t)
creds, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
suite.creds = creds
suite.ac, err = api.NewClient(creds)
require.NoError(t, err, clues.ToCore(err))
suite.userID = tester.M365UserID(t)
}
// Testing to ensure that cache system works for in multiple different environments
func (suite *MailRestoreIntgSuite) TestCreateContainerDestination() {
runCreateDestinationTest(
suite.T(),
newMailRestoreHandler(suite.ac),
path.EmailCategory,
suite.creds.AzureTenantID,
suite.userID,
tester.DefaultTestRestoreDestination("").ContainerName,
[]string{"Griffindor", "Croix"},
[]string{"Griffindor", "Felicius"})
}

View File

@ -14,7 +14,6 @@ import (
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
@ -66,9 +65,10 @@ func (suite *RestoreIntgSuite) TestRestoreContact() {
var ( var (
userID = tester.M365UserID(t) userID = tester.M365UserID(t)
folderName = tester.DefaultTestRestoreDestination("contact").ContainerName folderName = tester.DefaultTestRestoreDestination("contact").ContainerName
handler = newContactRestoreHandler(suite.ac)
) )
aFolder, err := suite.ac.Contacts().CreateContactFolder(ctx, userID, folderName) aFolder, err := handler.ac.CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
folderID := ptr.Val(aFolder.GetId()) folderID := ptr.Val(aFolder.GetId())
@ -79,13 +79,11 @@ func (suite *RestoreIntgSuite) TestRestoreContact() {
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
}() }()
info, err := RestoreContact( info, err := handler.restore(
ctx, ctx,
exchMock.ContactBytes("Corso TestContact"), exchMock.ContactBytes("Corso TestContact"),
suite.ac.Contacts(), userID, folderID,
control.Copy, fault.New(true))
folderID,
userID)
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "contact item info") assert.NotNil(t, info, "contact item info")
} }
@ -101,9 +99,10 @@ func (suite *RestoreIntgSuite) TestRestoreEvent() {
var ( var (
userID = tester.M365UserID(t) userID = tester.M365UserID(t)
subject = tester.DefaultTestRestoreDestination("event").ContainerName subject = tester.DefaultTestRestoreDestination("event").ContainerName
handler = newEventRestoreHandler(suite.ac)
) )
calendar, err := suite.ac.Events().CreateCalendar(ctx, userID, subject) calendar, err := handler.ac.CreateContainer(ctx, userID, subject, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
calendarID := ptr.Val(calendar.GetId()) calendarID := ptr.Val(calendar.GetId())
@ -135,15 +134,10 @@ func (suite *RestoreIntgSuite) TestRestoreEvent() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
info, err := RestoreEvent( info, err := handler.restore(
ctx, ctx,
test.bytes, test.bytes,
suite.ac.Events(), userID, calendarID,
suite.ac.Events(),
suite.gs,
control.Copy,
calendarID,
userID,
fault.New(true)) fault.New(true))
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "event item info") assert.NotNil(t, info, "event item info")
@ -154,12 +148,8 @@ func (suite *RestoreIntgSuite) TestRestoreEvent() {
// TestRestoreExchangeObject verifies path.Category usage for restored objects // TestRestoreExchangeObject verifies path.Category usage for restored objects
func (suite *RestoreIntgSuite) TestRestoreExchangeObject() { func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
t := suite.T() t := suite.T()
a := tester.NewM365Account(t)
m365, err := a.M365Config()
require.NoError(t, err, clues.ToCore(err))
service, err := createService(m365) handlers := restoreHandlers(suite.ac)
require.NoError(t, err, clues.ToCore(err))
userID := tester.M365UserID(suite.T()) userID := tester.M365UserID(suite.T())
@ -175,7 +165,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailobj").ContainerName folderName := tester.DefaultTestRestoreDestination("mailobj").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -187,7 +178,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailwattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailwattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -199,7 +191,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("eventwattch").ContainerName folderName := tester.DefaultTestRestoreDestination("eventwattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -211,7 +204,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailitemattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailitemattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -226,7 +220,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailbasicattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailbasicattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -241,7 +236,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailnestattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailnestattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -256,7 +252,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailcontactattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailcontactattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -268,7 +265,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("nestedattch").ContainerName folderName := tester.DefaultTestRestoreDestination("nestedattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -280,7 +278,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("maillargeattch").ContainerName folderName := tester.DefaultTestRestoreDestination("maillargeattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -292,7 +291,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailtwoattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailtwoattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -304,20 +304,21 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EmailCategory, category: path.EmailCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("mailrefattch").ContainerName folderName := tester.DefaultTestRestoreDestination("mailrefattch").ContainerName
folder, err := suite.ac.Mail().CreateMailFolder(ctx, userID, folderName) folder, err := handlers[path.EmailCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
}, },
}, },
// TODO: #884 - reinstate when able to specify root folder by name
{ {
name: "Test Contact", name: "Test Contact",
bytes: exchMock.ContactBytes("Test_Omega"), bytes: exchMock.ContactBytes("Test_Omega"),
category: path.ContactsCategory, category: path.ContactsCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("contact").ContainerName folderName := tester.DefaultTestRestoreDestination("contact").ContainerName
folder, err := suite.ac.Contacts().CreateContactFolder(ctx, userID, folderName) folder, err := handlers[path.ContactsCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(folder.GetId()) return ptr.Val(folder.GetId())
@ -329,7 +330,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EventsCategory, category: path.EventsCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("event").ContainerName folderName := tester.DefaultTestRestoreDestination("event").ContainerName
calendar, err := suite.ac.Events().CreateCalendar(ctx, userID, folderName) calendar, err := handlers[path.EventsCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(calendar.GetId()) return ptr.Val(calendar.GetId())
@ -341,7 +343,8 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
category: path.EventsCategory, category: path.EventsCategory,
destination: func(t *testing.T, ctx context.Context) string { destination: func(t *testing.T, ctx context.Context) string {
folderName := tester.DefaultTestRestoreDestination("eventobj").ContainerName folderName := tester.DefaultTestRestoreDestination("eventobj").ContainerName
calendar, err := suite.ac.Events().CreateCalendar(ctx, userID, folderName) calendar, err := handlers[path.EventsCategory].
CreateContainer(ctx, userID, folderName, "")
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ptr.Val(calendar.GetId()) return ptr.Val(calendar.GetId())
@ -357,15 +360,10 @@ func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
defer flush() defer flush()
destination := test.destination(t, ctx) destination := test.destination(t, ctx)
info, err := RestoreItem( info, err := handlers[test.category].restore(
ctx, ctx,
test.bytes, test.bytes,
test.category, userID, destination,
control.Copy,
suite.ac,
service,
destination,
userID,
fault.New(true)) fault.New(true))
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
assert.NotNil(t, info, "item info was not populated") assert.NotNil(t, info, "item info was not populated")

View File

@ -1,160 +0,0 @@
package exchange
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var ErrFolderNotFound = clues.New("folder not found")
func createService(credentials account.M365Config) (*graph.Service, error) {
adapter, err := graph.CreateAdapter(
credentials.AzureTenantID,
credentials.AzureClientID,
credentials.AzureClientSecret)
if err != nil {
return nil, clues.Wrap(err, "creating microsoft graph service for exchange")
}
return graph.NewService(adapter), nil
}
// populateExchangeContainerResolver gets a folder resolver if one is available for
// this category of data. If one is not available, returns nil so that other
// logic in the caller can complete as long as they check if the resolver is not
// nil. If an error occurs populating the resolver, returns an error.
func PopulateExchangeContainerResolver(
ctx context.Context,
qp graph.QueryParams,
errs *fault.Bus,
) (graph.ContainerResolver, error) {
var (
res graph.ContainerResolver
cacheRoot string
)
ac, err := api.NewClient(qp.Credentials)
if err != nil {
return nil, err
}
switch qp.Category {
case path.EmailCategory:
acm := ac.Mail()
res = &mailFolderCache{
userID: qp.ResourceOwner.ID(),
getter: acm,
enumer: acm,
}
cacheRoot = rootFolderAlias
case path.ContactsCategory:
acc := ac.Contacts()
res = &contactFolderCache{
userID: qp.ResourceOwner.ID(),
getter: acc,
enumer: acc,
}
cacheRoot = DefaultContactFolder
case path.EventsCategory:
ecc := ac.Events()
res = &eventCalendarCache{
userID: qp.ResourceOwner.ID(),
getter: ecc,
enumer: ecc,
}
cacheRoot = DefaultCalendar
default:
return nil, clues.New("no container resolver registered for category").WithClues(ctx)
}
if err := res.Populate(ctx, errs, cacheRoot); err != nil {
return nil, clues.Wrap(err, "populating directory resolver").WithClues(ctx)
}
return res, nil
}
// Returns true if the container passes the scope comparison and should be included.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
ctx context.Context,
qp graph.QueryParams,
c graph.CachedContainer,
scope selectors.ExchangeScope,
category path.CategoryType,
) (path.Path, *path.Builder, bool) {
var (
directory string
locPath path.Path
pb = c.Path()
loc = c.Location()
)
// Clause ensures that DefaultContactFolder is inspected properly
if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == DefaultContactFolder {
loc = loc.Append(DefaultContactFolder)
}
dirPath, err := pb.ToDataLayerExchangePathForCategory(
qp.Credentials.AzureTenantID,
qp.ResourceOwner.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = dirPath.Folder(false)
if loc != nil {
locPath, err = loc.ToDataLayerExchangePathForCategory(
qp.Credentials.AzureTenantID,
qp.ResourceOwner.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = locPath.Folder(false)
}
var ok bool
switch category {
case path.EmailCategory:
ok = scope.Matches(selectors.ExchangeMailFolder, directory)
case path.ContactsCategory:
ok = scope.Matches(selectors.ExchangeContactFolder, directory)
case path.EventsCategory:
ok = scope.Matches(selectors.ExchangeEventCalendar, directory)
default:
return nil, nil, false
}
logger.Ctx(ctx).With(
"included", ok,
"scope", scope,
"matches_input", directory,
).Debug("backup folder selection filter")
return dirPath, loc, ok
}

View File

@ -18,15 +18,6 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
) )
type addedAndRemovedItemIDsGetter interface {
GetAddedAndRemovedItemIDs(
ctx context.Context,
user, containerID, oldDeltaToken string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, api.DeltaUpdate, error)
}
// filterContainersAndFillCollections is a utility function // filterContainersAndFillCollections is a utility function
// that places the M365 object ids belonging to specific directories // that places the M365 object ids belonging to specific directories
// into a BackupCollection. Messages outside of those directories are omitted. // into a BackupCollection. Messages outside of those directories are omitted.
@ -39,7 +30,7 @@ type addedAndRemovedItemIDsGetter interface {
func filterContainersAndFillCollections( func filterContainersAndFillCollections(
ctx context.Context, ctx context.Context,
qp graph.QueryParams, qp graph.QueryParams,
getter addedAndRemovedItemIDsGetter, bh backupHandler,
statusUpdater support.StatusUpdater, statusUpdater support.StatusUpdater,
resolver graph.ContainerResolver, resolver graph.ContainerResolver,
scope selectors.ExchangeScope, scope selectors.ExchangeScope,
@ -61,19 +52,6 @@ func filterContainersAndFillCollections(
logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps)) logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))
// TODO(rkeepers): this should be passed in from the caller, probably
// as an interface that satisfies the NewCollection requirements.
// But this will work for the short term.
ac, err := api.NewClient(qp.Credentials)
if err != nil {
return nil, err
}
ibt, err := itemerByType(ac, category)
if err != nil {
return nil, err
}
el := errs.Local() el := errs.Local()
for _, c := range resolver.Items() { for _, c := range resolver.Items() {
@ -85,6 +63,7 @@ func filterContainersAndFillCollections(
delete(tombstones, cID) delete(tombstones, cID)
var ( var (
err error
dp = dps[cID] dp = dps[cID]
prevDelta = dp.Delta prevDelta = dp.Delta
prevPathStr = dp.Path // do not log: pii; log prevPath instead prevPathStr = dp.Path // do not log: pii; log prevPath instead
@ -115,13 +94,14 @@ func filterContainersAndFillCollections(
ictx = clues.Add(ictx, "previous_path", prevPath) ictx = clues.Add(ictx, "previous_path", prevPath)
added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs( added, removed, newDelta, err := bh.itemEnumerator().
ictx, GetAddedAndRemovedItemIDs(
qp.ResourceOwner.ID(), ictx,
cID, qp.ResourceOwner.ID(),
prevDelta, cID,
ctrlOpts.ToggleFeatures.ExchangeImmutableIDs, prevDelta,
!ctrlOpts.ToggleFeatures.DisableDelta) ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
!ctrlOpts.ToggleFeatures.DisableDelta)
if err != nil { if err != nil {
if !graph.IsErrDeletedInFlight(err) { if !graph.IsErrDeletedInFlight(err) {
el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
@ -148,7 +128,7 @@ func filterContainersAndFillCollections(
prevPath, prevPath,
locPath, locPath,
category, category,
ibt, bh.itemHandler(),
statusUpdater, statusUpdater,
ctrlOpts, ctrlOpts,
newDelta.Reset) newDelta.Reset)
@ -181,7 +161,10 @@ func filterContainersAndFillCollections(
return nil, el.Failure() return nil, el.Failure()
} }
ictx := clues.Add(ctx, "tombstone_id", id) var (
err error
ictx = clues.Add(ctx, "tombstone_id", id)
)
if collections[id] != nil { if collections[id] != nil {
el.AddRecoverable(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ictx)) el.AddRecoverable(clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ictx))
@ -207,7 +190,7 @@ func filterContainersAndFillCollections(
prevPath, prevPath,
nil, // tombstones don't need a location nil, // tombstones don't need a location
category, category,
ibt, bh.itemHandler(),
statusUpdater, statusUpdater,
ctrlOpts, ctrlOpts,
false) false)
@ -220,7 +203,7 @@ func filterContainersAndFillCollections(
"num_deltas_entries", len(deltaURLs)) "num_deltas_entries", len(deltaURLs))
col, err := graph.MakeMetadataCollection( col, err := graph.MakeMetadataCollection(
qp.Credentials.AzureTenantID, qp.TenantID,
qp.ResourceOwner.ID(), qp.ResourceOwner.ID(),
path.ExchangeService, path.ExchangeService,
qp.Category, qp.Category,
@ -260,15 +243,74 @@ func pathFromPrevString(ps string) (path.Path, error) {
return p, nil return p, nil
} }
func itemerByType(ac api.Client, category path.CategoryType) (itemer, error) { // Returns true if the container passes the scope comparison and should be included.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
ctx context.Context,
qp graph.QueryParams,
c graph.CachedContainer,
scope selectors.ExchangeScope,
category path.CategoryType,
) (path.Path, *path.Builder, bool) {
var (
directory string
locPath path.Path
pb = c.Path()
loc = c.Location()
)
// Clause ensures that DefaultContactFolder is inspected properly
if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == DefaultContactFolder {
loc = loc.Append(DefaultContactFolder)
}
dirPath, err := pb.ToDataLayerExchangePathForCategory(
qp.TenantID,
qp.ResourceOwner.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = dirPath.Folder(false)
if loc != nil {
locPath, err = loc.ToDataLayerExchangePathForCategory(
qp.TenantID,
qp.ResourceOwner.ID(),
category,
false)
// Containers without a path (e.g. Root mail folder) always err here.
if err != nil {
return nil, nil, false
}
directory = locPath.Folder(false)
}
var ok bool
switch category { switch category {
case path.EmailCategory: case path.EmailCategory:
return ac.Mail(), nil ok = scope.Matches(selectors.ExchangeMailFolder, directory)
case path.EventsCategory:
return ac.Events(), nil
case path.ContactsCategory: case path.ContactsCategory:
return ac.Contacts(), nil ok = scope.Matches(selectors.ExchangeContactFolder, directory)
case path.EventsCategory:
ok = scope.Matches(selectors.ExchangeEventCalendar, directory)
default: default:
return nil, clues.New("category not registered in getFetchIDFunc") return nil, nil, false
} }
logger.Ctx(ctx).With(
"included", ok,
"scope", scope,
"matches_input", directory,
).Debug("backup folder selection filter")
return dirPath, loc, ok
} }

View File

@ -27,7 +27,25 @@ import (
// mocks // mocks
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
var _ addedAndRemovedItemIDsGetter = &mockGetter{} var _ backupHandler = &mockBackupHandler{}
type mockBackupHandler struct {
mg mockGetter
category path.CategoryType
ac api.Client
userID string
}
func (bh mockBackupHandler) itemEnumerator() addedAndRemovedItemGetter { return bh.mg }
func (bh mockBackupHandler) itemHandler() itemGetterSerializer { return nil }
func (bh mockBackupHandler) NewContainerCache(
userID string,
) (string, graph.ContainerResolver) {
return BackupHandlers(bh.ac)[bh.category].NewContainerCache(bh.userID)
}
var _ addedAndRemovedItemGetter = &mockGetter{}
type ( type (
mockGetter struct { mockGetter struct {
@ -115,7 +133,7 @@ type ServiceIteratorsSuite struct {
creds account.M365Config creds account.M365Config
} }
func TestServiceIteratorsSuite(t *testing.T) { func TestServiceIteratorsUnitSuite(t *testing.T) {
suite.Run(t, &ServiceIteratorsSuite{Suite: tester.NewUnitSuite(t)}) suite.Run(t, &ServiceIteratorsSuite{Suite: tester.NewUnitSuite(t)})
} }
@ -131,7 +149,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
qp = graph.QueryParams{ qp = graph.QueryParams{
Category: path.EmailCategory, // doesn't matter which one we use. Category: path.EmailCategory, // doesn't matter which one we use.
ResourceOwner: inMock.NewProvider("user_id", "user_name"), ResourceOwner: inMock.NewProvider("user_id", "user_name"),
Credentials: suite.creds, TenantID: suite.creds.AzureTenantID,
} }
statusUpdater = func(*support.ConnectorOperationStatus) {} statusUpdater = func(*support.ConnectorOperationStatus) {}
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
@ -326,10 +344,15 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
ctrlOpts := control.Options{FailureHandling: test.failFast} ctrlOpts := control.Options{FailureHandling: test.failFast}
ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries
mbh := mockBackupHandler{
mg: test.getter,
category: qp.Category,
}
collections, err := filterContainersAndFillCollections( collections, err := filterContainersAndFillCollections(
ctx, ctx,
qp, qp,
test.getter, mbh,
statusUpdater, statusUpdater,
test.resolver, test.resolver,
test.scope, test.scope,
@ -422,7 +445,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli
var ( var (
qp = graph.QueryParams{ qp = graph.QueryParams{
ResourceOwner: inMock.NewProvider("user_id", "user_name"), ResourceOwner: inMock.NewProvider("user_id", "user_name"),
Credentials: suite.creds, TenantID: suite.creds.AzureTenantID,
} }
statusUpdater = func(*support.ConnectorOperationStatus) {} statusUpdater = func(*support.ConnectorOperationStatus) {}
@ -660,10 +683,15 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
mbh := mockBackupHandler{
mg: test.getter,
category: qp.Category,
}
collections, err := filterContainersAndFillCollections( collections, err := filterContainersAndFillCollections(
ctx, ctx,
qp, qp,
test.getter, mbh,
statusUpdater, statusUpdater,
test.resolver, test.resolver,
sc.scope, sc.scope,
@ -803,7 +831,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
qp = graph.QueryParams{ qp = graph.QueryParams{
Category: path.EmailCategory, // doesn't matter which one we use. Category: path.EmailCategory, // doesn't matter which one we use.
ResourceOwner: inMock.NewProvider("user_id", "user_name"), ResourceOwner: inMock.NewProvider("user_id", "user_name"),
Credentials: suite.creds, TenantID: suite.creds.AzureTenantID,
} }
statusUpdater = func(*support.ConnectorOperationStatus) {} statusUpdater = func(*support.ConnectorOperationStatus) {}
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
@ -815,6 +843,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
l: path.Builder{}.Append("display_name_1"), l: path.Builder{}.Append("display_name_1"),
} }
resolver = newMockResolver(container1) resolver = newMockResolver(container1)
mbh = mockBackupHandler{
mg: test.getter,
category: qp.Category,
}
) )
require.Equal(t, "user_id", qp.ResourceOwner.ID(), qp.ResourceOwner) require.Equal(t, "user_id", qp.ResourceOwner.ID(), qp.ResourceOwner)
@ -823,7 +855,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
collections, err := filterContainersAndFillCollections( collections, err := filterContainersAndFillCollections(
ctx, ctx,
qp, qp,
test.getter, mbh,
statusUpdater, statusUpdater,
resolver, resolver,
allScope, allScope,
@ -884,7 +916,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
qp = graph.QueryParams{ qp = graph.QueryParams{
Category: cat, Category: cat,
ResourceOwner: inMock.NewProvider("user_id", "user_name"), ResourceOwner: inMock.NewProvider("user_id", "user_name"),
Credentials: suite.creds, TenantID: suite.creds.AzureTenantID,
} }
statusUpdater = func(*support.ConnectorOperationStatus) {} statusUpdater = func(*support.ConnectorOperationStatus) {}
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
@ -1226,6 +1258,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
getter.noReturnDelta = false getter.noReturnDelta = false
} }
mbh := mockBackupHandler{
mg: test.getter,
category: qp.Category,
}
dps := test.dps dps := test.dps
if !deltaBefore { if !deltaBefore {
for k, dp := range dps { for k, dp := range dps {
@ -1237,7 +1274,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
collections, err := filterContainersAndFillCollections( collections, err := filterContainersAndFillCollections(
ctx, ctx,
qp, qp,
test.getter, mbh,
statusUpdater, statusUpdater,
test.resolver, test.resolver,
allScope, allScope,

View File

@ -3,13 +3,11 @@ package exchange
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"runtime/trace" "runtime/trace"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
@ -25,228 +23,6 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
) )
type itemPoster[T any] interface {
PostItem(
ctx context.Context,
userID, dirID string,
body T,
) (T, error)
}
// RestoreItem directs restore pipeline towards restore function
// based on the path.CategoryType. All input params are necessary to perform
// the type-specific restore function.
func RestoreItem(
ctx context.Context,
bits []byte,
category path.CategoryType,
policy control.CollisionPolicy,
ac api.Client,
gs graph.Servicer,
destination, user string,
errs *fault.Bus,
) (*details.ExchangeInfo, error) {
if policy != control.Copy {
return nil, clues.Wrap(clues.New(policy.String()), "policy not supported for Exchange restore").WithClues(ctx)
}
switch category {
case path.EmailCategory:
return RestoreMessage(ctx, bits, ac.Mail(), ac.Mail(), gs, control.Copy, destination, user, errs)
case path.ContactsCategory:
return RestoreContact(ctx, bits, ac.Contacts(), control.Copy, destination, user)
case path.EventsCategory:
return RestoreEvent(ctx, bits, ac.Events(), ac.Events(), gs, control.Copy, destination, user, errs)
default:
return nil, clues.Wrap(clues.New(category.String()), "not supported for Exchange restore")
}
}
// RestoreContact wraps api.Contacts().PostItem()
func RestoreContact(
ctx context.Context,
body []byte,
cli itemPoster[models.Contactable],
cp control.CollisionPolicy,
destination, user string,
) (*details.ExchangeInfo, error) {
contact, err := api.BytesToContactable(body)
if err != nil {
return nil, graph.Wrap(ctx, err, "creating contact from bytes")
}
ctx = clues.Add(ctx, "item_id", ptr.Val(contact.GetId()))
_, err = cli.PostItem(ctx, user, destination, contact)
if err != nil {
return nil, clues.Stack(err)
}
info := api.ContactInfo(contact)
info.Size = int64(len(body))
return info, nil
}
// RestoreEvent wraps api.Events().PostItem()
func RestoreEvent(
ctx context.Context,
body []byte,
itemCli itemPoster[models.Eventable],
attachmentCli attachmentPoster,
gs graph.Servicer,
cp control.CollisionPolicy,
destination, user string,
errs *fault.Bus,
) (*details.ExchangeInfo, error) {
event, err := api.BytesToEventable(body)
if err != nil {
return nil, clues.Wrap(err, "creating event from bytes").WithClues(ctx)
}
ctx = clues.Add(ctx, "item_id", ptr.Val(event.GetId()))
var (
el = errs.Local()
transformedEvent = toEventSimplified(event)
attached []models.Attachmentable
)
if ptr.Val(event.GetHasAttachments()) {
attached = event.GetAttachments()
transformedEvent.SetAttachments([]models.Attachmentable{})
}
item, err := itemCli.PostItem(ctx, user, destination, event)
if err != nil {
return nil, clues.Stack(err)
}
for _, a := range attached {
if el.Failure() != nil {
break
}
err := uploadAttachment(
ctx,
attachmentCli,
user,
destination,
ptr.Val(item.GetId()),
a)
if err != nil {
el.AddRecoverable(err)
}
}
info := api.EventInfo(event)
info.Size = int64(len(body))
return info, el.Failure()
}
// RestoreMessage wraps api.Mail().PostItem(), handling attachment creation along the way
func RestoreMessage(
ctx context.Context,
body []byte,
itemCli itemPoster[models.Messageable],
attachmentCli attachmentPoster,
gs graph.Servicer,
cp control.CollisionPolicy,
destination, user string,
errs *fault.Bus,
) (*details.ExchangeInfo, error) {
// Creates messageable object from original bytes
msg, err := api.BytesToMessageable(body)
if err != nil {
return nil, clues.Wrap(err, "creating mail from bytes").WithClues(ctx)
}
ctx = clues.Add(ctx, "item_id", ptr.Val(msg.GetId()))
var (
clone = toMessage(msg)
valueID = MailRestorePropertyTag
enableValue = RestoreCanonicalEnableValue
)
// Set Extended Properties:
// 1st: No transmission
// 2nd: Send Date
// 3rd: Recv Date
svlep := make([]models.SingleValueLegacyExtendedPropertyable, 0)
sv1 := models.NewSingleValueLegacyExtendedProperty()
sv1.SetId(&valueID)
sv1.SetValue(&enableValue)
svlep = append(svlep, sv1)
if clone.GetSentDateTime() != nil {
sv2 := models.NewSingleValueLegacyExtendedProperty()
sendPropertyValue := dttm.FormatToLegacy(ptr.Val(clone.GetSentDateTime()))
sendPropertyTag := MailSendDateTimeOverrideProperty
sv2.SetId(&sendPropertyTag)
sv2.SetValue(&sendPropertyValue)
svlep = append(svlep, sv2)
}
if clone.GetReceivedDateTime() != nil {
sv3 := models.NewSingleValueLegacyExtendedProperty()
recvPropertyValue := dttm.FormatToLegacy(ptr.Val(clone.GetReceivedDateTime()))
recvPropertyTag := MailReceiveDateTimeOverriveProperty
sv3.SetId(&recvPropertyTag)
sv3.SetValue(&recvPropertyValue)
svlep = append(svlep, sv3)
}
clone.SetSingleValueExtendedProperties(svlep)
attached := clone.GetAttachments()
// Item.Attachments --> HasAttachments doesn't always have a value populated when deserialized
clone.SetAttachments([]models.Attachmentable{})
item, err := itemCli.PostItem(ctx, user, destination, clone)
if err != nil {
return nil, graph.Wrap(ctx, err, "restoring mail message")
}
el := errs.Local()
for _, a := range attached {
if el.Failure() != nil {
return nil, el.Failure()
}
err := uploadAttachment(
ctx,
attachmentCli,
user,
destination,
ptr.Val(item.GetId()),
a)
if err != nil {
// FIXME: I don't know why we're swallowing this error case.
// It needs investigation: https://github.com/alcionai/corso/issues/3498
if ptr.Val(a.GetOdataType()) == "#microsoft.graph.itemAttachment" {
name := ptr.Val(a.GetName())
logger.CtxErr(ctx, err).
With("attachment_name", name).
Info("mail upload failed")
continue
}
el.AddRecoverable(clues.Wrap(err, "uploading mail attachment"))
}
}
return api.MailInfo(clone, int64(len(body))), el.Failure()
}
// RestoreCollections restores M365 objects in data.RestoreCollection to MSFT // RestoreCollections restores M365 objects in data.RestoreCollection to MSFT
// store through GraphAPI. // store through GraphAPI.
func RestoreCollections( func RestoreCollections(
@ -259,49 +35,82 @@ func RestoreCollections(
deets *details.Builder, deets *details.Builder,
errs *fault.Bus, errs *fault.Bus,
) (*support.ConnectorOperationStatus, error) { ) (*support.ConnectorOperationStatus, error) {
if len(dcs) == 0 {
return support.CreateStatus(ctx, support.Restore, 0, support.CollectionMetrics{}, ""), nil
}
var ( var (
directoryCaches = make(map[string]map[path.CategoryType]graph.ContainerResolver) userID = dcs[0].FullPath().ResourceOwner()
metrics support.CollectionMetrics directoryCache = make(map[path.CategoryType]graph.ContainerResolver)
userID string handlers = restoreHandlers(ac)
metrics support.CollectionMetrics
// TODO policy to be updated from external source after completion of refactoring // TODO policy to be updated from external source after completion of refactoring
policy = control.Copy policy = control.Copy
el = errs.Local() el = errs.Local()
) )
if len(dcs) > 0 { ctx = clues.Add(ctx, "resource_owner", clues.Hide(userID))
userID = dcs[0].FullPath().ResourceOwner()
ctx = clues.Add(ctx, "resource_owner", clues.Hide(userID))
}
for _, dc := range dcs { for _, dc := range dcs {
if el.Failure() != nil { if el.Failure() != nil {
break break
} }
userCaches := directoryCaches[userID] var (
if userCaches == nil { isNewCache bool
directoryCaches[userID] = make(map[path.CategoryType]graph.ContainerResolver) category = dc.FullPath().Category()
userCaches = directoryCaches[userID] ictx = clues.Add(
} ctx,
"restore_category", category,
"restore_full_path", dc.FullPath())
)
containerID, err := CreateContainerDestination( handler, ok := handlers[category]
ctx, if !ok {
creds, el.AddRecoverable(clues.New("unsupported restore path category").WithClues(ictx))
dc.FullPath(),
dest.ContainerName,
userCaches,
errs)
if err != nil {
el.AddRecoverable(clues.Wrap(err, "creating destination").WithClues(ctx))
continue continue
} }
temp, canceled := restoreCollection(ctx, ac, gs, dc, containerID, policy, deets, errs) if directoryCache[category] == nil {
directoryCache[category] = handler.newContainerCache(userID)
isNewCache = true
}
containerID, gcr, err := createDestination(
ictx,
handler,
handler.formatRestoreDestination(dest.ContainerName, dc.FullPath()),
userID,
directoryCache[category],
isNewCache,
errs)
if err != nil {
el.AddRecoverable(err)
continue
}
directoryCache[category] = gcr
ictx = clues.Add(ictx, "restore_destination_id", containerID)
temp, err := restoreCollection(
ictx,
handler,
dc,
userID,
containerID,
policy,
deets,
errs)
metrics = support.CombineMetrics(metrics, temp) metrics = support.CombineMetrics(metrics, temp)
if canceled { if err != nil {
break if graph.IsErrTimeout(err) {
break
}
el.AddRecoverable(err)
} }
} }
@ -318,48 +127,39 @@ func RestoreCollections(
// restoreCollection handles restoration of an individual collection. // restoreCollection handles restoration of an individual collection.
func restoreCollection( func restoreCollection(
ctx context.Context, ctx context.Context,
ac api.Client, ir itemRestorer,
gs graph.Servicer,
dc data.RestoreCollection, dc data.RestoreCollection,
folderID string, userID, destinationID string,
policy control.CollisionPolicy, policy control.CollisionPolicy,
deets *details.Builder, deets *details.Builder,
errs *fault.Bus, errs *fault.Bus,
) (support.CollectionMetrics, bool) { ) (support.CollectionMetrics, error) {
ctx, end := diagnostics.Span(ctx, "gc:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath())) ctx, end := diagnostics.Span(ctx, "gc:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath()))
defer end() defer end()
var ( var (
metrics support.CollectionMetrics el = errs.Local()
items = dc.Items(ctx, errs) metrics support.CollectionMetrics
directory = dc.FullPath() items = dc.Items(ctx, errs)
service = directory.Service() fullPath = dc.FullPath()
category = directory.Category() category = fullPath.Category()
user = directory.ResourceOwner()
) )
ctx = clues.Add(
ctx,
"full_path", directory,
"service", service,
"category", category)
colProgress, closer := observe.CollectionProgress( colProgress, closer := observe.CollectionProgress(
ctx, ctx,
category.String(), category.String(),
clues.Hide(directory.Folder(false))) fullPath.Folder(false))
defer closer() defer closer()
defer close(colProgress) defer close(colProgress)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
errs.AddRecoverable(clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx)) return metrics, clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx)
return metrics, true
case itemData, ok := <-items: case itemData, ok := <-items:
if !ok || errs.Failure() != nil { if !ok || el.Failure() != nil {
return metrics, false return metrics, el.Failure()
} }
ictx := clues.Add(ctx, "item_id", itemData.UUID()) ictx := clues.Add(ctx, "item_id", itemData.UUID())
@ -370,33 +170,26 @@ func restoreCollection(
_, err := buf.ReadFrom(itemData.ToReader()) _, err := buf.ReadFrom(itemData.ToReader())
if err != nil { if err != nil {
errs.AddRecoverable(clues.Wrap(err, "reading item bytes").WithClues(ictx)) el.AddRecoverable(clues.Wrap(err, "reading item bytes").WithClues(ictx))
continue continue
} }
byteArray := buf.Bytes() body := buf.Bytes()
info, err := RestoreItem( info, err := ir.restore(ictx, body, userID, destinationID, errs)
ictx,
byteArray,
category,
policy,
ac,
gs,
folderID,
user,
errs)
if err != nil { if err != nil {
errs.AddRecoverable(err) el.AddRecoverable(err)
continue continue
} }
metrics.Bytes += int64(len(byteArray)) metrics.Bytes += int64(len(body))
metrics.Successes++ metrics.Successes++
itemPath, err := dc.FullPath().AppendItem(itemData.UUID()) // FIXME: this may be the incorrect path. If we restored within a top-level
// destination folder, then the restore path no longer matches the fullPath.
itemPath, err := fullPath.AppendItem(itemData.UUID())
if err != nil { if err != nil {
errs.AddRecoverable(clues.Wrap(err, "building full path with item").WithClues(ctx)) el.AddRecoverable(clues.Wrap(err, "adding item to collection path").WithClues(ctx))
continue continue
} }
@ -410,7 +203,8 @@ func restoreCollection(
Exchange: info, Exchange: info,
}) })
if err != nil { if err != nil {
// Not critical enough to need to stop restore operation. // These deets additions are for cli display purposes only.
// no need to fail out on error.
logger.Ctx(ctx).Infow("accounting for restored item", "error", err) logger.Ctx(ctx).Infow("accounting for restored item", "error", err)
} }
@ -419,257 +213,143 @@ func restoreCollection(
} }
} }
// CreateContainerDestination builds the destination into the container // createDestination creates folders in sequence
// at the provided path. As a precondition, the destination cannot // [root leaf1 leaf2] similar to a linked list.
// already exist. If it does then an error is returned. The provided // @param directory is the desired path from the root to the container
// containerResolver is updated with the new destination. // that the items will be restored into.
// @ returns the container ID of the new destination container. func createDestination(
func CreateContainerDestination(
ctx context.Context, ctx context.Context,
creds account.M365Config, ca containerAPI,
directory path.Path, destination *path.Builder,
destination string, userID string,
caches map[path.CategoryType]graph.ContainerResolver, gcr graph.ContainerResolver,
isNewCache bool,
errs *fault.Bus, errs *fault.Bus,
) (string, error) { ) (string, graph.ContainerResolver, error) {
var ( var (
newCache = false cache = gcr
user = directory.ResourceOwner() restoreLoc = &path.Builder{}
category = directory.Category() containerParentID string
directoryCache = caches[category]
) )
// TODO(rkeepers): pass the api client into this func, rather than generating one. for _, container := range destination.Elements() {
ac, err := api.NewClient(creds) restoreLoc = restoreLoc.Append(container)
if err != nil {
return "", clues.Stack(err).WithClues(ctx)
}
switch category { ictx := clues.Add(
case path.EmailCategory:
folders := append([]string{destination}, directory.Folders()...)
if directoryCache == nil {
acm := ac.Mail()
mfc := &mailFolderCache{
userID: user,
enumer: acm,
getter: acm,
}
caches[category] = mfc
newCache = true
directoryCache = mfc
}
return establishMailRestoreLocation(
ctx, ctx,
ac, "is_new_cache", isNewCache,
folders, "container_parent_id", containerParentID,
directoryCache, "container_name", container,
user, "restore_location", restoreLoc)
newCache,
fid, err := getOrPopulateContainer(
ictx,
ca,
cache,
restoreLoc,
userID,
containerParentID,
container,
isNewCache,
errs) errs)
case path.ContactsCategory:
folders := append([]string{destination}, directory.Folders()...)
if directoryCache == nil {
acc := ac.Contacts()
cfc := &contactFolderCache{
userID: user,
enumer: acc,
getter: acc,
}
caches[category] = cfc
newCache = true
directoryCache = cfc
}
return establishContactsRestoreLocation(
ctx,
ac,
folders,
directoryCache,
user,
newCache,
errs)
case path.EventsCategory:
dest := destination
if directoryCache == nil {
ace := ac.Events()
ecc := &eventCalendarCache{
userID: user,
getter: ace,
enumer: ace,
}
caches[category] = ecc
newCache = true
directoryCache = ecc
}
folders := append([]string{dest}, directory.Folders()...)
return establishEventsRestoreLocation(
ctx,
ac,
folders,
directoryCache,
user,
newCache,
errs)
default:
return "", clues.New(fmt.Sprintf("type not supported: %T", category)).WithClues(ctx)
}
}
// establishMailRestoreLocation creates Mail folders in sequence
// [root leaf1 leaf2] in a similar to a linked list.
// @param folders is the desired path from the root to the container
// that the items will be restored into
// @param isNewCache identifies if the cache is created and not populated
func establishMailRestoreLocation(
ctx context.Context,
ac api.Client,
folders []string,
mfc graph.ContainerResolver,
user string,
isNewCache bool,
errs *fault.Bus,
) (string, error) {
// Process starts with the root folder in order to recreate
// the top-level folder with the same tactic
folderID := rootFolderAlias
pb := path.Builder{}
ctx = clues.Add(ctx, "is_new_cache", isNewCache)
for _, folder := range folders {
pb = *pb.Append(folder)
cached, ok := mfc.LocationInCache(pb.String())
if ok {
folderID = cached
continue
}
temp, err := ac.Mail().CreateMailFolderWithParent(ctx, user, folder, folderID)
if err != nil { if err != nil {
// Should only error if cache malfunctions or incorrect parameters return "", cache, clues.Stack(err)
return "", err
} }
folderID = ptr.Val(temp.GetId()) containerParentID = fid
// Only populate the cache if we actually had to create it. Since we set
// newCache to false in this we'll only try to populate it once per function
// call even if we make a new cache.
if isNewCache {
if err := mfc.Populate(ctx, errs, rootFolderAlias); err != nil {
return "", clues.Wrap(err, "populating folder cache")
}
isNewCache = false
}
// NOOP if the folder is already in the cache.
if err = mfc.AddToCache(ctx, temp); err != nil {
return "", clues.Wrap(err, "adding folder to cache")
}
} }
return folderID, nil // containerParentID now identifies the last created container,
// not its parent.
return containerParentID, cache, nil
} }
// establishContactsRestoreLocation creates Contact Folders in sequence func getOrPopulateContainer(
// and updates the container resolver appropriately. Contact Folders are
// displayed in a flat representation. Therefore, only the root can be populated and all content
// must be restored into the root location.
// @param folders is the list of intended folders from root to leaf (e.g. [root ...])
// @param isNewCache bool representation of whether Populate function needs to be run
func establishContactsRestoreLocation(
ctx context.Context, ctx context.Context,
ac api.Client, ca containerAPI,
folders []string, gcr graph.ContainerResolver,
cfc graph.ContainerResolver, restoreLoc *path.Builder,
user string, userID, containerParentID, containerName string,
isNewCache bool, isNewCache bool,
errs *fault.Bus, errs *fault.Bus,
) (string, error) { ) (string, error) {
cached, ok := cfc.LocationInCache(folders[0]) cached, ok := gcr.LocationInCache(restoreLoc.String())
if ok { if ok {
return cached, nil return cached, nil
} }
ctx = clues.Add(ctx, "is_new_cache", isNewCache) c, err := ca.CreateContainer(ctx, userID, containerName, containerParentID)
temp, err := ac.Contacts().CreateContactFolder(ctx, user, folders[0]) // 409 handling case:
if err != nil { // attempt to fetch the container by name and add that result to the cache.
return "", err // This is rare, but may happen if CreateContainer() POST fails with 5xx:
} // sometimes the backend will create the folder despite the 5xx response,
// leaving our local containerResolver with inconsistent state.
folderID := ptr.Val(temp.GetId())
if isNewCache {
if err := cfc.Populate(ctx, errs, folderID, folders[0]); err != nil {
return "", clues.Wrap(err, "populating contact cache")
}
if err = cfc.AddToCache(ctx, temp); err != nil {
return "", clues.Wrap(err, "adding contact folder to cache")
}
}
return folderID, nil
}
func establishEventsRestoreLocation(
ctx context.Context,
ac api.Client,
folders []string,
ecc graph.ContainerResolver, // eventCalendarCache
user string,
isNewCache bool,
errs *fault.Bus,
) (string, error) {
// Need to prefix with the "Other Calendars" folder so lookup happens properly.
cached, ok := ecc.LocationInCache(folders[0])
if ok {
return cached, nil
}
ctx = clues.Add(ctx, "is_new_cache", isNewCache)
temp, err := ac.Events().CreateCalendar(ctx, user, folders[0])
if err != nil && !graph.IsErrFolderExists(err) {
return "", err
}
// 409 handling: Fetch folder if it exists and add to cache.
// This is rare, but may happen if CreateCalendar() POST fails with 5xx,
// potentially leaving dirty state in graph.
if graph.IsErrFolderExists(err) { if graph.IsErrFolderExists(err) {
temp, err = ac.Events().GetContainerByName(ctx, user, folders[0]) cs := ca.containerSearcher()
if err != nil { if cs != nil {
return "", err cc, e := cs.GetContainerByName(ctx, userID, containerName)
c = cc
err = clues.Stack(err, e)
} }
} }
folderID := ptr.Val(temp.GetId()) if err != nil {
return "", clues.Wrap(err, "creating restore container")
}
folderID := ptr.Val(c.GetId())
if isNewCache { if isNewCache {
if err = ecc.Populate(ctx, errs, folderID, folders[0]); err != nil { if err := gcr.Populate(ctx, errs, folderID, ca.orRootContainer(restoreLoc.HeadElem())); err != nil {
return "", clues.Wrap(err, "populating event cache") return "", clues.Wrap(err, "populating container cache")
} }
}
displayable := api.CalendarDisplayable{Calendarable: temp} if err = gcr.AddToCache(ctx, c); err != nil {
if err = ecc.AddToCache(ctx, displayable); err != nil { return "", clues.Wrap(err, "adding container to cache")
return "", clues.Wrap(err, "adding new calendar to cache")
}
} }
return folderID, nil return folderID, nil
} }
func uploadAttachments(
ctx context.Context,
ap attachmentPoster,
as []models.Attachmentable,
userID, destinationID, itemID string,
errs *fault.Bus,
) error {
el := errs.Local()
for _, a := range as {
if el.Failure() != nil {
return el.Failure()
}
err := uploadAttachment(
ctx,
ap,
userID,
destinationID,
itemID,
a)
if err != nil {
// FIXME: I don't know why we're swallowing this error case.
// It needs investigation: https://github.com/alcionai/corso/issues/3498
if ptr.Val(a.GetOdataType()) == "#microsoft.graph.itemAttachment" {
name := ptr.Val(a.GetName())
logger.CtxErr(ctx, err).
With("attachment_name", name).
Info("mail upload failed")
continue
}
el.AddRecoverable(clues.Wrap(err, "uploading mail attachment").WithClues(ctx))
}
}
return el.Failure()
}

View File

@ -0,0 +1,34 @@
package testdata
import (
"context"
"testing"
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
func PopulateContainerCache(
t *testing.T,
ctx context.Context, //revive:disable-line:context-as-argument
ac api.Client,
category path.CategoryType,
resourceOwnerID string,
errs *fault.Bus,
) graph.ContainerResolver {
handler, ok := exchange.BackupHandlers(ac)[category]
require.Truef(t, ok, "container resolver registered for category %s", category)
root, cc := handler.NewContainerCache(resourceOwnerID)
err := cc.Populate(ctx, errs, root)
require.NoError(t, err, clues.ToCore(err))
return cc
}

View File

@ -55,14 +55,7 @@ func CloneMessageableFields(orig, message models.Messageable) models.Messageable
func toMessage(orig models.Messageable) models.Messageable { func toMessage(orig models.Messageable) models.Messageable {
message := models.NewMessage() message := models.NewMessage()
temp := CloneMessageableFields(orig, message) return CloneMessageableFields(orig, message)
aMessage, ok := temp.(*models.Message)
if !ok {
return nil
}
return aMessage
} }
// ToEventSimplified transforms an event to simplified restore format // ToEventSimplified transforms an event to simplified restore format

View File

@ -64,7 +64,7 @@ type ContainerResolver interface {
// @param ctx is necessary param for Graph API tracing // @param ctx is necessary param for Graph API tracing
// @param baseFolderID represents the M365ID base that the resolver will // @param baseFolderID represents the M365ID base that the resolver will
// conclude its search. Default input is "". // conclude its search. Default input is "".
Populate(ctx context.Context, errs *fault.Bus, baseFolderID string, baseContainerPather ...string) error Populate(ctx context.Context, errs *fault.Bus, baseFolderID string, baseContainerPath ...string) error
// PathInCache performs a look up of a path representation // PathInCache performs a look up of a path representation
// and returns the m365ID of directory iff the pathString // and returns the m365ID of directory iff the pathString

View File

@ -139,6 +139,7 @@ func IsErrTimeout(err error) bool {
} }
return errors.Is(err, ErrTimeout) || return errors.Is(err, ErrTimeout) ||
errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, http.ErrHandlerTimeout) || errors.Is(err, http.ErrHandlerTimeout) ||
os.IsTimeout(err) os.IsTimeout(err)

View File

@ -13,7 +13,6 @@ import (
msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core" msgraphgocore "github.com/microsoftgraph/msgraph-sdk-go-core"
"github.com/alcionai/corso/src/internal/common/idname" "github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
) )
@ -41,7 +40,7 @@ func AllMetadataFileNames() []string {
type QueryParams struct { type QueryParams struct {
Category path.CategoryType Category path.CategoryType
ResourceOwner idname.Provider ResourceOwner idname.Provider
Credentials account.M365Config TenantID string
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -24,6 +24,7 @@ import (
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange"
exchMock "github.com/alcionai/corso/src/internal/connector/exchange/mock" exchMock "github.com/alcionai/corso/src/internal/connector/exchange/mock"
exchTD "github.com/alcionai/corso/src/internal/connector/exchange/testdata"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/mock" "github.com/alcionai/corso/src/internal/connector/mock"
"github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/onedrive"
@ -719,7 +720,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
testExchangeContinuousBackups(suite, control.Toggles{}) testExchangeContinuousBackups(suite, control.Toggles{})
} }
func (suite *BackupOpIntegrationSuite) TestBackup_Run_nonIncrementalExchange() { func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalNonDeltaExchange() {
testExchangeContinuousBackups(suite, control.Toggles{DisableDelta: true}) testExchangeContinuousBackups(suite, control.Toggles{DisableDelta: true})
} }
@ -930,14 +931,7 @@ func testExchangeContinuousBackups(suite *BackupOpIntegrationSuite, toggles cont
// verify test data was populated, and track it for comparisons // verify test data was populated, and track it for comparisons
// TODO: this can be swapped out for InDeets checks if we add itemRefs to folder ents. // TODO: this can be swapped out for InDeets checks if we add itemRefs to folder ents.
for category, gen := range dataset { for category, gen := range dataset {
qp := graph.QueryParams{ cr := exchTD.PopulateContainerCache(t, ctx, ac, category, uidn.ID(), fault.New(true))
Category: category,
ResourceOwner: uidn,
Credentials: m365,
}
cr, err := exchange.PopulateExchangeContainerResolver(ctx, qp, fault.New(true))
require.NoError(t, err, "populating container resolver", category, clues.ToCore(err))
for destName, dest := range gen.dests { for destName, dest := range gen.dests {
id, ok := cr.LocationInCache(dest.locRef) id, ok := cr.LocationInCache(dest.locRef)
@ -1043,19 +1037,12 @@ func testExchangeContinuousBackups(suite *BackupOpIntegrationSuite, toggles cont
version.Backup, version.Backup,
gen.dbf) gen.dbf)
qp := graph.QueryParams{
Category: category,
ResourceOwner: uidn,
Credentials: m365,
}
expectedLocRef := container3 expectedLocRef := container3
if category == path.EmailCategory { if category == path.EmailCategory {
expectedLocRef = path.Builder{}.Append(container3, container3).String() expectedLocRef = path.Builder{}.Append(container3, container3).String()
} }
cr, err := exchange.PopulateExchangeContainerResolver(ctx, qp, fault.New(true)) cr := exchTD.PopulateContainerCache(t, ctx, ac, category, uidn.ID(), fault.New(true))
require.NoError(t, err, "populating container resolver", category, clues.ToCore(err))
id, ok := cr.LocationInCache(expectedLocRef) id, ok := cr.LocationInCache(expectedLocRef)
require.Truef(t, ok, "dir %s found in %s cache", expectedLocRef, category) require.Truef(t, ok, "dir %s found in %s cache", expectedLocRef, category)

View File

@ -240,6 +240,15 @@ func (pb Builder) Dir() *Builder {
} }
} }
// HeadElem returns the first element in the Builder.
func (pb Builder) HeadElem() string {
if len(pb.elements) == 0 {
return ""
}
return pb.elements[0]
}
// LastElem returns the last element in the Builder. // LastElem returns the last element in the Builder.
func (pb Builder) LastElem() string { func (pb Builder) LastElem() string {
if len(pb.elements) == 0 { if len(pb.elements) == 0 {

View File

@ -34,12 +34,13 @@ type Contacts struct {
// containers // containers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// CreateContactFolder makes a contact folder with the displayName of folderName. // CreateContainer makes a contact folder with the displayName of folderName.
// If successful, returns the created folder object. // If successful, returns the created folder object.
func (c Contacts) CreateContactFolder( func (c Contacts) CreateContainer(
ctx context.Context, ctx context.Context,
userID, containerName string, userID, containerName string,
) (models.ContactFolderable, error) { _ string, // parentContainerID needed for iface, doesn't apply to contacts
) (graph.Container, error) {
body := models.NewContactFolder() body := models.NewContactFolder()
body.SetDisplayName(ptr.To(containerName)) body.SetDisplayName(ptr.To(containerName))

View File

@ -38,16 +38,17 @@ type Events struct {
// containers // containers
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// CreateCalendar makes an event Calendar with the name in the user's M365 exchange account // CreateContainer makes an event Calendar with the name in the user's M365 exchange account
// Reference: https://docs.microsoft.com/en-us/graph/api/user-post-calendars?view=graph-rest-1.0&tabs=go // Reference: https://docs.microsoft.com/en-us/graph/api/user-post-calendars?view=graph-rest-1.0&tabs=go
func (c Events) CreateCalendar( func (c Events) CreateContainer(
ctx context.Context, ctx context.Context,
userID, containerName string, userID, containerName string,
) (models.Calendarable, error) { _ string, // parentContainerID needed for iface, doesn't apply to contacts
) (graph.Container, error) {
body := models.NewCalendar() body := models.NewCalendar()
body.SetName(&containerName) body.SetName(&containerName)
mdl, err := c.Stable. container, err := c.Stable.
Client(). Client().
Users(). Users().
ByUserId(userID). ByUserId(userID).
@ -57,7 +58,7 @@ func (c Events) CreateCalendar(
return nil, graph.Wrap(ctx, err, "creating calendar") return nil, graph.Wrap(ctx, err, "creating calendar")
} }
return mdl, nil return CalendarDisplayable{Calendarable: container}, nil
} }
// DeleteContainer removes a calendar from user's M365 account // DeleteContainer removes a calendar from user's M365 account
@ -130,7 +131,7 @@ func (c Events) GetContainerByID(
func (c Events) GetContainerByName( func (c Events) GetContainerByName(
ctx context.Context, ctx context.Context,
userID, containerName string, userID, containerName string,
) (models.Calendarable, error) { ) (graph.Container, error) {
filter := fmt.Sprintf("name eq '%s'", containerName) filter := fmt.Sprintf("name eq '%s'", containerName)
options := &users.ItemCalendarsRequestBuilderGetRequestConfiguration{ options := &users.ItemCalendarsRequestBuilderGetRequestConfiguration{
QueryParameters: &users.ItemCalendarsRequestBuilderGetQueryParameters{ QueryParameters: &users.ItemCalendarsRequestBuilderGetQueryParameters{
@ -167,7 +168,7 @@ func (c Events) GetContainerByName(
return nil, err return nil, err
} }
return cal, nil return graph.CalendarDisplayable{Calendarable: cal}, nil
} }
func (c Events) PatchCalendar( func (c Events) PatchCalendar(

View File

@ -64,10 +64,10 @@ func (c Mail) CreateMailFolder(
return mdl, nil return mdl, nil
} }
func (c Mail) CreateMailFolderWithParent( func (c Mail) CreateContainer(
ctx context.Context, ctx context.Context,
userID, containerName, parentContainerID string, userID, containerName, parentContainerID string,
) (models.MailFolderable, error) { ) (graph.Container, error) {
isHidden := false isHidden := false
body := models.NewMailFolder() body := models.NewMailFolder()
body.SetDisplayName(&containerName) body.SetDisplayName(&containerName)