From 7aed7eba0ed6affe99dfb039d448dd79326f249e Mon Sep 17 00:00:00 2001 From: Abin Simon Date: Mon, 14 Aug 2023 12:38:11 +0530 Subject: [PATCH] Split exchange collection from service (#4011) --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [x] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [ ] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- .../exchange/attachment.go | 0 .../exchange/attendees.go | 0 .../m365/collection/exchange/backup.go | 382 +++++++++++++ .../exchange/backup_test.go | 20 +- .../exchange/cache_container.go | 0 .../exchange/collection.go | 0 .../exchange/collection_test.go | 0 .../exchange/consts.go | 0 .../exchange/contacts_backup.go | 0 .../exchange/contacts_container_cache.go | 0 .../exchange/contacts_restore.go | 8 +- .../exchange/contacts_restore_test.go | 0 .../exchange/container_resolver.go | 0 .../exchange/container_resolver_test.go | 12 +- .../exchange/events_backup.go | 0 .../exchange/events_container_cache.go | 0 .../exchange/events_instance_restore.go | 0 .../exchange/events_restore.go | 8 +- .../exchange/events_restore_test.go | 0 .../exchange/handlers.go | 10 +- .../exchange/helper_test.go | 0 .../exchange/mail_backup.go | 0 .../exchange/mail_container_cache.go | 0 .../exchange/mail_container_cache_test.go | 0 .../exchange/mail_restore.go | 8 +- .../exchange/mail_restore_test.go | 0 .../m365/collection/exchange/metadata.go | 164 ++++++ .../m365/collection/exchange/restore.go | 255 +++++++++ .../exchange/restore_test.go | 2 +- .../exchange/transform.go | 0 .../exchange/transform_test.go | 0 src/internal/m365/service/exchange/backup.go | 529 +----------------- src/internal/m365/service/exchange/restore.go | 255 +-------- 
.../service/exchange/testdata/handlers.go | 2 +- src/internal/operations/test/exchange_test.go | 2 +- 35 files changed, 849 insertions(+), 808 deletions(-) rename src/internal/m365/{service => collection}/exchange/attachment.go (100%) rename src/internal/m365/{service => collection}/exchange/attendees.go (100%) create mode 100644 src/internal/m365/collection/exchange/backup.go rename src/internal/m365/{service => collection}/exchange/backup_test.go (99%) rename src/internal/m365/{service => collection}/exchange/cache_container.go (100%) rename src/internal/m365/{service => collection}/exchange/collection.go (100%) rename src/internal/m365/{service => collection}/exchange/collection_test.go (100%) rename src/internal/m365/{service => collection}/exchange/consts.go (100%) rename src/internal/m365/{service => collection}/exchange/contacts_backup.go (100%) rename src/internal/m365/{service => collection}/exchange/contacts_container_cache.go (100%) rename src/internal/m365/{service => collection}/exchange/contacts_restore.go (94%) rename src/internal/m365/{service => collection}/exchange/contacts_restore_test.go (100%) rename src/internal/m365/{service => collection}/exchange/container_resolver.go (100%) rename src/internal/m365/{service => collection}/exchange/container_resolver_test.go (98%) rename src/internal/m365/{service => collection}/exchange/events_backup.go (100%) rename src/internal/m365/{service => collection}/exchange/events_container_cache.go (100%) rename src/internal/m365/{service => collection}/exchange/events_instance_restore.go (100%) rename src/internal/m365/{service => collection}/exchange/events_restore.go (97%) rename src/internal/m365/{service => collection}/exchange/events_restore_test.go (100%) rename src/internal/m365/{service => collection}/exchange/handlers.go (95%) rename src/internal/m365/{service => collection}/exchange/helper_test.go (100%) rename src/internal/m365/{service => collection}/exchange/mail_backup.go (100%) rename 
src/internal/m365/{service => collection}/exchange/mail_container_cache.go (100%) rename src/internal/m365/{service => collection}/exchange/mail_container_cache_test.go (100%) rename src/internal/m365/{service => collection}/exchange/mail_restore.go (95%) rename src/internal/m365/{service => collection}/exchange/mail_restore_test.go (100%) create mode 100644 src/internal/m365/collection/exchange/metadata.go create mode 100644 src/internal/m365/collection/exchange/restore.go rename src/internal/m365/{service => collection}/exchange/restore_test.go (99%) rename src/internal/m365/{service => collection}/exchange/transform.go (100%) rename src/internal/m365/{service => collection}/exchange/transform_test.go (100%) diff --git a/src/internal/m365/service/exchange/attachment.go b/src/internal/m365/collection/exchange/attachment.go similarity index 100% rename from src/internal/m365/service/exchange/attachment.go rename to src/internal/m365/collection/exchange/attachment.go diff --git a/src/internal/m365/service/exchange/attendees.go b/src/internal/m365/collection/exchange/attendees.go similarity index 100% rename from src/internal/m365/service/exchange/attendees.go rename to src/internal/m365/collection/exchange/attendees.go diff --git a/src/internal/m365/collection/exchange/backup.go b/src/internal/m365/collection/exchange/backup.go new file mode 100644 index 000000000..359701629 --- /dev/null +++ b/src/internal/m365/collection/exchange/backup.go @@ -0,0 +1,382 @@ +package exchange + +import ( + "context" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/common/pii" + "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/internal/operations/inject" + "github.com/alcionai/corso/src/pkg/control" + 
"github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" + "github.com/alcionai/corso/src/pkg/services/m365/api" +) + +// CreateCollections - utility function that retrieves M365 +// IDs through Microsoft Graph API. The selectors.ExchangeScope +// determines the type of collections that are retrieved. +func CreateCollections( + ctx context.Context, + bpc inject.BackupProducerConfig, + handlers map[path.CategoryType]backupHandler, + tenantID string, + scope selectors.ExchangeScope, + dps DeltaPaths, + su support.StatusUpdater, + errs *fault.Bus, +) ([]data.BackupCollection, error) { + ctx = clues.Add(ctx, "category", scope.Category().PathType()) + + var ( + allCollections = make([]data.BackupCollection, 0) + category = scope.Category().PathType() + qp = graph.QueryParams{ + Category: category, + ProtectedResource: bpc.ProtectedResource, + TenantID: tenantID, + } + ) + + handler, ok := handlers[category] + if !ok { + return nil, clues.New("unsupported backup category type").WithClues(ctx) + } + + foldersComplete := observe.MessageWithCompletion( + ctx, + observe.Bulletf("%s", qp.Category)) + defer close(foldersComplete) + + rootFolder, cc := handler.NewContainerCache(bpc.ProtectedResource.ID()) + + if err := cc.Populate(ctx, errs, rootFolder); err != nil { + return nil, clues.Wrap(err, "populating container cache") + } + + collections, err := populateCollections( + ctx, + qp, + handler, + su, + cc, + scope, + dps, + bpc.Options, + errs) + if err != nil { + return nil, clues.Wrap(err, "filling collections") + } + + foldersComplete <- struct{}{} + + for _, coll := range collections { + allCollections = append(allCollections, coll) + } + + return allCollections, nil +} + +// populateCollections is a utility function +// that places the M365 object ids belonging to specific directories +// into a BackupCollection. 
Messages outside of those directories are omitted. +// @param collection is filled with during this function. +// Supports all exchange applications: Contacts, Events, and Mail +// +// TODO(ashmrtn): This should really return []data.BackupCollection but +// unfortunately some of our tests rely on being able to lookup returned +// collections by ID and it would be non-trivial to change them. +func populateCollections( + ctx context.Context, + qp graph.QueryParams, + bh backupHandler, + statusUpdater support.StatusUpdater, + resolver graph.ContainerResolver, + scope selectors.ExchangeScope, + dps DeltaPaths, + ctrlOpts control.Options, + errs *fault.Bus, +) (map[string]data.BackupCollection, error) { + var ( + // folder ID -> BackupCollection. + collections = map[string]data.BackupCollection{} + // folder ID -> delta url or folder path lookups + deltaURLs = map[string]string{} + currPaths = map[string]string{} + // copy of previousPaths. any folder found in the resolver get + // deleted from this map, leaving only the deleted folders behind + tombstones = makeTombstones(dps) + category = qp.Category + ) + + logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps)) + + el := errs.Local() + + for _, c := range resolver.Items() { + if el.Failure() != nil { + return nil, el.Failure() + } + + cID := ptr.Val(c.GetId()) + delete(tombstones, cID) + + var ( + err error + dp = dps[cID] + prevDelta = dp.Delta + prevPathStr = dp.Path // do not log: pii; log prevPath instead + prevPath path.Path + ictx = clues.Add( + ctx, + "container_id", cID, + "previous_delta", pii.SafeURL{ + URL: prevDelta, + SafePathElems: graph.SafeURLPathParams, + SafeQueryKeys: graph.SafeURLQueryParams, + }) + ) + + currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category) + // Only create a collection if the path matches the scope. 
+ if !ok { + continue + } + + if len(prevPathStr) > 0 { + if prevPath, err = pathFromPrevString(prevPathStr); err != nil { + logger.CtxErr(ictx, err).Error("parsing prev path") + // if the previous path is unusable, then the delta must be, too. + prevDelta = "" + } + } + + ictx = clues.Add(ictx, "previous_path", prevPath) + + added, removed, newDelta, err := bh.itemEnumerator(). + GetAddedAndRemovedItemIDs( + ictx, + qp.ProtectedResource.ID(), + cID, + prevDelta, + ctrlOpts.ToggleFeatures.ExchangeImmutableIDs, + !ctrlOpts.ToggleFeatures.DisableDelta) + if err != nil { + if !graph.IsErrDeletedInFlight(err) { + el.AddRecoverable(ctx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) + continue + } + + // race conditions happen, containers might get deleted while + // this process is in flight. If that happens, force the collection + // to reset. This prevents any old items from being retained in + // storage. If the container (or its children) are sill missing + // on the next backup, they'll get tombstoned. + newDelta = api.DeltaUpdate{Reset: true} + } + + if len(newDelta.URL) > 0 { + deltaURLs[cID] = newDelta.URL + } else if !newDelta.Reset { + logger.Ctx(ictx).Info("missing delta url") + } + + edc := NewCollection( + qp.ProtectedResource.ID(), + currPath, + prevPath, + locPath, + category, + bh.itemHandler(), + statusUpdater, + ctrlOpts, + newDelta.Reset) + + collections[cID] = &edc + + for _, add := range added { + edc.added[add] = struct{}{} + } + + // Remove any deleted IDs from the set of added IDs because items that are + // deleted and then restored will have a different ID than they did + // originally. + for _, remove := range removed { + delete(edc.added, remove) + edc.removed[remove] = struct{}{} + } + + // add the current path for the container ID to be used in the next backup + // as the "previous path", for reference in case of a rename or relocation. 
+ currPaths[cID] = currPath.String() + } + + // A tombstone is a folder that needs to be marked for deletion. + // The only situation where a tombstone should appear is if the folder exists + // in the `previousPath` set, but does not exist in the current container + // resolver (which contains all the resource owners' current containers). + for id, p := range tombstones { + if el.Failure() != nil { + return nil, el.Failure() + } + + var ( + err error + ictx = clues.Add(ctx, "tombstone_id", id) + ) + + if collections[id] != nil { + el.AddRecoverable(ctx, clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ictx)) + continue + } + + // only occurs if it was a new folder that we picked up during the container + // resolver phase that got deleted in flight by the time we hit this stage. + if len(p) == 0 { + continue + } + + prevPath, err := pathFromPrevString(p) + if err != nil { + // technically shouldn't ever happen. But just in case... + logger.CtxErr(ictx, err).Error("parsing tombstone prev path") + continue + } + + edc := NewCollection( + qp.ProtectedResource.ID(), + nil, // marks the collection as deleted + prevPath, + nil, // tombstones don't need a location + category, + bh.itemHandler(), + statusUpdater, + ctrlOpts, + false) + collections[id] = &edc + } + + logger.Ctx(ctx).Infow( + "adding metadata collection entries", + "num_paths_entries", len(currPaths), + "num_deltas_entries", len(deltaURLs)) + + col, err := graph.MakeMetadataCollection( + qp.TenantID, + qp.ProtectedResource.ID(), + path.ExchangeService, + qp.Category, + []graph.MetadataCollectionEntry{ + graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths), + graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs), + }, + statusUpdater) + if err != nil { + return nil, clues.Wrap(err, "making metadata collection") + } + + collections["metadata"] = col + + return collections, el.Failure() +} + +// produces a set of id:path pairs from the deltapaths map. 
+// Each entry in the set will, if not removed, produce a collection +// that will delete the tombstone by path. +func makeTombstones(dps DeltaPaths) map[string]string { + r := make(map[string]string, len(dps)) + + for id, v := range dps { + r[id] = v.Path + } + + return r +} + +func pathFromPrevString(ps string) (path.Path, error) { + p, err := path.FromDataLayerPath(ps, false) + if err != nil { + return nil, clues.Wrap(err, "parsing previous path string") + } + + return p, nil +} + +// Returns true if the container passes the scope comparison and should be included. +// Returns: +// - the path representing the directory as it should be stored in the repository. +// - the human-readable path using display names. +// - true if the path passes the scope comparison. +func includeContainer( + ctx context.Context, + qp graph.QueryParams, + c graph.CachedContainer, + scope selectors.ExchangeScope, + category path.CategoryType, +) (path.Path, *path.Builder, bool) { + var ( + directory string + locPath path.Path + pb = c.Path() + loc = c.Location() + ) + + // Clause ensures that DefaultContactFolder is inspected properly + if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == api.DefaultContacts { + loc = loc.Append(api.DefaultContacts) + } + + dirPath, err := pb.ToDataLayerExchangePathForCategory( + qp.TenantID, + qp.ProtectedResource.ID(), + category, + false) + // Containers without a path (e.g. Root mail folder) always err here. + if err != nil { + return nil, nil, false + } + + directory = dirPath.Folder(false) + + if loc != nil { + locPath, err = loc.ToDataLayerExchangePathForCategory( + qp.TenantID, + qp.ProtectedResource.ID(), + category, + false) + // Containers without a path (e.g. Root mail folder) always err here. 
+ if err != nil { + return nil, nil, false + } + + directory = locPath.Folder(false) + } + + var ok bool + + switch category { + case path.EmailCategory: + ok = scope.Matches(selectors.ExchangeMailFolder, directory) + case path.ContactsCategory: + ok = scope.Matches(selectors.ExchangeContactFolder, directory) + case path.EventsCategory: + ok = scope.Matches(selectors.ExchangeEventCalendar, directory) + default: + return nil, nil, false + } + + logger.Ctx(ctx).With( + "included", ok, + "scope", scope, + "matches_input", directory, + ).Debug("backup folder selection filter") + + return dirPath, loc, ok +} diff --git a/src/internal/m365/service/exchange/backup_test.go b/src/internal/m365/collection/exchange/backup_test.go similarity index 99% rename from src/internal/m365/service/exchange/backup_test.go rename to src/internal/m365/collection/exchange/backup_test.go index 5e0f140d6..6e4e20ba1 100644 --- a/src/internal/m365/service/exchange/backup_test.go +++ b/src/internal/m365/collection/exchange/backup_test.go @@ -308,7 +308,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { ) require.NoError(t, err, clues.ToCore(err)) - cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, []data.RestoreCollection{ + cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{ data.NoFetchRestoreCollection{Collection: coll}, }) test.expectError(t, err, clues.ToCore(err)) @@ -368,7 +368,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections_ReadFailure( fc := failingColl{t} - _, canUsePreviousBackup, err := parseMetadataCollections(ctx, []data.RestoreCollection{fc}) + _, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{fc}) require.NoError(t, err) require.False(t, canUsePreviousBackup) } @@ -477,7 +477,7 @@ func (suite *BackupIntgSuite) TestMailFetch() { ProtectedResource: inMock.NewProvider(userID, userID), } - collections, err := createCollections( + collections, 
err := CreateCollections( ctx, bpc, handlers, @@ -560,7 +560,7 @@ func (suite *BackupIntgSuite) TestDelta() { } // get collections without providing any delta history (ie: full backup) - collections, err := createCollections( + collections, err := CreateCollections( ctx, bpc, handlers, @@ -582,7 +582,7 @@ func (suite *BackupIntgSuite) TestDelta() { require.NotNil(t, metadata, "collections contains a metadata collection") - cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, []data.RestoreCollection{ + cdps, canUsePreviousBackup, err := ParseMetadataCollections(ctx, []data.RestoreCollection{ data.NoFetchRestoreCollection{Collection: metadata}, }) require.NoError(t, err, clues.ToCore(err)) @@ -592,7 +592,7 @@ func (suite *BackupIntgSuite) TestDelta() { // now do another backup with the previous delta tokens, // which should only contain the difference. - collections, err = createCollections( + collections, err = CreateCollections( ctx, bpc, handlers, @@ -644,7 +644,7 @@ func (suite *BackupIntgSuite) TestMailSerializationRegression() { Selector: sel.Selector, } - collections, err := createCollections( + collections, err := CreateCollections( ctx, bpc, handlers, @@ -725,7 +725,7 @@ func (suite *BackupIntgSuite) TestContactSerializationRegression() { ProtectedResource: inMock.NewProvider(suite.user, suite.user), } - edcs, err := createCollections( + edcs, err := CreateCollections( ctx, bpc, handlers, @@ -855,7 +855,7 @@ func (suite *BackupIntgSuite) TestEventsSerializationRegression() { ProtectedResource: inMock.NewProvider(suite.user, suite.user), } - collections, err := createCollections( + collections, err := CreateCollections( ctx, bpc, handlers, @@ -1198,7 +1198,7 @@ func checkMetadata( expect DeltaPaths, c data.BackupCollection, ) { - catPaths, _, err := parseMetadataCollections( + catPaths, _, err := ParseMetadataCollections( ctx, []data.RestoreCollection{data.NoFetchRestoreCollection{Collection: c}}) if !assert.NoError(t, err, "getting metadata", 
clues.ToCore(err)) { diff --git a/src/internal/m365/service/exchange/cache_container.go b/src/internal/m365/collection/exchange/cache_container.go similarity index 100% rename from src/internal/m365/service/exchange/cache_container.go rename to src/internal/m365/collection/exchange/cache_container.go diff --git a/src/internal/m365/service/exchange/collection.go b/src/internal/m365/collection/exchange/collection.go similarity index 100% rename from src/internal/m365/service/exchange/collection.go rename to src/internal/m365/collection/exchange/collection.go diff --git a/src/internal/m365/service/exchange/collection_test.go b/src/internal/m365/collection/exchange/collection_test.go similarity index 100% rename from src/internal/m365/service/exchange/collection_test.go rename to src/internal/m365/collection/exchange/collection_test.go diff --git a/src/internal/m365/service/exchange/consts.go b/src/internal/m365/collection/exchange/consts.go similarity index 100% rename from src/internal/m365/service/exchange/consts.go rename to src/internal/m365/collection/exchange/consts.go diff --git a/src/internal/m365/service/exchange/contacts_backup.go b/src/internal/m365/collection/exchange/contacts_backup.go similarity index 100% rename from src/internal/m365/service/exchange/contacts_backup.go rename to src/internal/m365/collection/exchange/contacts_backup.go diff --git a/src/internal/m365/service/exchange/contacts_container_cache.go b/src/internal/m365/collection/exchange/contacts_container_cache.go similarity index 100% rename from src/internal/m365/service/exchange/contacts_container_cache.go rename to src/internal/m365/collection/exchange/contacts_container_cache.go diff --git a/src/internal/m365/service/exchange/contacts_restore.go b/src/internal/m365/collection/exchange/contacts_restore.go similarity index 94% rename from src/internal/m365/service/exchange/contacts_restore.go rename to src/internal/m365/collection/exchange/contacts_restore.go index ea5dc2441..6dfb29300 
100644 --- a/src/internal/m365/service/exchange/contacts_restore.go +++ b/src/internal/m365/collection/exchange/contacts_restore.go @@ -31,7 +31,7 @@ func newContactRestoreHandler( } } -func (h contactRestoreHandler) newContainerCache(userID string) graph.ContainerResolver { +func (h contactRestoreHandler) NewContainerCache(userID string) graph.ContainerResolver { return &contactContainerCache{ userID: userID, enumer: h.ac, @@ -39,7 +39,7 @@ func (h contactRestoreHandler) newContainerCache(userID string) graph.ContainerR } } -func (h contactRestoreHandler) formatRestoreDestination( +func (h contactRestoreHandler) FormatRestoreDestination( destinationContainerName string, _ path.Path, // contact folders cannot be nested ) *path.Builder { @@ -60,7 +60,7 @@ func (h contactRestoreHandler) GetContainerByName( return h.ac.GetContainerByName(ctx, userID, "", containerName) } -func (h contactRestoreHandler) defaultRootContainer() string { +func (h contactRestoreHandler) DefaultRootContainer() string { return api.DefaultContacts } @@ -163,7 +163,7 @@ func restoreContact( return info, nil } -func (h contactRestoreHandler) getItemsInContainerByCollisionKey( +func (h contactRestoreHandler) GetItemsInContainerByCollisionKey( ctx context.Context, userID, containerID string, ) (map[string]string, error) { diff --git a/src/internal/m365/service/exchange/contacts_restore_test.go b/src/internal/m365/collection/exchange/contacts_restore_test.go similarity index 100% rename from src/internal/m365/service/exchange/contacts_restore_test.go rename to src/internal/m365/collection/exchange/contacts_restore_test.go diff --git a/src/internal/m365/service/exchange/container_resolver.go b/src/internal/m365/collection/exchange/container_resolver.go similarity index 100% rename from src/internal/m365/service/exchange/container_resolver.go rename to src/internal/m365/collection/exchange/container_resolver.go diff --git a/src/internal/m365/service/exchange/container_resolver_test.go 
b/src/internal/m365/collection/exchange/container_resolver_test.go similarity index 98% rename from src/internal/m365/service/exchange/container_resolver_test.go rename to src/internal/m365/collection/exchange/container_resolver_test.go index b2ff30830..12f8c06ee 100644 --- a/src/internal/m365/service/exchange/container_resolver_test.go +++ b/src/internal/m365/collection/exchange/container_resolver_test.go @@ -804,10 +804,10 @@ func runCreateDestinationTest( var ( svc = path.ExchangeService - gcc = handler.newContainerCache(userID) + gcc = handler.NewContainerCache(userID) ) - err := gcc.Populate(ctx, fault.New(true), handler.defaultRootContainer()) + err := gcc.Populate(ctx, fault.New(true), handler.DefaultRootContainer()) require.NoError(t, err, clues.ToCore(err)) path1, err := path.Build( @@ -819,10 +819,10 @@ func runCreateDestinationTest( containerNames1...) require.NoError(t, err, clues.ToCore(err)) - containerID, gcc, err := createDestination( + containerID, gcc, err := CreateDestination( ctx, handler, - handler.formatRestoreDestination(destinationName, path1), + handler.FormatRestoreDestination(destinationName, path1), userID, gcc, fault.New(true)) @@ -840,10 +840,10 @@ func runCreateDestinationTest( containerNames2...) 
require.NoError(t, err, clues.ToCore(err)) - containerID, gcc, err = createDestination( + containerID, gcc, err = CreateDestination( ctx, handler, - handler.formatRestoreDestination(destinationName, path2), + handler.FormatRestoreDestination(destinationName, path2), userID, gcc, fault.New(true)) diff --git a/src/internal/m365/service/exchange/events_backup.go b/src/internal/m365/collection/exchange/events_backup.go similarity index 100% rename from src/internal/m365/service/exchange/events_backup.go rename to src/internal/m365/collection/exchange/events_backup.go diff --git a/src/internal/m365/service/exchange/events_container_cache.go b/src/internal/m365/collection/exchange/events_container_cache.go similarity index 100% rename from src/internal/m365/service/exchange/events_container_cache.go rename to src/internal/m365/collection/exchange/events_container_cache.go diff --git a/src/internal/m365/service/exchange/events_instance_restore.go b/src/internal/m365/collection/exchange/events_instance_restore.go similarity index 100% rename from src/internal/m365/service/exchange/events_instance_restore.go rename to src/internal/m365/collection/exchange/events_instance_restore.go diff --git a/src/internal/m365/service/exchange/events_restore.go b/src/internal/m365/collection/exchange/events_restore.go similarity index 97% rename from src/internal/m365/service/exchange/events_restore.go rename to src/internal/m365/collection/exchange/events_restore.go index 922d7a0b0..ddf098778 100644 --- a/src/internal/m365/service/exchange/events_restore.go +++ b/src/internal/m365/collection/exchange/events_restore.go @@ -32,7 +32,7 @@ func newEventRestoreHandler( } } -func (h eventRestoreHandler) newContainerCache(userID string) graph.ContainerResolver { +func (h eventRestoreHandler) NewContainerCache(userID string) graph.ContainerResolver { return &eventContainerCache{ userID: userID, enumer: h.ac, @@ -40,7 +40,7 @@ func (h eventRestoreHandler) newContainerCache(userID string) 
graph.ContainerRes } } -func (h eventRestoreHandler) formatRestoreDestination( +func (h eventRestoreHandler) FormatRestoreDestination( destinationContainerName string, _ path.Path, // ignored because calendars cannot be nested ) *path.Builder { @@ -66,7 +66,7 @@ func (h eventRestoreHandler) GetContainerByName( } // always returns the provided value -func (h eventRestoreHandler) defaultRootContainer() string { +func (h eventRestoreHandler) DefaultRootContainer() string { return api.DefaultCalendar } @@ -323,7 +323,7 @@ func updateAttachments( return el.Failure() } -func (h eventRestoreHandler) getItemsInContainerByCollisionKey( +func (h eventRestoreHandler) GetItemsInContainerByCollisionKey( ctx context.Context, userID, containerID string, ) (map[string]string, error) { diff --git a/src/internal/m365/service/exchange/events_restore_test.go b/src/internal/m365/collection/exchange/events_restore_test.go similarity index 100% rename from src/internal/m365/service/exchange/events_restore_test.go rename to src/internal/m365/collection/exchange/events_restore_test.go diff --git a/src/internal/m365/service/exchange/handlers.go b/src/internal/m365/collection/exchange/handlers.go similarity index 95% rename from src/internal/m365/service/exchange/handlers.go rename to src/internal/m365/collection/exchange/handlers.go index 55ab7d42a..cf207d5a2 100644 --- a/src/internal/m365/service/exchange/handlers.go +++ b/src/internal/m365/collection/exchange/handlers.go @@ -63,8 +63,8 @@ type restoreHandler interface { itemRestorer containerAPI getItemsByCollisionKeyser - newContainerCache(userID string) graph.ContainerResolver - formatRestoreDestination( + NewContainerCache(userID string) graph.ContainerResolver + FormatRestoreDestination( destinationContainerName string, collectionFullPath path.Path, ) *path.Builder @@ -95,7 +95,7 @@ type containerAPI interface { ctx context.Context, userID, parentContainerID, containerName string, ) (graph.Container, error) - defaultRootContainer() 
string + DefaultRootContainer() string } type containerByNamer interface { @@ -107,7 +107,7 @@ type containerByNamer interface { } // primary interface controller for all per-cateogry restoration behavior. -func restoreHandlers( +func RestoreHandlers( ac api.Client, ) map[path.CategoryType]restoreHandler { return map[path.CategoryType]restoreHandler{ @@ -124,7 +124,7 @@ type getItemsByCollisionKeyser interface { // Collision key checks are used during restore to handle the on- // collision restore configurations that cause the item restore to get // skipped, replaced, or copied. - getItemsInContainerByCollisionKey( + GetItemsInContainerByCollisionKey( ctx context.Context, userID, containerID string, ) (map[string]string, error) diff --git a/src/internal/m365/service/exchange/helper_test.go b/src/internal/m365/collection/exchange/helper_test.go similarity index 100% rename from src/internal/m365/service/exchange/helper_test.go rename to src/internal/m365/collection/exchange/helper_test.go diff --git a/src/internal/m365/service/exchange/mail_backup.go b/src/internal/m365/collection/exchange/mail_backup.go similarity index 100% rename from src/internal/m365/service/exchange/mail_backup.go rename to src/internal/m365/collection/exchange/mail_backup.go diff --git a/src/internal/m365/service/exchange/mail_container_cache.go b/src/internal/m365/collection/exchange/mail_container_cache.go similarity index 100% rename from src/internal/m365/service/exchange/mail_container_cache.go rename to src/internal/m365/collection/exchange/mail_container_cache.go diff --git a/src/internal/m365/service/exchange/mail_container_cache_test.go b/src/internal/m365/collection/exchange/mail_container_cache_test.go similarity index 100% rename from src/internal/m365/service/exchange/mail_container_cache_test.go rename to src/internal/m365/collection/exchange/mail_container_cache_test.go diff --git a/src/internal/m365/service/exchange/mail_restore.go 
b/src/internal/m365/collection/exchange/mail_restore.go similarity index 95% rename from src/internal/m365/service/exchange/mail_restore.go rename to src/internal/m365/collection/exchange/mail_restore.go index 6828361f8..e8a7e8b52 100644 --- a/src/internal/m365/service/exchange/mail_restore.go +++ b/src/internal/m365/collection/exchange/mail_restore.go @@ -32,7 +32,7 @@ func newMailRestoreHandler( } } -func (h mailRestoreHandler) newContainerCache(userID string) graph.ContainerResolver { +func (h mailRestoreHandler) NewContainerCache(userID string) graph.ContainerResolver { return &mailContainerCache{ userID: userID, enumer: h.ac, @@ -40,7 +40,7 @@ func (h mailRestoreHandler) newContainerCache(userID string) graph.ContainerReso } } -func (h mailRestoreHandler) formatRestoreDestination( +func (h mailRestoreHandler) FormatRestoreDestination( destinationContainerName string, collectionFullPath path.Path, ) *path.Builder { @@ -65,7 +65,7 @@ func (h mailRestoreHandler) GetContainerByName( return h.ac.GetContainerByName(ctx, userID, parentContainerID, containerName) } -func (h mailRestoreHandler) defaultRootContainer() string { +func (h mailRestoreHandler) DefaultRootContainer() string { return api.MsgFolderRoot } @@ -216,7 +216,7 @@ func setMessageSVEPs(msg models.Messageable) models.Messageable { return msg } -func (h mailRestoreHandler) getItemsInContainerByCollisionKey( +func (h mailRestoreHandler) GetItemsInContainerByCollisionKey( ctx context.Context, userID, containerID string, ) (map[string]string, error) { diff --git a/src/internal/m365/service/exchange/mail_restore_test.go b/src/internal/m365/collection/exchange/mail_restore_test.go similarity index 100% rename from src/internal/m365/service/exchange/mail_restore_test.go rename to src/internal/m365/collection/exchange/mail_restore_test.go diff --git a/src/internal/m365/collection/exchange/metadata.go b/src/internal/m365/collection/exchange/metadata.go new file mode 100644 index 000000000..2c2824f49 --- 
/dev/null +++ b/src/internal/m365/collection/exchange/metadata.go @@ -0,0 +1,164 @@ +package exchange + +import ( + "context" + "encoding/json" + + "github.com/alcionai/clues" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +// MetadataFileNames produces the category-specific set of filenames used to +// store graph metadata such as delta tokens and folderID->path references. +func MetadataFileNames(cat path.CategoryType) []string { + switch cat { + case path.EmailCategory, path.ContactsCategory: + return []string{graph.DeltaURLsFileName, graph.PreviousPathFileName} + default: + return []string{graph.PreviousPathFileName} + } +} + +type CatDeltaPaths map[path.CategoryType]DeltaPaths + +type DeltaPaths map[string]DeltaPath + +func (dps DeltaPaths) AddDelta(k, d string) { + dp, ok := dps[k] + if !ok { + dp = DeltaPath{} + } + + dp.Delta = d + dps[k] = dp +} + +func (dps DeltaPaths) AddPath(k, p string) { + dp, ok := dps[k] + if !ok { + dp = DeltaPath{} + } + + dp.Path = p + dps[k] = dp +} + +type DeltaPath struct { + Delta string + Path string +} + +// ParseMetadataCollections produces a per-category map of delta and path +// lookups, plus a bool reporting whether that prior metadata is usable. +func ParseMetadataCollections( + ctx context.Context, + colls []data.RestoreCollection, +) (CatDeltaPaths, bool, error) { + // cdp stores metadata + cdp := CatDeltaPaths{ + path.ContactsCategory: {}, + path.EmailCategory: {}, + path.EventsCategory: {}, + } + + // found tracks the metadata we've loaded, to make sure we don't + // fetch overlapping copies.
+ found := map[path.CategoryType]map[string]struct{}{ + path.ContactsCategory: {}, + path.EmailCategory: {}, + path.EventsCategory: {}, + } + + // errors from metadata items should not stop the backup, + // but it should prevent us from using previous backups + errs := fault.New(true) + + for _, coll := range colls { + var ( + breakLoop bool + items = coll.Items(ctx, errs) + category = coll.FullPath().Category() + ) + + for { + select { + case <-ctx.Done(): + return nil, false, clues.Wrap(ctx.Err(), "parsing collection metadata").WithClues(ctx) + + case item, ok := <-items: + if !ok || errs.Failure() != nil { + breakLoop = true + break + } + + var ( + m = map[string]string{} + cdps = cdp[category] + ) + + err := json.NewDecoder(item.ToReader()).Decode(&m) + if err != nil { + return nil, false, clues.Wrap(err, "decoding metadata json").WithClues(ctx) + } + + switch item.ID() { + case graph.PreviousPathFileName: + if _, ok := found[category]["path"]; ok { + return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of path metadata").WithClues(ctx) + } + + for k, p := range m { + cdps.AddPath(k, p) + } + + found[category]["path"] = struct{}{} + + case graph.DeltaURLsFileName: + if _, ok := found[category]["delta"]; ok { + return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of delta metadata").WithClues(ctx) + } + + for k, d := range m { + cdps.AddDelta(k, d) + } + + found[category]["delta"] = struct{}{} + } + + cdp[category] = cdps + } + + if breakLoop { + break + } + } + } + + if errs.Failure() != nil { + logger.CtxErr(ctx, errs.Failure()).Info("reading metadata collection items") + + return CatDeltaPaths{ + path.ContactsCategory: {}, + path.EmailCategory: {}, + path.EventsCategory: {}, + }, false, nil + } + + // Remove any entries that are missing a previous path. A delta without + // a path is considered incomplete metadata, and needs to incur a + // complete backup on the next run.
+ for _, dps := range cdp { + for k, dp := range dps { + if len(dp.Path) == 0 { + delete(dps, k) + } + } + } + + return cdp, true, nil +} diff --git a/src/internal/m365/collection/exchange/restore.go b/src/internal/m365/collection/exchange/restore.go new file mode 100644 index 000000000..6d6b6721c --- /dev/null +++ b/src/internal/m365/collection/exchange/restore.go @@ -0,0 +1,255 @@ +package exchange + +import ( + "bytes" + "context" + "runtime/trace" + + "github.com/alcionai/clues" + "github.com/microsoftgraph/msgraph-sdk-go/models" + + "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/diagnostics" + "github.com/alcionai/corso/src/internal/m365/graph" + "github.com/alcionai/corso/src/internal/m365/support" + "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/count" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +// RestoreCollection handles restoration of an individual collection.
+func RestoreCollection( + ctx context.Context, + ir itemRestorer, + dc data.RestoreCollection, + resourceID, destinationID string, + collisionKeyToItemID map[string]string, + collisionPolicy control.CollisionPolicy, + deets *details.Builder, + errs *fault.Bus, + ctr *count.Bus, +) (support.CollectionMetrics, error) { + ctx, end := diagnostics.Span(ctx, "m365:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath())) + defer end() + + var ( + el = errs.Local() + metrics support.CollectionMetrics + items = dc.Items(ctx, errs) + fullPath = dc.FullPath() + category = fullPath.Category() + ) + + colProgress := observe.CollectionProgress( + ctx, + category.String(), + fullPath.Folder(false)) + defer close(colProgress) + + for { + select { + case <-ctx.Done(): + return metrics, clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx) + + case itemData, ok := <-items: + if !ok || el.Failure() != nil { + return metrics, el.Failure() + } + + ictx := clues.Add(ctx, "item_id", itemData.ID()) + trace.Log(ictx, "m365:exchange:restoreCollection:item", itemData.ID()) + metrics.Objects++ + + buf := &bytes.Buffer{} + + _, err := buf.ReadFrom(itemData.ToReader()) + if err != nil { + el.AddRecoverable(ctx, clues.Wrap(err, "reading item bytes").WithClues(ictx)) + continue + } + + body := buf.Bytes() + + info, err := ir.restore( + ictx, + body, + resourceID, + destinationID, + collisionKeyToItemID, + collisionPolicy, + errs, + ctr) + if err != nil { + if !graph.IsErrItemAlreadyExistsConflict(err) { + el.AddRecoverable(ictx, err) + } + + continue + } + + metrics.Bytes += int64(len(body)) + metrics.Successes++ + + // FIXME: this may be the incorrect path. If we restored within a top-level + // destination folder, then the restore path no longer matches the fullPath. 
+ itemPath, err := fullPath.AppendItem(itemData.ID()) + if err != nil { + el.AddRecoverable(ctx, clues.Wrap(err, "adding item to collection path").WithClues(ctx)) + continue + } + + locationRef := path.Builder{}.Append(itemPath.Folders()...) + + err = deets.Add( + itemPath, + locationRef, + details.ItemInfo{ + Exchange: info, + }) + if err != nil { + // These deets additions are for cli display purposes only. + // no need to fail out on error. + logger.Ctx(ctx).Infow("accounting for restored item", "error", err) + } + + colProgress <- struct{}{} + } + } +} + +// CreateDestination creates folders in sequence +// [root leaf1 leaf2] similar to a linked list. +// @param destination is the desired path from the root to the container +// that the items will be restored into. +func CreateDestination( + ctx context.Context, + ca containerAPI, + destination *path.Builder, + resourceID string, + gcr graph.ContainerResolver, + errs *fault.Bus, +) (string, graph.ContainerResolver, error) { + var ( + cache = gcr + restoreLoc = &path.Builder{} + containerParentID string + ) + + for _, container := range destination.Elements() { + restoreLoc = restoreLoc.Append(container) + + ictx := clues.Add( + ctx, + "container_parent_id", containerParentID, + "container_name", container, + "restore_location", restoreLoc) + + containerID, err := getOrPopulateContainer( + ictx, + ca, + cache, + restoreLoc, + resourceID, + containerParentID, + container, + errs) + if err != nil { + return "", cache, clues.Stack(err) + } + + containerParentID = containerID + } + + // containerParentID now identifies the last created container, + // not its parent.
+ return containerParentID, cache, nil +} + +func getOrPopulateContainer( + ctx context.Context, + ca containerAPI, + gcr graph.ContainerResolver, + restoreLoc *path.Builder, + resourceID, containerParentID, containerName string, + errs *fault.Bus, +) (string, error) { + cached, ok := gcr.LocationInCache(restoreLoc.String()) + if ok { + return cached, nil + } + + c, err := ca.CreateContainer(ctx, resourceID, containerParentID, containerName) + + // 409 handling case: + // attempt to fetch the container by name and add that result to the cache. + // This is rare, but may happen if CreateContainer() POST fails with 5xx: + // sometimes the backend will create the folder despite the 5xx response, + // leaving our local containerResolver with inconsistent state. + if graph.IsErrFolderExists(err) { + cc, e := ca.GetContainerByName(ctx, resourceID, containerParentID, containerName) + if e != nil { + err = clues.Stack(err, e) + } else { + c = cc + err = nil + } + } + + if err != nil { + return "", clues.Wrap(err, "creating restore container") + } + + folderID := ptr.Val(c.GetId()) + + if err = gcr.AddToCache(ctx, c); err != nil { + return "", clues.Wrap(err, "adding container to cache") + } + + return folderID, nil +} + +func uploadAttachments( + ctx context.Context, + ap attachmentPoster, + as []models.Attachmentable, + resourceID, destinationID, itemID string, + errs *fault.Bus, +) error { + el := errs.Local() + + for _, a := range as { + if el.Failure() != nil { + return el.Failure() + } + + err := uploadAttachment( + ctx, + ap, + resourceID, + destinationID, + itemID, + a) + if err != nil { + // FIXME: I don't know why we're swallowing this error case. + // It needs investigation: https://github.com/alcionai/corso/issues/3498 + if ptr.Val(a.GetOdataType()) == "#microsoft.graph.itemAttachment" { + name := ptr.Val(a.GetName()) + + logger.CtxErr(ctx, err). + With("attachment_name", name). 
+ Info("mail upload failed") + + continue + } + + el.AddRecoverable(ctx, clues.Wrap(err, "uploading mail attachment").WithClues(ctx)) + } + } + + return el.Failure() +} diff --git a/src/internal/m365/service/exchange/restore_test.go b/src/internal/m365/collection/exchange/restore_test.go similarity index 99% rename from src/internal/m365/service/exchange/restore_test.go rename to src/internal/m365/collection/exchange/restore_test.go index 88983114e..ac34946f4 100644 --- a/src/internal/m365/service/exchange/restore_test.go +++ b/src/internal/m365/collection/exchange/restore_test.go @@ -166,7 +166,7 @@ func (suite *RestoreIntgSuite) TestRestoreEvent() { func (suite *RestoreIntgSuite) TestRestoreExchangeObject() { t := suite.T() - handlers := restoreHandlers(suite.ac) + handlers := RestoreHandlers(suite.ac) userID := tconfig.M365UserID(suite.T()) diff --git a/src/internal/m365/service/exchange/transform.go b/src/internal/m365/collection/exchange/transform.go similarity index 100% rename from src/internal/m365/service/exchange/transform.go rename to src/internal/m365/collection/exchange/transform.go diff --git a/src/internal/m365/service/exchange/transform_test.go b/src/internal/m365/collection/exchange/transform_test.go similarity index 100% rename from src/internal/m365/service/exchange/transform_test.go rename to src/internal/m365/collection/exchange/transform_test.go diff --git a/src/internal/m365/service/exchange/backup.go b/src/internal/m365/service/exchange/backup.go index dc1d0e9f0..39190c773 100644 --- a/src/internal/m365/service/exchange/backup.go +++ b/src/internal/m365/service/exchange/backup.go @@ -2,23 +2,17 @@ package exchange import ( "context" - "encoding/json" "github.com/alcionai/clues" - "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/common/prefixmatcher" - "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" + 
"github.com/alcionai/corso/src/internal/m365/collection/exchange" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/support" - "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/operations/inject" - "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" - "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" - "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -41,7 +35,7 @@ func ProduceBackupCollections( collections = []data.BackupCollection{} el = errs.Local() categories = map[path.CategoryType]struct{}{} - handlers = BackupHandlers(ac) + handlers = exchange.BackupHandlers(ac) ) // Turn on concurrency limiter middleware for exchange backups @@ -51,7 +45,7 @@ func ProduceBackupCollections( bpc.Options.ToggleFeatures.DisableConcurrencyLimiter, bpc.Options.Parallelism.ItemFetch) - cdps, canUsePreviousBackup, err := parseMetadataCollections(ctx, bpc.MetadataCollections) + cdps, canUsePreviousBackup, err := exchange.ParseMetadataCollections(ctx, bpc.MetadataCollections) if err != nil { return nil, nil, false, err } @@ -63,7 +57,7 @@ func ProduceBackupCollections( break } - dcs, err := createCollections( + dcs, err := exchange.CreateCollections( ctx, bpc, handlers, @@ -101,518 +95,3 @@ func ProduceBackupCollections( return collections, nil, canUsePreviousBackup, el.Failure() } - -// createCollections - utility function that retrieves M365 -// IDs through Microsoft Graph API. The selectors.ExchangeScope -// determines the type of collections that are retrieved. 
-func createCollections( - ctx context.Context, - bpc inject.BackupProducerConfig, - handlers map[path.CategoryType]backupHandler, - tenantID string, - scope selectors.ExchangeScope, - dps DeltaPaths, - su support.StatusUpdater, - errs *fault.Bus, -) ([]data.BackupCollection, error) { - ctx = clues.Add(ctx, "category", scope.Category().PathType()) - - var ( - allCollections = make([]data.BackupCollection, 0) - category = scope.Category().PathType() - qp = graph.QueryParams{ - Category: category, - ProtectedResource: bpc.ProtectedResource, - TenantID: tenantID, - } - ) - - handler, ok := handlers[category] - if !ok { - return nil, clues.New("unsupported backup category type").WithClues(ctx) - } - - foldersComplete := observe.MessageWithCompletion( - ctx, - observe.Bulletf("%s", qp.Category)) - defer close(foldersComplete) - - rootFolder, cc := handler.NewContainerCache(bpc.ProtectedResource.ID()) - - if err := cc.Populate(ctx, errs, rootFolder); err != nil { - return nil, clues.Wrap(err, "populating container cache") - } - - collections, err := populateCollections( - ctx, - qp, - handler, - su, - cc, - scope, - dps, - bpc.Options, - errs) - if err != nil { - return nil, clues.Wrap(err, "filling collections") - } - - foldersComplete <- struct{}{} - - for _, coll := range collections { - allCollections = append(allCollections, coll) - } - - return allCollections, nil -} - -// populateCollections is a utility function -// that places the M365 object ids belonging to specific directories -// into a BackupCollection. Messages outside of those directories are omitted. -// @param collection is filled with during this function. -// Supports all exchange applications: Contacts, Events, and Mail -// -// TODO(ashmrtn): This should really return []data.BackupCollection but -// unfortunately some of our tests rely on being able to lookup returned -// collections by ID and it would be non-trivial to change them. 
-func populateCollections( - ctx context.Context, - qp graph.QueryParams, - bh backupHandler, - statusUpdater support.StatusUpdater, - resolver graph.ContainerResolver, - scope selectors.ExchangeScope, - dps DeltaPaths, - ctrlOpts control.Options, - errs *fault.Bus, -) (map[string]data.BackupCollection, error) { - var ( - // folder ID -> BackupCollection. - collections = map[string]data.BackupCollection{} - // folder ID -> delta url or folder path lookups - deltaURLs = map[string]string{} - currPaths = map[string]string{} - // copy of previousPaths. any folder found in the resolver get - // deleted from this map, leaving only the deleted folders behind - tombstones = makeTombstones(dps) - category = qp.Category - ) - - logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps)) - - el := errs.Local() - - for _, c := range resolver.Items() { - if el.Failure() != nil { - return nil, el.Failure() - } - - cID := ptr.Val(c.GetId()) - delete(tombstones, cID) - - var ( - err error - dp = dps[cID] - prevDelta = dp.Delta - prevPathStr = dp.Path // do not log: pii; log prevPath instead - prevPath path.Path - ictx = clues.Add( - ctx, - "container_id", cID, - "previous_delta", pii.SafeURL{ - URL: prevDelta, - SafePathElems: graph.SafeURLPathParams, - SafeQueryKeys: graph.SafeURLQueryParams, - }) - ) - - currPath, locPath, ok := includeContainer(ictx, qp, c, scope, category) - // Only create a collection if the path matches the scope. - if !ok { - continue - } - - if len(prevPathStr) > 0 { - if prevPath, err = pathFromPrevString(prevPathStr); err != nil { - logger.CtxErr(ictx, err).Error("parsing prev path") - // if the previous path is unusable, then the delta must be, too. - prevDelta = "" - } - } - - ictx = clues.Add(ictx, "previous_path", prevPath) - - added, removed, newDelta, err := bh.itemEnumerator(). 
- GetAddedAndRemovedItemIDs( - ictx, - qp.ProtectedResource.ID(), - cID, - prevDelta, - ctrlOpts.ToggleFeatures.ExchangeImmutableIDs, - !ctrlOpts.ToggleFeatures.DisableDelta) - if err != nil { - if !graph.IsErrDeletedInFlight(err) { - el.AddRecoverable(ctx, clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) - continue - } - - // race conditions happen, containers might get deleted while - // this process is in flight. If that happens, force the collection - // to reset. This prevents any old items from being retained in - // storage. If the container (or its children) are sill missing - // on the next backup, they'll get tombstoned. - newDelta = api.DeltaUpdate{Reset: true} - } - - if len(newDelta.URL) > 0 { - deltaURLs[cID] = newDelta.URL - } else if !newDelta.Reset { - logger.Ctx(ictx).Info("missing delta url") - } - - edc := NewCollection( - qp.ProtectedResource.ID(), - currPath, - prevPath, - locPath, - category, - bh.itemHandler(), - statusUpdater, - ctrlOpts, - newDelta.Reset) - - collections[cID] = &edc - - for _, add := range added { - edc.added[add] = struct{}{} - } - - // Remove any deleted IDs from the set of added IDs because items that are - // deleted and then restored will have a different ID than they did - // originally. - for _, remove := range removed { - delete(edc.added, remove) - edc.removed[remove] = struct{}{} - } - - // add the current path for the container ID to be used in the next backup - // as the "previous path", for reference in case of a rename or relocation. - currPaths[cID] = currPath.String() - } - - // A tombstone is a folder that needs to be marked for deletion. - // The only situation where a tombstone should appear is if the folder exists - // in the `previousPath` set, but does not exist in the current container - // resolver (which contains all the resource owners' current containers). 
- for id, p := range tombstones { - if el.Failure() != nil { - return nil, el.Failure() - } - - var ( - err error - ictx = clues.Add(ctx, "tombstone_id", id) - ) - - if collections[id] != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "conflict: tombstone exists for a live collection").WithClues(ictx)) - continue - } - - // only occurs if it was a new folder that we picked up during the container - // resolver phase that got deleted in flight by the time we hit this stage. - if len(p) == 0 { - continue - } - - prevPath, err := pathFromPrevString(p) - if err != nil { - // technically shouldn't ever happen. But just in case... - logger.CtxErr(ictx, err).Error("parsing tombstone prev path") - continue - } - - edc := NewCollection( - qp.ProtectedResource.ID(), - nil, // marks the collection as deleted - prevPath, - nil, // tombstones don't need a location - category, - bh.itemHandler(), - statusUpdater, - ctrlOpts, - false) - collections[id] = &edc - } - - logger.Ctx(ctx).Infow( - "adding metadata collection entries", - "num_paths_entries", len(currPaths), - "num_deltas_entries", len(deltaURLs)) - - col, err := graph.MakeMetadataCollection( - qp.TenantID, - qp.ProtectedResource.ID(), - path.ExchangeService, - qp.Category, - []graph.MetadataCollectionEntry{ - graph.NewMetadataEntry(graph.PreviousPathFileName, currPaths), - graph.NewMetadataEntry(graph.DeltaURLsFileName, deltaURLs), - }, - statusUpdater) - if err != nil { - return nil, clues.Wrap(err, "making metadata collection") - } - - collections["metadata"] = col - - return collections, el.Failure() -} - -// produces a set of id:path pairs from the deltapaths map. -// Each entry in the set will, if not removed, produce a collection -// that will delete the tombstone by path. 
-func makeTombstones(dps DeltaPaths) map[string]string { - r := make(map[string]string, len(dps)) - - for id, v := range dps { - r[id] = v.Path - } - - return r -} - -func pathFromPrevString(ps string) (path.Path, error) { - p, err := path.FromDataLayerPath(ps, false) - if err != nil { - return nil, clues.Wrap(err, "parsing previous path string") - } - - return p, nil -} - -// Returns true if the container passes the scope comparison and should be included. -// Returns: -// - the path representing the directory as it should be stored in the repository. -// - the human-readable path using display names. -// - true if the path passes the scope comparison. -func includeContainer( - ctx context.Context, - qp graph.QueryParams, - c graph.CachedContainer, - scope selectors.ExchangeScope, - category path.CategoryType, -) (path.Path, *path.Builder, bool) { - var ( - directory string - locPath path.Path - pb = c.Path() - loc = c.Location() - ) - - // Clause ensures that DefaultContactFolder is inspected properly - if category == path.ContactsCategory && ptr.Val(c.GetDisplayName()) == api.DefaultContacts { - loc = loc.Append(api.DefaultContacts) - } - - dirPath, err := pb.ToDataLayerExchangePathForCategory( - qp.TenantID, - qp.ProtectedResource.ID(), - category, - false) - // Containers without a path (e.g. Root mail folder) always err here. - if err != nil { - return nil, nil, false - } - - directory = dirPath.Folder(false) - - if loc != nil { - locPath, err = loc.ToDataLayerExchangePathForCategory( - qp.TenantID, - qp.ProtectedResource.ID(), - category, - false) - // Containers without a path (e.g. Root mail folder) always err here. 
- if err != nil { - return nil, nil, false - } - - directory = locPath.Folder(false) - } - - var ok bool - - switch category { - case path.EmailCategory: - ok = scope.Matches(selectors.ExchangeMailFolder, directory) - case path.ContactsCategory: - ok = scope.Matches(selectors.ExchangeContactFolder, directory) - case path.EventsCategory: - ok = scope.Matches(selectors.ExchangeEventCalendar, directory) - default: - return nil, nil, false - } - - logger.Ctx(ctx).With( - "included", ok, - "scope", scope, - "matches_input", directory, - ).Debug("backup folder selection filter") - - return dirPath, loc, ok -} - -// --------------------------------------------------------------------------- -// metadata collection parsing -// --------------------------------------------------------------------------- - -// MetadataFileNames produces the category-specific set of filenames used to -// store graph metadata such as delta tokens and folderID->path references. -func MetadataFileNames(cat path.CategoryType) []string { - switch cat { - case path.EmailCategory, path.ContactsCategory: - return []string{graph.DeltaURLsFileName, graph.PreviousPathFileName} - default: - return []string{graph.PreviousPathFileName} - } -} - -type CatDeltaPaths map[path.CategoryType]DeltaPaths - -type DeltaPaths map[string]DeltaPath - -func (dps DeltaPaths) AddDelta(k, d string) { - dp, ok := dps[k] - if !ok { - dp = DeltaPath{} - } - - dp.Delta = d - dps[k] = dp -} - -func (dps DeltaPaths) AddPath(k, p string) { - dp, ok := dps[k] - if !ok { - dp = DeltaPath{} - } - - dp.Path = p - dps[k] = dp -} - -type DeltaPath struct { - Delta string - Path string -} - -// ParseMetadataCollections produces a map of structs holding delta -// and path lookup maps. 
-func parseMetadataCollections( - ctx context.Context, - colls []data.RestoreCollection, -) (CatDeltaPaths, bool, error) { - // cdp stores metadata - cdp := CatDeltaPaths{ - path.ContactsCategory: {}, - path.EmailCategory: {}, - path.EventsCategory: {}, - } - - // found tracks the metadata we've loaded, to make sure we don't - // fetch overlapping copies. - found := map[path.CategoryType]map[string]struct{}{ - path.ContactsCategory: {}, - path.EmailCategory: {}, - path.EventsCategory: {}, - } - - // errors from metadata items should not stop the backup, - // but it should prevent us from using previous backups - errs := fault.New(true) - - for _, coll := range colls { - var ( - breakLoop bool - items = coll.Items(ctx, errs) - category = coll.FullPath().Category() - ) - - for { - select { - case <-ctx.Done(): - return nil, false, clues.Wrap(ctx.Err(), "parsing collection metadata").WithClues(ctx) - - case item, ok := <-items: - if !ok || errs.Failure() != nil { - breakLoop = true - break - } - - var ( - m = map[string]string{} - cdps = cdp[category] - ) - - err := json.NewDecoder(item.ToReader()).Decode(&m) - if err != nil { - return nil, false, clues.New("decoding metadata json").WithClues(ctx) - } - - switch item.ID() { - case graph.PreviousPathFileName: - if _, ok := found[category]["path"]; ok { - return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of path metadata").WithClues(ctx) - } - - for k, p := range m { - cdps.AddPath(k, p) - } - - found[category]["path"] = struct{}{} - - case graph.DeltaURLsFileName: - if _, ok := found[category]["delta"]; ok { - return nil, false, clues.Wrap(clues.New(category.String()), "multiple versions of delta metadata").WithClues(ctx) - } - - for k, d := range m { - cdps.AddDelta(k, d) - } - - found[category]["delta"] = struct{}{} - } - - cdp[category] = cdps - } - - if breakLoop { - break - } - } - } - - if errs.Failure() != nil { - logger.CtxErr(ctx, errs.Failure()).Info("reading metadata collection 
items") - - return CatDeltaPaths{ - path.ContactsCategory: {}, - path.EmailCategory: {}, - path.EventsCategory: {}, - }, false, nil - } - - // Remove any entries that contain a path or a delta, but not both. - // That metadata is considered incomplete, and needs to incur a - // complete backup on the next run. - for _, dps := range cdp { - for k, dp := range dps { - if len(dp.Path) == 0 { - delete(dps, k) - } - } - } - - return cdp, true, nil -} diff --git a/src/internal/m365/service/exchange/restore.go b/src/internal/m365/service/exchange/restore.go index eb5477bf4..2298acbfb 100644 --- a/src/internal/m365/service/exchange/restore.go +++ b/src/internal/m365/service/exchange/restore.go @@ -1,25 +1,18 @@ package exchange import ( - "bytes" "context" - "runtime/trace" "github.com/alcionai/clues" - "github.com/microsoftgraph/msgraph-sdk-go/models" - "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/data" - "github.com/alcionai/corso/src/internal/diagnostics" + "github.com/alcionai/corso/src/internal/m365/collection/exchange" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/support" - "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/backup/details" - "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" - "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" ) @@ -42,7 +35,7 @@ func ConsumeRestoreCollections( var ( resourceID = rcc.ProtectedResource.ID() directoryCache = make(map[path.CategoryType]graph.ContainerResolver) - handlers = restoreHandlers(ac) + handlers = exchange.RestoreHandlers(ac) metrics support.CollectionMetrics el = errs.Local() ) @@ -67,18 +60,18 @@ func ConsumeRestoreCollections( } if directoryCache[category] == 
nil { - gcr := handler.newContainerCache(resourceID) - if err := gcr.Populate(ctx, errs, handler.defaultRootContainer()); err != nil { + gcr := handler.NewContainerCache(resourceID) + if err := gcr.Populate(ctx, errs, handler.DefaultRootContainer()); err != nil { return nil, clues.Wrap(err, "populating container cache") } directoryCache[category] = gcr } - containerID, gcc, err := createDestination( + containerID, gcc, err := exchange.CreateDestination( ictx, handler, - handler.formatRestoreDestination(rcc.RestoreConfig.Location, dc.FullPath()), + handler.FormatRestoreDestination(rcc.RestoreConfig.Location, dc.FullPath()), resourceID, directoryCache[category], errs) @@ -90,13 +83,13 @@ func ConsumeRestoreCollections( directoryCache[category] = gcc ictx = clues.Add(ictx, "restore_destination_id", containerID) - collisionKeyToItemID, err := handler.getItemsInContainerByCollisionKey(ctx, resourceID, containerID) + collisionKeyToItemID, err := handler.GetItemsInContainerByCollisionKey(ctx, resourceID, containerID) if err != nil { el.AddRecoverable(ctx, clues.Wrap(err, "building item collision cache")) continue } - temp, err := restoreCollection( + temp, err := exchange.RestoreCollection( ictx, handler, dc, @@ -128,235 +121,3 @@ func ConsumeRestoreCollections( return status, el.Failure() } - -// restoreCollection handles restoration of an individual collection. 
-func restoreCollection( - ctx context.Context, - ir itemRestorer, - dc data.RestoreCollection, - resourceID, destinationID string, - collisionKeyToItemID map[string]string, - collisionPolicy control.CollisionPolicy, - deets *details.Builder, - errs *fault.Bus, - ctr *count.Bus, -) (support.CollectionMetrics, error) { - ctx, end := diagnostics.Span(ctx, "m365:exchange:restoreCollection", diagnostics.Label("path", dc.FullPath())) - defer end() - - var ( - el = errs.Local() - metrics support.CollectionMetrics - items = dc.Items(ctx, errs) - fullPath = dc.FullPath() - category = fullPath.Category() - ) - - colProgress := observe.CollectionProgress( - ctx, - category.String(), - fullPath.Folder(false)) - defer close(colProgress) - - for { - select { - case <-ctx.Done(): - return metrics, clues.Wrap(ctx.Err(), "context cancelled").WithClues(ctx) - - case itemData, ok := <-items: - if !ok || el.Failure() != nil { - return metrics, el.Failure() - } - - ictx := clues.Add(ctx, "item_id", itemData.ID()) - trace.Log(ictx, "m365:exchange:restoreCollection:item", itemData.ID()) - metrics.Objects++ - - buf := &bytes.Buffer{} - - _, err := buf.ReadFrom(itemData.ToReader()) - if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "reading item bytes").WithClues(ictx)) - continue - } - - body := buf.Bytes() - - info, err := ir.restore( - ictx, - body, - resourceID, - destinationID, - collisionKeyToItemID, - collisionPolicy, - errs, - ctr) - if err != nil { - if !graph.IsErrItemAlreadyExistsConflict(err) { - el.AddRecoverable(ictx, err) - } - - continue - } - - metrics.Bytes += int64(len(body)) - metrics.Successes++ - - // FIXME: this may be the incorrect path. If we restored within a top-level - // destination folder, then the restore path no longer matches the fullPath. 
- itemPath, err := fullPath.AppendItem(itemData.ID()) - if err != nil { - el.AddRecoverable(ctx, clues.Wrap(err, "adding item to collection path").WithClues(ctx)) - continue - } - - locationRef := path.Builder{}.Append(itemPath.Folders()...) - - err = deets.Add( - itemPath, - locationRef, - details.ItemInfo{ - Exchange: info, - }) - if err != nil { - // These deets additions are for cli display purposes only. - // no need to fail out on error. - logger.Ctx(ctx).Infow("accounting for restored item", "error", err) - } - - colProgress <- struct{}{} - } - } -} - -// createDestination creates folders in sequence -// [root leaf1 leaf2] similar to a linked list. -// @param directory is the desired path from the root to the container -// that the items will be restored into. -func createDestination( - ctx context.Context, - ca containerAPI, - destination *path.Builder, - resourceID string, - gcr graph.ContainerResolver, - errs *fault.Bus, -) (string, graph.ContainerResolver, error) { - var ( - cache = gcr - restoreLoc = &path.Builder{} - containerParentID string - ) - - for _, container := range destination.Elements() { - restoreLoc = restoreLoc.Append(container) - - ictx := clues.Add( - ctx, - "container_parent_id", containerParentID, - "container_name", container, - "restore_location", restoreLoc) - - containerID, err := getOrPopulateContainer( - ictx, - ca, - cache, - restoreLoc, - resourceID, - containerParentID, - container, - errs) - if err != nil { - return "", cache, clues.Stack(err) - } - - containerParentID = containerID - } - - // containerParentID now identifies the last created container, - // not its parent. 
- return containerParentID, cache, nil -} - -func getOrPopulateContainer( - ctx context.Context, - ca containerAPI, - gcr graph.ContainerResolver, - restoreLoc *path.Builder, - resourceID, containerParentID, containerName string, - errs *fault.Bus, -) (string, error) { - cached, ok := gcr.LocationInCache(restoreLoc.String()) - if ok { - return cached, nil - } - - c, err := ca.CreateContainer(ctx, resourceID, containerParentID, containerName) - - // 409 handling case: - // attempt to fetch the container by name and add that result to the cache. - // This is rare, but may happen if CreateContainer() POST fails with 5xx: - // sometimes the backend will create the folder despite the 5xx response, - // leaving our local containerResolver with inconsistent state. - if graph.IsErrFolderExists(err) { - cc, e := ca.GetContainerByName(ctx, resourceID, containerParentID, containerName) - if e != nil { - err = clues.Stack(err, e) - } else { - c = cc - err = nil - } - } - - if err != nil { - return "", clues.Wrap(err, "creating restore container") - } - - folderID := ptr.Val(c.GetId()) - - if err = gcr.AddToCache(ctx, c); err != nil { - return "", clues.Wrap(err, "adding container to cache") - } - - return folderID, nil -} - -func uploadAttachments( - ctx context.Context, - ap attachmentPoster, - as []models.Attachmentable, - resourceID, destinationID, itemID string, - errs *fault.Bus, -) error { - el := errs.Local() - - for _, a := range as { - if el.Failure() != nil { - return el.Failure() - } - - err := uploadAttachment( - ctx, - ap, - resourceID, - destinationID, - itemID, - a) - if err != nil { - // FIXME: I don't know why we're swallowing this error case. - // It needs investigation: https://github.com/alcionai/corso/issues/3498 - if ptr.Val(a.GetOdataType()) == "#microsoft.graph.itemAttachment" { - name := ptr.Val(a.GetName()) - - logger.CtxErr(ctx, err). - With("attachment_name", name). 
- Info("mail upload failed") - - continue - } - - el.AddRecoverable(ctx, clues.Wrap(err, "uploading mail attachment").WithClues(ctx)) - } - } - - return el.Failure() -} diff --git a/src/internal/m365/service/exchange/testdata/handlers.go b/src/internal/m365/service/exchange/testdata/handlers.go index 2a62e609f..93b230310 100644 --- a/src/internal/m365/service/exchange/testdata/handlers.go +++ b/src/internal/m365/service/exchange/testdata/handlers.go @@ -7,8 +7,8 @@ import ( "github.com/alcionai/clues" "github.com/stretchr/testify/require" + "github.com/alcionai/corso/src/internal/m365/collection/exchange" "github.com/alcionai/corso/src/internal/m365/graph" - "github.com/alcionai/corso/src/internal/m365/service/exchange" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/services/m365/api" diff --git a/src/internal/operations/test/exchange_test.go b/src/internal/operations/test/exchange_test.go index 226b8265c..6be354d42 100644 --- a/src/internal/operations/test/exchange_test.go +++ b/src/internal/operations/test/exchange_test.go @@ -18,9 +18,9 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/events" evmock "github.com/alcionai/corso/src/internal/events/mock" + "github.com/alcionai/corso/src/internal/m365/collection/exchange" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/resource" - "github.com/alcionai/corso/src/internal/m365/service/exchange" exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock" exchTD "github.com/alcionai/corso/src/internal/m365/service/exchange/testdata" "github.com/alcionai/corso/src/internal/tester"