Split exchange collection from service (#4011)

<!-- PR description-->

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abin Simon 2023-08-14 12:38:11 +05:30 committed by GitHub
parent a69e504155
commit 7aed7eba0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 849 additions and 808 deletions

View File

@ -0,0 +1,382 @@
package exchange
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/pii"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
// CreateCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved.
func CreateCollections(
	ctx context.Context,
	bpc inject.BackupProducerConfig,
	handlers map[path.CategoryType]backupHandler,
	tenantID string,
	scope selectors.ExchangeScope,
	dps DeltaPaths,
	su support.StatusUpdater,
	errs *fault.Bus,
) ([]data.BackupCollection, error) {
	category := scope.Category().PathType()
	ctx = clues.Add(ctx, "category", category)

	qp := graph.QueryParams{
		Category:          category,
		ProtectedResource: bpc.ProtectedResource,
		TenantID:          tenantID,
	}

	handler, ok := handlers[category]
	if !ok {
		return nil, clues.New("unsupported backup category type").WithClues(ctx)
	}

	foldersComplete := observe.MessageWithCompletion(
		ctx,
		observe.Bulletf("%s", qp.Category))
	defer close(foldersComplete)

	// resolve the full set of containers before enumerating their items.
	rootFolder, cc := handler.NewContainerCache(bpc.ProtectedResource.ID())

	if err := cc.Populate(ctx, errs, rootFolder); err != nil {
		return nil, clues.Wrap(err, "populating container cache")
	}

	collections, err := populateCollections(
		ctx,
		qp,
		handler,
		su,
		cc,
		scope,
		dps,
		bpc.Options,
		errs)
	if err != nil {
		return nil, clues.Wrap(err, "filling collections")
	}

	foldersComplete <- struct{}{}

	// flatten the id-keyed collection map into the result slice.
	result := make([]data.BackupCollection, 0, len(collections))
	for _, coll := range collections {
		result = append(result, coll)
	}

	return result, nil
}
// populateCollections is a utility function
// that places the M365 object ids belonging to specific directories
// into a BackupCollection. Messages outside of those directories are omitted.
// The returned map is keyed by folder ID, plus a "metadata" entry holding the
// previous-path and delta-token metadata collection.
// Supports all exchange applications: Contacts, Events, and Mail.
//
// TODO(ashmrtn): This should really return []data.BackupCollection but
// unfortunately some of our tests rely on being able to lookup returned
// collections by ID and it would be non-trivial to change them.
func populateCollections(
	ctx context.Context,
	qp graph.QueryParams,
	bh backupHandler,
	statusUpdater support.StatusUpdater,
	resolver graph.ContainerResolver,
	scope selectors.ExchangeScope,
	dps DeltaPaths,
	ctrlOpts control.Options,
	errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
	var (
		// folder ID -> BackupCollection.
		collections = map[string]data.BackupCollection{}
		// folder ID -> delta url or folder path lookups
		deltaURLs = map[string]string{}
		currPaths = map[string]string{}
		// copy of previousPaths. any folder found in the resolver gets
		// deleted from this map, leaving only the deleted folders behind
		tombstones = makeTombstones(dps)
		category   = qp.Category
	)

	logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))

	el := errs.Local()

	for _, c := range resolver.Items() {
		if el.Failure() != nil {
			return nil, el.Failure()
		}

		cID := ptr.Val(c.GetId())

		// the container still exists, so it is not a tombstone.
		delete(tombstones, cID)

		var (
			err         error
			dp          = dps[cID]
			prevDelta   = dp.Delta
			prevPathStr = dp.Path // do not log: pii; log prevPath instead
			prevPath    path.Path
			ictx        = clues.Add(
				ctx,
				"container_id", cID,
				"previous_delta", pii.SafeURL{
					URL:           prevDelta,
					SafePathElems: graph.SafeURLPathParams,
					SafeQueryKeys: graph.SafeURLQueryParams,
				})
		)

		currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category)
		// Only create a collection if the path matches the scope.
		if !ok {
			continue
		}

		if len(prevPathStr) > 0 {
			if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
				logger.CtxErr(ictx, err).Error("parsing prev path")
				// if the previous path is unusable, then the delta must be, too.
				prevDelta = ""
			}
		}

		ictx = clues.Add(ictx, "previous_path", prevPath)

		added, removed, newDelta, err := bh.itemEnumerator().
			GetAddedAndRemovedItemIDs(
				ictx,
				qp.ProtectedResource.ID(),
				cID,
				prevDelta,
				ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
				!ctrlOpts.ToggleFeatures.DisableDelta)
		if err != nil {
			if !graph.IsErrDeletedInFlight(err) {
				// use ictx so the container ID travels with the error.
				el.AddRecoverable(ictx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
				continue
			}

			// race conditions happen, containers might get deleted while
			// this process is in flight. If that happens, force the collection
			// to reset. This prevents any old items from being retained in
			// storage. If the container (or its children) are still missing
			// on the next backup, they'll get tombstoned.
			newDelta = api.DeltaUpdate{Reset: true}
		}

		if len(newDelta.URL) > 0 {
			deltaURLs[cID] = newDelta.URL
		} else if !newDelta.Reset {
			logger.Ctx(ictx).Info("missing delta url")
		}

		edc := NewCollection(
			qp.ProtectedResource.ID(),
			currPath,
			prevPath,
			locPath,
			category,
			bh.itemHandler(),
			statusUpdater,
			ctrlOpts,
			newDelta.Reset)
		collections[cID] = &edc

		for _, add := range added {
			edc.added[add] = struct{}{}
		}

		// Remove any deleted IDs from the set of added IDs because items that are
		// deleted and then restored will have a different ID than they did
		// originally.
		for _, remove := range removed {
			delete(edc.added, remove)
			edc.removed[remove] = struct{}{}
		}

		// add the current path for the container ID to be used in the next backup
		// as the "previous path", for reference in case of a rename or relocation.
		currPaths[cID] = currPath.String()
	}

	// A tombstone is a folder that needs to be marked for deletion.
	// The only situation where a tombstone should appear is if the folder exists
	// in the `previousPath` set, but does not exist in the current container
	// resolver (which contains all the resource owners' current containers).
	for id, p := range tombstones {
		if el.Failure() != nil {
			return nil, el.Failure()
		}

		ictx := clues.Add(ctx, "tombstone_id", id)

		if collections[id] != nil {
			// there is no underlying error to wrap in this branch; the prior
			// code wrapped a nil err, which produced a nil recoverable entry.
			el.AddRecoverable(ictx, clues.New("conflict: tombstone exists for a live collection").WithClues(ictx))
			continue
		}

		// only occurs if it was a new folder that we picked up during the container
		// resolver phase that got deleted in flight by the time we hit this stage.
		if len(p) == 0 {
			continue
		}

		prevPath, err := pathFromPrevString(p)
		if err != nil {
			// technically shouldn't ever happen. But just in case...
			logger.CtxErr(ictx, err).Error("parsing tombstone prev path")
			continue
		}

		edc := NewCollection(
			qp.ProtectedResource.ID(),
			nil, // marks the collection as deleted
			prevPath,
			nil, // tombstones don't need a location
			category,
			bh.itemHandler(),
			statusUpdater,
			ctrlOpts,
			false)
		collections[id] = &edc
	}

	logger.Ctx(ctx).Infow(
		"adding metadata collection entries",
		"num_paths_entries", len(currPaths),
		"num_deltas_entries", len(deltaURLs))

	col, err := graph.MakeMetadataCollection(
		qp.TenantID,
		qp.ProtectedResource.ID(),
		path.ExchangeService,
		qp.Category,
		[]graph.MetadataCollectionEntry{
			graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
			graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
		},
		statusUpdater)
	if err != nil {
		return nil, clues.Wrap(err, "making metadata collection")
	}

	collections["metadata"] = col

	return collections, el.Failure()
}
// makeTombstones produces a set of id:path pairs from the deltapaths map.
// Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]string {
	result := make(map[string]string, len(dps))

	for id, dp := range dps {
		result[id] = dp.Path
	}

	return result
}
// pathFromPrevString converts a stored previous-path string back into a
// path.Path, wrapping any parse failure with context.
func pathFromPrevString(ps string) (path.Path, error) {
	parsed, err := path.FromDataLayerPath(ps, false)
	if err != nil {
		return nil, clues.Wrap(err, "parsing previous path string")
	}

	return parsed, nil
}
// includeContainer returns true if the container passes the scope comparison
// and should be included.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
	ctx context.Context,
	qp graph.QueryParams,
	c graph.CachedContainer,
	scope selectors.ExchangeScope,
	category path.CategoryType,
) (path.Path, *path.Builder, bool) {
	var (
		// directory is the folder string compared against the scope.
		directory string
		locPath   path.Path
		pb        = c.Path()
		loc       = c.Location()
	)

	// Clause ensures that DefaultContactFolder is inspected properly
	if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == api.DefaultContacts {
		loc = loc.Append(api.DefaultContacts)
	}

	dirPath, err := pb.ToDataLayerExchangePathForCategory(
		qp.TenantID,
		qp.ProtectedResource.ID(),
		category,
		false)
	// Containers without a path (e.g. Root mail folder) always err here.
	if err != nil {
		return nil, nil, false
	}

	directory = dirPath.Folder(false)

	// when a display-name location is available, prefer it for scope
	// matching; it overwrites the id-based directory computed above.
	if loc != nil {
		locPath, err = loc.ToDataLayerExchangePathForCategory(
			qp.TenantID,
			qp.ProtectedResource.ID(),
			category,
			false)
		// Containers without a path (e.g. Root mail folder) always err here.
		if err != nil {
			return nil, nil, false
		}

		directory = locPath.Folder(false)
	}

	var ok bool

	// each category matches against its own scope selector type.
	switch category {
	case path.EmailCategory:
		ok = scope.Matches(selectors.ExchangeMailFolder, directory)
	case path.ContactsCategory:
		ok = scope.Matches(selectors.ExchangeContactFolder, directory)
	case path.EventsCategory:
		ok = scope.Matches(selectors.ExchangeEventCalendar, directory)
	default:
		// unknown categories are never included.
		return nil, nil, false
	}

	logger.Ctx(ctx).With(
		"included", ok,
		"scope", scope,
		"matches_input", directory,
	).Debug("backup folder selection filter")

	return dirPath, loc, ok
}

View File

@ -308,7 +308,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
)
require.NoError(t, err, clues.ToCore(err))
cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, []data.RestoreCollection{
cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
data.NoFetchRestoreCollection{Collection: coll},
})
test.expectError(t, err, clues.ToCore(err))
@ -368,7 +368,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections_ReadFailure(
fc := failingColl{t}
_, canUsePreviousBackup, err := parseMetadataCollections(ctx, []data.RestoreCollection{fc})
_, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{fc})
require.NoError(t, err)
require.False(t, canUsePreviousBackup)
}
@ -477,7 +477,7 @@ func (suite *BackupIntgSuite) TestMailFetch() {
ProtectedResource: inMock.NewProvider(userID, userID),
}
collections, err := createCollections(
collections, err := CreateCollections(
ctx,
bpc,
handlers,
@ -560,7 +560,7 @@ func (suite *BackupIntgSuite) TestDelta() {
}
// get collections without providing any delta history (ie: full backup)
collections, err := createCollections(
collections, err := CreateCollections(
ctx,
bpc,
handlers,
@ -582,7 +582,7 @@ func (suite *BackupIntgSuite) TestDelta() {
require.NotNil(t, metadata, "collections contains a metadata collection")
cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, []data.RestoreCollection{
cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{
data.NoFetchRestoreCollection{Collection: metadata},
})
require.NoError(t, err, clues.ToCore(err))
@ -592,7 +592,7 @@ func (suite *BackupIntgSuite) TestDelta() {
// now do another backup with the previous delta tokens,
// which should only contain the difference.
collections, err = createCollections(
collections, err = CreateCollections(
ctx,
bpc,
handlers,
@ -644,7 +644,7 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() {
Selector: sel.Selector,
}
collections, err := createCollections(
collections, err := CreateCollections(
ctx,
bpc,
handlers,
@ -725,7 +725,7 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() {
ProtectedResource: inMock.NewProvider(suite.user, suite.user),
}
edcs, err := createCollections(
edcs, err := CreateCollections(
ctx,
bpc,
handlers,
@ -855,7 +855,7 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() {
ProtectedResource: inMock.NewProvider(suite.user, suite.user),
}
collections, err := createCollections(
collections, err := CreateCollections(
ctx,
bpc,
handlers,
@ -1198,7 +1198,7 @@ func checkMetadata(
expect DeltaPaths,
c data.BackupCollection,
) {
catPaths, _, err := parseMetadataCollections(
catPaths, _, err := ParseMetadataCollections(
ctx,
[]data.RestoreCollection{data.NoFetchRestoreCollection{Collection: c}})
if !assert.NoError(t, err, "getting metadata", clues.ToCore(err)) {

View File

@ -31,7 +31,7 @@ func newContactRestoreHandler(
}
}
func (h contactRestoreHandler) newContainerCache(userID string) graph.ContainerResolver {
func (h contactRestoreHandler) NewContainerCache(userID string) graph.ContainerResolver {
return &contactContainerCache{
userID: userID,
enumer: h.ac,
@ -39,7 +39,7 @@ func (h contactRestoreHandler) newContainerCache(userID string) graph.ContainerR
}
}
func (h contactRestoreHandler) formatRestoreDestination(
func (h contactRestoreHandler) FormatRestoreDestination(
destinationContainerName string,
_ path.Path, // contact folders cannot be nested
) *path.Builder {
@ -60,7 +60,7 @@ func (h contactRestoreHandler) GetContainerByName(
return h.ac.GetContainerByName(ctx, userID, "", containerName)
}
func (h contactRestoreHandler) defaultRootContainer() string {
func (h contactRestoreHandler) DefaultRootContainer() string {
return api.DefaultContacts
}
@ -163,7 +163,7 @@ func restoreContact(
return info, nil
}
func (h contactRestoreHandler) getItemsInContainerByCollisionKey(
func (h contactRestoreHandler) GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error) {

View File

@ -804,10 +804,10 @@ func runCreateDestinationTest(
var (
svc = path.ExchangeService
gcc = handler.newContainerCache(userID)
gcc = handler.NewContainerCache(userID)
)
err := gcc.Populate(ctx, fault.New(true), handler.defaultRootContainer())
err := gcc.Populate(ctx, fault.New(true), handler.DefaultRootContainer())
require.NoError(t, err, clues.ToCore(err))
path1, err := path.Build(
@ -819,10 +819,10 @@ func runCreateDestinationTest(
containerNames1...)
require.NoError(t, err, clues.ToCore(err))
containerID, gcc, err := createDestination(
containerID, gcc, err := CreateDestination(
ctx,
handler,
handler.formatRestoreDestination(destinationName, path1),
handler.FormatRestoreDestination(destinationName, path1),
userID,
gcc,
fault.New(true))
@ -840,10 +840,10 @@ func runCreateDestinationTest(
containerNames2...)
require.NoError(t, err, clues.ToCore(err))
containerID, gcc, err = createDestination(
containerID, gcc, err = CreateDestination(
ctx,
handler,
handler.formatRestoreDestination(destinationName, path2),
handler.FormatRestoreDestination(destinationName, path2),
userID,
gcc,
fault.New(true))

View File

@ -32,7 +32,7 @@ func newEventRestoreHandler(
}
}
func (h eventRestoreHandler) newContainerCache(userID string) graph.ContainerResolver {
func (h eventRestoreHandler) NewContainerCache(userID string) graph.ContainerResolver {
return &eventContainerCache{
userID: userID,
enumer: h.ac,
@ -40,7 +40,7 @@ func (h eventRestoreHandler) newContainerCache(userID string) graph.ContainerRes
}
}
func (h eventRestoreHandler) formatRestoreDestination(
func (h eventRestoreHandler) FormatRestoreDestination(
destinationContainerName string,
_ path.Path, // ignored because calendars cannot be nested
) *path.Builder {
@ -66,7 +66,7 @@ func (h eventRestoreHandler) GetContainerByName(
}
// always returns the provided value
func (h eventRestoreHandler) defaultRootContainer() string {
func (h eventRestoreHandler) DefaultRootContainer() string {
return api.DefaultCalendar
}
@ -323,7 +323,7 @@ func updateAttachments(
return el.Failure()
}
func (h eventRestoreHandler) getItemsInContainerByCollisionKey(
func (h eventRestoreHandler) GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error) {

View File

@ -63,8 +63,8 @@ type restoreHandler interface {
itemRestorer
containerAPI
getItemsByCollisionKeyser
newContainerCache(userID string) graph.ContainerResolver
formatRestoreDestination(
NewContainerCache(userID string) graph.ContainerResolver
FormatRestoreDestination(
destinationContainerName string,
collectionFullPath path.Path,
) *path.Builder
@ -95,7 +95,7 @@ type containerAPI interface {
ctx context.Context,
userID, parentContainerID, containerName string,
) (graph.Container, error)
defaultRootContainer() string
DefaultRootContainer() string
}
type containerByNamer interface {
@ -107,7 +107,7 @@ type containerByNamer interface {
}
// primary interface controller for all per-category restoration behavior.
func restoreHandlers(
func RestoreHandlers(
ac api.Client,
) map[path.CategoryType]restoreHandler {
return map[path.CategoryType]restoreHandler{
@ -124,7 +124,7 @@ type getItemsByCollisionKeyser interface {
// Collision key checks are used during restore to handle the on-
// collision restore configurations that cause the item restore to get
// skipped, replaced, or copied.
getItemsInContainerByCollisionKey(
GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error)

View File

@ -32,7 +32,7 @@ func newMailRestoreHandler(
}
}
func (h mailRestoreHandler) newContainerCache(userID string) graph.ContainerResolver {
func (h mailRestoreHandler) NewContainerCache(userID string) graph.ContainerResolver {
return &mailContainerCache{
userID: userID,
enumer: h.ac,
@ -40,7 +40,7 @@ func (h mailRestoreHandler) newContainerCache(userID string) graph.ContainerReso
}
}
func (h mailRestoreHandler) formatRestoreDestination(
func (h mailRestoreHandler) FormatRestoreDestination(
destinationContainerName string,
collectionFullPath path.Path,
) *path.Builder {
@ -65,7 +65,7 @@ func (h mailRestoreHandler) GetContainerByName(
return h.ac.GetContainerByName(ctx, userID, parentContainerID, containerName)
}
func (h mailRestoreHandler) defaultRootContainer() string {
func (h mailRestoreHandler) DefaultRootContainer() string {
return api.MsgFolderRoot
}
@ -216,7 +216,7 @@ func setMessageSVEPs(msg models.Messageable) models.Messageable {
return msg
}
func (h mailRestoreHandler) getItemsInContainerByCollisionKey(
func (h mailRestoreHandler) GetItemsInContainerByCollisionKey(
ctx context.Context,
userID, containerID string,
) (map[string]string, error) {

View File

@ -0,0 +1,164 @@
package exchange
import (
"context"
"encoding/json"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
// MetadataFileNames produces the category-specific set of filenames used to
// store graph metadata such as delta tokens and folderID->path references.
func MetadataFileNames(cat path.CategoryType) []string {
	// only email and contacts track delta tokens; every category tracks paths.
	if cat == path.EmailCategory || cat == path.ContactsCategory {
		return []string{graph.DeltaURLsFileName, graph.PreviousPathFileName}
	}

	return []string{graph.PreviousPathFileName}
}
type CatDeltaPaths map[path.CategoryType]DeltaPaths
// DeltaPaths maps container IDs to their delta token and previous path.
type DeltaPaths map[string]DeltaPath

// AddDelta records the delta token d for container k, preserving any
// previously stored path.
func (dps DeltaPaths) AddDelta(k, d string) {
	// a missing key yields the zero-value DeltaPath, which is the
	// same starting point the caller would otherwise construct.
	entry := dps[k]
	entry.Delta = d
	dps[k] = entry
}

// AddPath records the previous path p for container k, preserving any
// previously stored delta token.
func (dps DeltaPaths) AddPath(k, p string) {
	entry := dps[k]
	entry.Path = p
	dps[k] = entry
}

// DeltaPath pairs a container's delta token with its previous path.
type DeltaPath struct {
	Delta string
	Path  string
}
// ParseMetadataCollections produces a map of structs holding delta
// and path lookup maps.
// The returned bool reports whether the previous backup's metadata was
// read cleanly enough to be usable for an incremental backup.
func ParseMetadataCollections(
	ctx context.Context,
	colls []data.RestoreCollection,
) (CatDeltaPaths, bool, error) {
	// cdp stores metadata
	cdp := CatDeltaPaths{
		path.ContactsCategory: {},
		path.EmailCategory:    {},
		path.EventsCategory:   {},
	}

	// found tracks the metadata we've loaded, to make sure we don't
	// fetch overlapping copies.
	found := map[path.CategoryType]map[string]struct{}{
		path.ContactsCategory: {},
		path.EmailCategory:    {},
		path.EventsCategory:   {},
	}

	// errors from metadata items should not stop the backup,
	// but it should prevent us from using previous backups
	errs := fault.New(true)

	for _, coll := range colls {
		var (
			breakLoop bool
			items     = coll.Items(ctx, errs)
			category  = coll.FullPath().Category()
		)

		for {
			select {
			case <-ctx.Done():
				return nil, false, clues.Wrap(ctx.Err(), "parsing collection metadata").WithClues(ctx)

			case item, ok := <-items:
				if !ok || errs.Failure() != nil {
					breakLoop = true
					break
				}

				var (
					m    = map[string]string{}
					cdps = cdp[category]
				)

				err := json.NewDecoder(item.ToReader()).Decode(&m)
				if err != nil {
					// wrap the decode error so the root cause is preserved;
					// previously it was dropped by constructing a new error.
					return nil, false, clues.Wrap(err, "decoding metadata json").WithClues(ctx)
				}

				switch item.ID() {
				case graph.PreviousPathFileName:
					if _, ok := found[category]["path"]; ok {
						return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of path metadata").WithClues(ctx)
					}

					for k, p := range m {
						cdps.AddPath(k, p)
					}

					found[category]["path"] = struct{}{}

				case graph.DeltaURLsFileName:
					if _, ok := found[category]["delta"]; ok {
						return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of delta metadata").WithClues(ctx)
					}

					for k, d := range m {
						cdps.AddDelta(k, d)
					}

					found[category]["delta"] = struct{}{}
				}

				cdp[category] = cdps
			}

			if breakLoop {
				break
			}
		}
	}

	if errs.Failure() != nil {
		logger.CtxErr(ctx, errs.Failure()).Info("reading metadata collection items")

		// unreadable metadata forces a full (non-incremental) backup.
		return CatDeltaPaths{
			path.ContactsCategory: {},
			path.EmailCategory:    {},
			path.EventsCategory:   {},
		}, false, nil
	}

	// Remove any entries that contain a path or a delta, but not both.
	// That metadata is considered incomplete, and needs to incur a
	// complete backup on the next run.
	for _, dps := range cdp {
		for k, dp := range dps {
			if len(dp.Path) == 0 {
				delete(dps, k)
			}
		}
	}

	return cdp, true, nil
}

View File

@ -0,0 +1,255 @@
package exchange
import (
"bytes"
"context"
"runtime/trace"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
// RestoreCollection handles restoration of an individual collection.
// Returns metrics summarizing the objects, bytes, and successes for the
// collection's restored items.
func RestoreCollection(
	ctx context.Context,
	ir itemRestorer,
	dc data.RestoreCollection,
	resourceID, destinationID string,
	collisionKeyToItemID map[string]string,
	collisionPolicy control.CollisionPolicy,
	deets *details.Builder,
	errs *fault.Bus,
	ctr *count.Bus,
) (support.CollectionMetrics, error) {
	ctx, end := diagnostics.Span(ctx, "m365:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath()))
	defer end()

	var (
		el       = errs.Local()
		metrics  support.CollectionMetrics
		items    = dc.Items(ctx, errs)
		fullPath = dc.FullPath()
		category = fullPath.Category()
	)

	colProgress := observe.CollectionProgress(
		ctx,
		category.String(),
		fullPath.Folder(false))
	defer close(colProgress)

	for {
		select {
		case <-ctx.Done():
			return metrics, clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx)

		case itemData, ok := <-items:
			if !ok || el.Failure() != nil {
				return metrics, el.Failure()
			}

			// ictx carries the item ID; use it for all item-scoped errors so
			// the failing item is identifiable in logs and fault entries.
			ictx := clues.Add(ctx, "item_id", itemData.ID())
			trace.Log(ictx, "m365:exchange:restoreCollection:item", itemData.ID())
			metrics.Objects++

			buf := &bytes.Buffer{}

			_, err := buf.ReadFrom(itemData.ToReader())
			if err != nil {
				el.AddRecoverable(ictx, clues.Wrap(err, "reading item bytes").WithClues(ictx))
				continue
			}

			body := buf.Bytes()

			info, err := ir.restore(
				ictx,
				body,
				resourceID,
				destinationID,
				collisionKeyToItemID,
				collisionPolicy,
				errs,
				ctr)
			if err != nil {
				// already-exists conflicts are an expected outcome of some
				// collision policies; they are skipped, not recorded as errors.
				if !graph.IsErrItemAlreadyExistsConflict(err) {
					el.AddRecoverable(ictx, err)
				}

				continue
			}

			metrics.Bytes += int64(len(body))
			metrics.Successes++

			// FIXME: this may be the incorrect path. If we restored within a top-level
			// destination folder, then the restore path no longer matches the fullPath.
			itemPath, err := fullPath.AppendItem(itemData.ID())
			if err != nil {
				el.AddRecoverable(ictx, clues.Wrap(err, "adding item to collection path").WithClues(ictx))
				continue
			}

			locationRef := path.Builder{}.Append(itemPath.Folders()...)

			err = deets.Add(
				itemPath,
				locationRef,
				details.ItemInfo{
					Exchange: info,
				})
			if err != nil {
				// These deets additions are for cli display purposes only.
				// no need to fail out on error.
				logger.Ctx(ictx).Infow("accounting for restored item", "error", err)
			}

			colProgress <- struct{}{}
		}
	}
}
// CreateDestination creates folders in sequence
// [root leaf1 leaf2] similar to a linked list.
// @param destination is the desired path from the root to the container
// that the items will be restored into.
func CreateDestination(
	ctx context.Context,
	ca containerAPI,
	destination *path.Builder,
	resourceID string,
	gcr graph.ContainerResolver,
	errs *fault.Bus,
) (string, graph.ContainerResolver, error) {
	var (
		parentContainerID string
		location          = &path.Builder{}
	)

	for _, containerName := range destination.Elements() {
		location = location.Append(containerName)

		ictx := clues.Add(
			ctx,
			"container_parent_id", parentContainerID,
			"container_name", containerName,
			"restore_location", location)

		containerID, err := getOrPopulateContainer(
			ictx,
			ca,
			gcr,
			location,
			resourceID,
			parentContainerID,
			containerName,
			errs)
		if err != nil {
			return "", gcr, clues.Stack(err)
		}

		// the next folder in the chain gets created beneath this one.
		parentContainerID = containerID
	}

	// parentContainerID now identifies the last created container,
	// not its parent.
	return parentContainerID, gcr, nil
}
// getOrPopulateContainer returns the ID of the container named containerName
// under containerParentID.  A cached resolver entry for restoreLoc is used
// when present; otherwise the container is created and added to the cache.
func getOrPopulateContainer(
	ctx context.Context,
	ca containerAPI,
	gcr graph.ContainerResolver,
	restoreLoc *path.Builder,
	resourceID, containerParentID, containerName string,
	errs *fault.Bus,
) (string, error) {
	if cachedID, ok := gcr.LocationInCache(restoreLoc.String()); ok {
		return cachedID, nil
	}

	c, err := ca.CreateContainer(ctx, resourceID, containerParentID, containerName)

	// 409 handling case:
	// attempt to fetch the container by name and add that result to the cache.
	// This is rare, but may happen if CreateContainer() POST fails with 5xx:
	// sometimes the backend will create the folder despite the 5xx response,
	// leaving our local containerResolver with inconsistent state.
	if graph.IsErrFolderExists(err) {
		existing, lookupErr := ca.GetContainerByName(ctx, resourceID, containerParentID, containerName)
		if lookupErr != nil {
			err = clues.Stack(err, lookupErr)
		} else {
			c, err = existing, nil
		}
	}

	if err != nil {
		return "", clues.Wrap(err, "creating restore container")
	}

	folderID := ptr.Val(c.GetId())

	if err = gcr.AddToCache(ctx, c); err != nil {
		return "", clues.Wrap(err, "adding container to cache")
	}

	return folderID, nil
}
// uploadAttachments posts each attachment in as to the item identified by
// itemID.  Item-attachment failures are logged and skipped; all other
// failures are recorded as recoverable errors on the bus.
func uploadAttachments(
	ctx context.Context,
	ap attachmentPoster,
	as []models.Attachmentable,
	resourceID, destinationID, itemID string,
	errs *fault.Bus,
) error {
	el := errs.Local()

	for _, attachment := range as {
		if el.Failure() != nil {
			break
		}

		err := uploadAttachment(
			ctx,
			ap,
			resourceID,
			destinationID,
			itemID,
			attachment)
		if err == nil {
			continue
		}

		// FIXME: I don't know why we're swallowing this error case.
		// It needs investigation: https://github.com/alcionai/corso/issues/3498
		if ptr.Val(attachment.GetOdataType()) == "#microsoft.graph.itemAttachment" {
			logger.CtxErr(ctx, err).
				With("attachment_name", ptr.Val(attachment.GetName())).
				Info("mail upload failed")

			continue
		}

		el.AddRecoverable(ctx, clues.Wrap(err, "uploading mail attachment").WithClues(ctx))
	}

	return el.Failure()
}

View File

@ -166,7 +166,7 @@ func (suite *RestoreIntgSuite) TestRestoreEvent() {
func (suite *RestoreIntgSuite) TestRestoreExchangeObject() {
t := suite.T()
handlers := restoreHandlers(suite.ac)
handlers := RestoreHandlers(suite.ac)
userID := tconfig.M365UserID(suite.T())

View File

@ -2,23 +2,17 @@ package exchange
import (
"context"
"encoding/json"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/pii"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -41,7 +35,7 @@ func ProduceBackupCollections(
collections = []data.BackupCollection{}
el = errs.Local()
categories = map[path.CategoryType]struct{}{}
handlers = BackupHandlers(ac)
handlers = exchange.BackupHandlers(ac)
)
// Turn on concurrency limiter middleware for exchange backups
@ -51,7 +45,7 @@ func ProduceBackupCollections(
bpc.Options.ToggleFeatures.DisableConcurrencyLimiter,
bpc.Options.Parallelism.ItemFetch)
cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, bpc.MetadataCollections)
cdps, canUsePreviousBackup, err := exchange.ParseMetadataCollections(ctx, bpc.MetadataCollections)
if err != nil {
return nil, nil, false, err
}
@ -63,7 +57,7 @@ func ProduceBackupCollections(
break
}
dcs, err := createCollections(
dcs, err := exchange.CreateCollections(
ctx,
bpc,
handlers,
@ -101,518 +95,3 @@ func ProduceBackupCollections(
return collections, nil, canUsePreviousBackup, el.Failure()
}
// createCollections - utility function that retrieves M365
// IDs through Microsoft Graph API. The selectors.ExchangeScope
// determines the type of collections that are retrieved.
// NOTE(review): this is the pre-refactor copy removed by this change; it is
// superseded by the exported CreateCollections in the collection/exchange
// package.
func createCollections(
	ctx context.Context,
	bpc inject.BackupProducerConfig,
	handlers map[path.CategoryType]backupHandler,
	tenantID string,
	scope selectors.ExchangeScope,
	dps DeltaPaths,
	su support.StatusUpdater,
	errs *fault.Bus,
) ([]data.BackupCollection, error) {
	ctx = clues.Add(ctx, "category", scope.Category().PathType())
	var (
		allCollections = make([]data.BackupCollection, 0)
		category       = scope.Category().PathType()
		qp             = graph.QueryParams{
			Category:          category,
			ProtectedResource: bpc.ProtectedResource,
			TenantID:          tenantID,
		}
	)
	// the per-category handler drives container and item enumeration.
	handler, ok := handlers[category]
	if !ok {
		return nil, clues.New("unsupported backup category type").WithClues(ctx)
	}
	foldersComplete := observe.MessageWithCompletion(
		ctx,
		observe.Bulletf("%s", qp.Category))
	defer close(foldersComplete)
	// resolve the resource owner's containers before collecting items.
	rootFolder, cc := handler.NewContainerCache(bpc.ProtectedResource.ID())
	if err := cc.Populate(ctx, errs, rootFolder); err != nil {
		return nil, clues.Wrap(err, "populating container cache")
	}
	collections, err := populateCollections(
		ctx,
		qp,
		handler,
		su,
		cc,
		scope,
		dps,
		bpc.Options,
		errs)
	if err != nil {
		return nil, clues.Wrap(err, "filling collections")
	}
	foldersComplete <- struct{}{}
	// flatten the id-keyed collection map into the returned slice.
	for _, coll := range collections {
		allCollections = append(allCollections, coll)
	}
	return allCollections, nil
}
// populateCollections is a utility function
// that places the M365 object ids belonging to specific directories
// into a BackupCollection. Messages outside of those directories are omitted.
// Collections are keyed by container ID; a final "metadata" entry holds the
// delta tokens and folderID->path mappings needed for the next incremental.
// Supports all exchange applications: Contacts, Events, and Mail.
//
// TODO(ashmrtn): This should really return []data.BackupCollection but
// unfortunately some of our tests rely on being able to lookup returned
// collections by ID and it would be non-trivial to change them.
func populateCollections(
	ctx context.Context,
	qp graph.QueryParams,
	bh backupHandler,
	statusUpdater support.StatusUpdater,
	resolver graph.ContainerResolver,
	scope selectors.ExchangeScope,
	dps DeltaPaths,
	ctrlOpts control.Options,
	errs *fault.Bus,
) (map[string]data.BackupCollection, error) {
	var (
		// folder ID -> BackupCollection.
		collections = map[string]data.BackupCollection{}
		// folder ID -> delta url or folder path lookups
		deltaURLs = map[string]string{}
		currPaths = map[string]string{}
		// copy of previousPaths.  any folder found in the resolver gets
		// deleted from this map, leaving only the deleted folders behind
		tombstones = makeTombstones(dps)
		category   = qp.Category
	)

	logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps))

	el := errs.Local()

	for _, c := range resolver.Items() {
		if el.Failure() != nil {
			return nil, el.Failure()
		}

		cID := ptr.Val(c.GetId())

		// any container the resolver can still see is alive; remove it
		// from the tombstone candidates.
		delete(tombstones, cID)

		var (
			err         error
			dp          = dps[cID]
			prevDelta   = dp.Delta
			prevPathStr = dp.Path // do not log: pii; log prevPath instead
			prevPath    path.Path
			ictx        = clues.Add(
				ctx,
				"container_id", cID,
				"previous_delta", pii.SafeURL{
					URL:           prevDelta,
					SafePathElems: graph.SafeURLPathParams,
					SafeQueryKeys: graph.SafeURLQueryParams,
				})
		)

		currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category)
		// Only create a collection if the path matches the scope.
		if !ok {
			continue
		}

		if len(prevPathStr) > 0 {
			if prevPath, err = pathFromPrevString(prevPathStr); err != nil {
				logger.CtxErr(ictx, err).Error("parsing prev path")
				// if the previous path is unusable, then the delta must be, too.
				prevDelta = ""
			}
		}

		ictx = clues.Add(ictx, "previous_path", prevPath)

		added, removed, newDelta, err := bh.itemEnumerator().
			GetAddedAndRemovedItemIDs(
				ictx,
				qp.ProtectedResource.ID(),
				cID,
				prevDelta,
				ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
				!ctrlOpts.ToggleFeatures.DisableDelta)
		if err != nil {
			if !graph.IsErrDeletedInFlight(err) {
				el.AddRecoverable(ctx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
				continue
			}

			// race conditions happen, containers might get deleted while
			// this process is in flight.  If that happens, force the collection
			// to reset.  This prevents any old items from being retained in
			// storage.  If the container (or its children) are still missing
			// on the next backup, they'll get tombstoned.
			newDelta = api.DeltaUpdate{Reset: true}
		}

		if len(newDelta.URL) > 0 {
			deltaURLs[cID] = newDelta.URL
		} else if !newDelta.Reset {
			logger.Ctx(ictx).Info("missing delta url")
		}

		edc := NewCollection(
			qp.ProtectedResource.ID(),
			currPath,
			prevPath,
			locPath,
			category,
			bh.itemHandler(),
			statusUpdater,
			ctrlOpts,
			newDelta.Reset)

		collections[cID] = &edc

		for _, add := range added {
			edc.added[add] = struct{}{}
		}

		// Remove any deleted IDs from the set of added IDs because items that are
		// deleted and then restored will have a different ID than they did
		// originally.
		for _, remove := range removed {
			delete(edc.added, remove)
			edc.removed[remove] = struct{}{}
		}

		// add the current path for the container ID to be used in the next backup
		// as the "previous path", for reference in case of a rename or relocation.
		currPaths[cID] = currPath.String()
	}

	// A tombstone is a folder that needs to be marked for deletion.
	// The only situation where a tombstone should appear is if the folder exists
	// in the `previousPath` set, but does not exist in the current container
	// resolver (which contains all the resource owners' current containers).
	for id, p := range tombstones {
		if el.Failure() != nil {
			return nil, el.Failure()
		}

		ictx := clues.Add(ctx, "tombstone_id", id)

		if collections[id] != nil {
			// BUGFIX: previously this wrapped a nil error, which produced a nil
			// (and therefore silently dropped) recoverable error.
			el.AddRecoverable(ctx, clues.New("conflict: tombstone exists for a live collection").WithClues(ictx))
			continue
		}

		// only occurs if it was a new folder that we picked up during the container
		// resolver phase that got deleted in flight by the time we hit this stage.
		if len(p) == 0 {
			continue
		}

		prevPath, err := pathFromPrevString(p)
		if err != nil {
			// technically shouldn't ever happen.  But just in case...
			logger.CtxErr(ictx, err).Error("parsing tombstone prev path")
			continue
		}

		edc := NewCollection(
			qp.ProtectedResource.ID(),
			nil, // marks the collection as deleted
			prevPath,
			nil, // tombstones don't need a location
			category,
			bh.itemHandler(),
			statusUpdater,
			ctrlOpts,
			false)

		collections[id] = &edc
	}

	logger.Ctx(ctx).Infow(
		"adding metadata collection entries",
		"num_paths_entries", len(currPaths),
		"num_deltas_entries", len(deltaURLs))

	col, err := graph.MakeMetadataCollection(
		qp.TenantID,
		qp.ProtectedResource.ID(),
		path.ExchangeService,
		qp.Category,
		[]graph.MetadataCollectionEntry{
			graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths),
			graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs),
		},
		statusUpdater)
	if err != nil {
		return nil, clues.Wrap(err, "making metadata collection")
	}

	collections["metadata"] = col

	return collections, el.Failure()
}
// makeTombstones produces a set of id:path pairs from the deltapaths map.
// Each entry in the set will, if not removed, produce a collection
// that will delete the tombstone by path.
func makeTombstones(dps DeltaPaths) map[string]string {
	tombstones := make(map[string]string, len(dps))

	for containerID, dp := range dps {
		tombstones[containerID] = dp.Path
	}

	return tombstones
}
// pathFromPrevString re-parses a stored "previous path" string back into a
// structured path.Path.
func pathFromPrevString(ps string) (path.Path, error) {
	parsed, err := path.FromDataLayerPath(ps, false)
	if err != nil {
		return nil, clues.Wrap(err, "parsing previous path string")
	}

	return parsed, nil
}
// includeContainer returns true if the container passes the scope comparison
// and should be included in the backup.
// Returns:
// - the path representing the directory as it should be stored in the repository.
// - the human-readable path using display names.
// - true if the path passes the scope comparison.
func includeContainer(
	ctx context.Context,
	qp graph.QueryParams,
	c graph.CachedContainer,
	scope selectors.ExchangeScope,
	category path.CategoryType,
) (path.Path, *path.Builder, bool) {
	var (
		directory string
		locPath   path.Path
		pb        = c.Path()
		loc       = c.Location()
	)

	// Clause ensures that DefaultContactFolder is inspected properly
	if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == api.DefaultContacts {
		loc = loc.Append(api.DefaultContacts)
	}

	dirPath, err := pb.ToDataLayerExchangePathForCategory(
		qp.TenantID,
		qp.ProtectedResource.ID(),
		category,
		false)
	// Containers without a path (e.g. Root mail folder) always err here.
	if err != nil {
		return nil, nil, false
	}

	directory = dirPath.Folder(false)

	if loc != nil {
		locPath, err = loc.ToDataLayerExchangePathForCategory(
			qp.TenantID,
			qp.ProtectedResource.ID(),
			category,
			false)
		// Containers without a path (e.g. Root mail folder) always err here.
		if err != nil {
			return nil, nil, false
		}

		// when a display-name location exists, it takes precedence over the
		// id-based path for scope matching below.
		directory = locPath.Folder(false)
	}

	var ok bool

	switch category {
	case path.EmailCategory:
		ok = scope.Matches(selectors.ExchangeMailFolder, directory)
	case path.ContactsCategory:
		ok = scope.Matches(selectors.ExchangeContactFolder, directory)
	case path.EventsCategory:
		ok = scope.Matches(selectors.ExchangeEventCalendar, directory)
	default:
		// unrecognized categories are never included.
		return nil, nil, false
	}

	logger.Ctx(ctx).With(
		"included", ok,
		"scope", scope,
		"matches_input", directory,
	).Debug("backup folder selection filter")

	return dirPath, loc, ok
}
// ---------------------------------------------------------------------------
// metadata collection parsing
// ---------------------------------------------------------------------------
// MetadataFileNames produces the category-specific set of filenames used to
// store graph metadata such as delta tokens and folderID->path references.
// Only mail and contacts track delta tokens; every category tracks paths.
func MetadataFileNames(cat path.CategoryType) []string {
	if cat == path.EmailCategory || cat == path.ContactsCategory {
		return []string{graph.DeltaURLsFileName, graph.PreviousPathFileName}
	}

	return []string{graph.PreviousPathFileName}
}
// CatDeltaPaths maps each backup category to its per-container DeltaPaths lookup.
type CatDeltaPaths map[path.CategoryType]DeltaPaths
// DeltaPaths maps container IDs to their delta token and previous-path pair.
type DeltaPaths map[string]DeltaPath

// AddDelta upserts the delta token for container id k, preserving any
// previously recorded path.  A missing key starts from the zero DeltaPath.
func (dps DeltaPaths) AddDelta(k, d string) {
	entry := dps[k]
	entry.Delta = d
	dps[k] = entry
}

// AddPath upserts the stored path for container id k, preserving any
// previously recorded delta token.
func (dps DeltaPaths) AddPath(k, p string) {
	entry := dps[k]
	entry.Path = p
	dps[k] = entry
}

// DeltaPath pairs a graph delta token with the repository path of the
// container it was produced for.
type DeltaPath struct {
	Delta string
	Path  string
}
// parseMetadataCollections produces a map of structs holding delta
// and path lookup maps, read out of the prior backup's metadata collections.
// The second return is false when the previous metadata could not be trusted
// (read errors), signaling that incremental state must not be used.
func parseMetadataCollections(
	ctx context.Context,
	colls []data.RestoreCollection,
) (CatDeltaPaths, bool, error) {
	// cdp stores metadata
	cdp := CatDeltaPaths{
		path.ContactsCategory: {},
		path.EmailCategory:    {},
		path.EventsCategory:   {},
	}

	// found tracks the metadata we've loaded, to make sure we don't
	// fetch overlapping copies.
	found := map[path.CategoryType]map[string]struct{}{
		path.ContactsCategory: {},
		path.EmailCategory:    {},
		path.EventsCategory:   {},
	}

	// errors from metadata items should not stop the backup,
	// but it should prevent us from using previous backups
	errs := fault.New(true)

	for _, coll := range colls {
		var (
			breakLoop bool
			items     = coll.Items(ctx, errs)
			category  = coll.FullPath().Category()
		)

		for {
			select {
			case <-ctx.Done():
				return nil, false, clues.Wrap(ctx.Err(), "parsing collection metadata").WithClues(ctx)

			case item, ok := <-items:
				if !ok || errs.Failure() != nil {
					breakLoop = true
					break
				}

				var (
					m    = map[string]string{}
					cdps = cdp[category]
				)

				err := json.NewDecoder(item.ToReader()).Decode(&m)
				if err != nil {
					// BUGFIX: wrap the decode error instead of discarding it,
					// so the root cause survives in the returned error chain.
					return nil, false, clues.Wrap(err, "decoding metadata json").WithClues(ctx)
				}

				switch item.ID() {
				case graph.PreviousPathFileName:
					if _, ok := found[category]["path"]; ok {
						return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of path metadata").WithClues(ctx)
					}

					for k, p := range m {
						cdps.AddPath(k, p)
					}

					found[category]["path"] = struct{}{}

				case graph.DeltaURLsFileName:
					if _, ok := found[category]["delta"]; ok {
						return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of delta metadata").WithClues(ctx)
					}

					for k, d := range m {
						cdps.AddDelta(k, d)
					}

					found[category]["delta"] = struct{}{}
				}

				cdp[category] = cdps
			}

			if breakLoop {
				break
			}
		}
	}

	if errs.Failure() != nil {
		logger.CtxErr(ctx, errs.Failure()).Info("reading metadata collection items")

		// on failure, fall back to empty state: a full (non-incremental) backup.
		return CatDeltaPaths{
			path.ContactsCategory: {},
			path.EmailCategory:    {},
			path.EventsCategory:   {},
		}, false, nil
	}

	// Remove any entries that lack a path.  That metadata is considered
	// incomplete, and needs to incur a complete backup on the next run.
	// (A path without a delta is fine: some categories never store deltas.)
	for _, dps := range cdp {
		for k, dp := range dps {
			if len(dp.Path) == 0 {
				delete(dps, k)
			}
		}
	}

	return cdp, true, nil
}

View File

@ -1,25 +1,18 @@
package exchange
import (
"bytes"
"context"
"runtime/trace"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
@ -42,7 +35,7 @@ func ConsumeRestoreCollections(
var (
resourceID = rcc.ProtectedResource.ID()
directoryCache = make(map[path.CategoryType]graph.ContainerResolver)
handlers = restoreHandlers(ac)
handlers = exchange.RestoreHandlers(ac)
metrics support.CollectionMetrics
el = errs.Local()
)
@ -67,18 +60,18 @@ func ConsumeRestoreCollections(
}
if directoryCache[category] == nil {
gcr := handler.newContainerCache(resourceID)
if err := gcr.Populate(ctx, errs, handler.defaultRootContainer()); err != nil {
gcr := handler.NewContainerCache(resourceID)
if err := gcr.Populate(ctx, errs, handler.DefaultRootContainer()); err != nil {
return nil, clues.Wrap(err, "populating container cache")
}
directoryCache[category] = gcr
}
containerID, gcc, err := createDestination(
containerID, gcc, err := exchange.CreateDestination(
ictx,
handler,
handler.formatRestoreDestination(rcc.RestoreConfig.Location, dc.FullPath()),
handler.FormatRestoreDestination(rcc.RestoreConfig.Location, dc.FullPath()),
resourceID,
directoryCache[category],
errs)
@ -90,13 +83,13 @@ func ConsumeRestoreCollections(
directoryCache[category] = gcc
ictx = clues.Add(ictx, "restore_destination_id", containerID)
collisionKeyToItemID, err := handler.getItemsInContainerByCollisionKey(ctx, resourceID, containerID)
collisionKeyToItemID, err := handler.GetItemsInContainerByCollisionKey(ctx, resourceID, containerID)
if err != nil {
el.AddRecoverable(ctx, clues.Wrap(err, "building item collision cache"))
continue
}
temp, err := restoreCollection(
temp, err := exchange.RestoreCollection(
ictx,
handler,
dc,
@ -128,235 +121,3 @@ func ConsumeRestoreCollections(
return status, el.Failure()
}
// restoreCollection handles restoration of an individual collection.
// It drains the collection's item channel, restoring each item into the
// destination container and recording per-item results in deets.  Item-level
// failures are recorded as recoverable errors; the loop only exits on context
// cancellation, channel exhaustion, or a non-recoverable failure.
func restoreCollection(
	ctx context.Context,
	ir itemRestorer,
	dc data.RestoreCollection,
	resourceID, destinationID string,
	collisionKeyToItemID map[string]string,
	collisionPolicy control.CollisionPolicy,
	deets *details.Builder,
	errs *fault.Bus,
	ctr *count.Bus,
) (support.CollectionMetrics, error) {
	ctx, end := diagnostics.Span(ctx, "m365:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath()))
	defer end()

	var (
		el        = errs.Local()
		metrics   support.CollectionMetrics
		items     = dc.Items(ctx, errs)
		fullPath  = dc.FullPath()
		category  = fullPath.Category()
	)

	colProgress := observe.CollectionProgress(
		ctx,
		category.String(),
		fullPath.Folder(false))
	defer close(colProgress)

	for {
		select {
		case <-ctx.Done():
			return metrics, clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx)

		case itemData, ok := <-items:
			// channel closed or a hard failure occurred: report what we have.
			if !ok || el.Failure() != nil {
				return metrics, el.Failure()
			}

			ictx := clues.Add(ctx, "item_id", itemData.ID())
			trace.Log(ictx, "m365:exchange:restoreCollection:item", itemData.ID())
			metrics.Objects++

			buf := &bytes.Buffer{}

			_, err := buf.ReadFrom(itemData.ToReader())
			if err != nil {
				el.AddRecoverable(ctx, clues.Wrap(err, "reading item bytes").WithClues(ictx))
				continue
			}

			body := buf.Bytes()

			info, err := ir.restore(
				ictx,
				body,
				resourceID,
				destinationID,
				collisionKeyToItemID,
				collisionPolicy,
				errs,
				ctr)
			if err != nil {
				// collision conflicts are expected under skip policies and
				// are not treated as errors.
				if !graph.IsErrItemAlreadyExistsConflict(err) {
					el.AddRecoverable(ictx, err)
				}

				continue
			}

			metrics.Bytes += int64(len(body))
			metrics.Successes++

			// FIXME: this may be the incorrect path.  If we restored within a top-level
			// destination folder, then the restore path no longer matches the fullPath.
			itemPath, err := fullPath.AppendItem(itemData.ID())
			if err != nil {
				el.AddRecoverable(ctx, clues.Wrap(err, "adding item to collection path").WithClues(ctx))
				continue
			}

			locationRef := path.Builder{}.Append(itemPath.Folders()...)

			err = deets.Add(
				itemPath,
				locationRef,
				details.ItemInfo{
					Exchange: info,
				})
			if err != nil {
				// These deets additions are for cli display purposes only.
				// no need to fail out on error.
				logger.Ctx(ctx).Infow("accounting for restored item", "error", err)
			}

			colProgress <- struct{}{}
		}
	}
}
// createDestination creates folders in sequence
// [root leaf1 leaf2] similar to a linked list, root first.
// @param destination is the desired path from the root to the container
// that the items will be restored into.
// Returns the ID of the final (leaf) container.
func createDestination(
	ctx context.Context,
	ca containerAPI,
	destination *path.Builder,
	resourceID string,
	gcr graph.ContainerResolver,
	errs *fault.Bus,
) (string, graph.ContainerResolver, error) {
	var (
		parentID string
		location = &path.Builder{}
	)

	for _, folderName := range destination.Elements() {
		location = location.Append(folderName)

		ictx := clues.Add(
			ctx,
			"container_parent_id", parentID,
			"container_name", folderName,
			"restore_location", location)

		folderID, err := getOrPopulateContainer(
			ictx,
			ca,
			gcr,
			location,
			resourceID,
			parentID,
			folderName,
			errs)
		if err != nil {
			return "", gcr, clues.Stack(err)
		}

		// each created folder becomes the parent of the next one.
		parentID = folderID
	}

	// parentID now identifies the last created container, not its parent.
	return parentID, gcr, nil
}
// getOrPopulateContainer returns the ID of the container at restoreLoc,
// creating it if it is not already present in the resolver cache.  Newly
// created (or recovered) containers are added to the cache before returning.
func getOrPopulateContainer(
	ctx context.Context,
	ca containerAPI,
	gcr graph.ContainerResolver,
	restoreLoc *path.Builder,
	resourceID, containerParentID, containerName string,
	errs *fault.Bus,
) (string, error) {
	// fast path: the container already exists in the cache.
	cached, ok := gcr.LocationInCache(restoreLoc.String())
	if ok {
		return cached, nil
	}

	c, err := ca.CreateContainer(ctx, resourceID, containerParentID, containerName)

	// 409 handling case:
	// attempt to fetch the container by name and add that result to the cache.
	// This is rare, but may happen if CreateContainer() POST fails with 5xx:
	// sometimes the backend will create the folder despite the 5xx response,
	// leaving our local containerResolver with inconsistent state.
	if graph.IsErrFolderExists(err) {
		cc, e := ca.GetContainerByName(ctx, resourceID, containerParentID, containerName)
		if e != nil {
			// lookup also failed: keep both errors in the chain.
			err = clues.Stack(err, e)
		} else {
			// recovered: treat the existing folder as the created one.
			c = cc
			err = nil
		}
	}

	if err != nil {
		return "", clues.Wrap(err, "creating restore container")
	}

	folderID := ptr.Val(c.GetId())

	if err = gcr.AddToCache(ctx, c); err != nil {
		return "", clues.Wrap(err, "adding container to cache")
	}

	return folderID, nil
}
// uploadAttachments posts each attachment in as onto the item identified by
// itemID within the destination container.  Upload failures for item-type
// attachments are logged and skipped; all other failures are recorded as
// recoverable errors, halting the loop only on a hard failure.
func uploadAttachments(
	ctx context.Context,
	ap attachmentPoster,
	as []models.Attachmentable,
	resourceID, destinationID, itemID string,
	errs *fault.Bus,
) error {
	el := errs.Local()

	for _, attachment := range as {
		if el.Failure() != nil {
			break
		}

		err := uploadAttachment(
			ctx,
			ap,
			resourceID,
			destinationID,
			itemID,
			attachment)
		if err == nil {
			continue
		}

		// FIXME: I don't know why we're swallowing this error case.
		// It needs investigation: https://github.com/alcionai/corso/issues/3498
		if ptr.Val(attachment.GetOdataType()) == "#microsoft.graph.itemAttachment" {
			logger.CtxErr(ctx, err).
				With("attachment_name", ptr.Val(attachment.GetName())).
				Info("mail upload failed")

			continue
		}

		el.AddRecoverable(ctx, clues.Wrap(err, "uploading mail attachment").WithClues(ctx))
	}

	return el.Failure()
}

View File

@ -7,8 +7,8 @@ import (
"github.com/alcionai/clues"
"github.com/stretchr/testify/require"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/service/exchange"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"

View File

@ -18,9 +18,9 @@ import (
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/events"
evmock "github.com/alcionai/corso/src/internal/events/mock"
"github.com/alcionai/corso/src/internal/m365/collection/exchange"
"github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/resource"
"github.com/alcionai/corso/src/internal/m365/service/exchange"
exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
exchTD "github.com/alcionai/corso/src/internal/m365/service/exchange/testdata"
"github.com/alcionai/corso/src/internal/tester"