From de22131b234bb4471201bd832d976073000ea634 Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Mon, 5 Feb 2024 14:24:09 -0800 Subject: [PATCH] Split tombstone handling for groups services (#5170) * Prerequisite to the currently failing PR https://github.com/alcionai/corso/pull/5154 * Conversations tombstone IDs are currently 2 part ( `convID/threadID`). * However, tombstones are looked up by `convID` part only, see [code](https://github.com/alcionai/corso/blob/9a603d1f2181b359be43c4fce262c1ad1ac724eb/src/internal/m365/collection/groups/backup.go#L128). * Adding a change so that tombstone IDs are reduced to `convID`. Otherwise, we'd never delete tombstones while processing collections, and we'll run into `conflict: tombstone exists for a live collection` errors. * This is safe to do as there is always a 1:1 relationship between `convID` and `threadID`. For e.g. attempting to create another thread inside a conversation creates a new conversation. --- #### Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No #### Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Supportability/Tests - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup #### Issue(s) * # #### Test Plan - [x] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/m365/collection/groups/backup.go | 14 ++-- .../m365/collection/groups/backup_test.go | 7 ++ .../m365/collection/groups/channel_handler.go | 7 ++ .../collection/groups/conversation_handler.go | 31 ++++++++ .../groups/conversation_handler_test.go | 70 +++++++++++++++++++ .../m365/collection/groups/handlers.go | 9 +++ 6 files changed, 133 insertions(+), 5 deletions(-) diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index 290c24aff..0d23fa83e 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -105,13 +105,17 @@ func populateCollections[C graph.GetIDer, I groupsItemer]( // channel ID -> delta url or folder path lookups deltaURLs = map[string]string{} currPaths = map[string]string{} - // copy of previousPaths. every channel present in the slice param - // gets removed from this map; the remaining channels at the end of - // the process have been deleted. - tombstones = makeTombstones(dps) - el = errs.Local() + el = errs.Local() ) + // Copy of previousPaths. Every container present in the slice param + // gets removed from this map; the remaining containers at the end of + // the process have been deleted. + tombstones, err := bh.makeTombstones(dps) + if err != nil { + return nil, clues.StackWC(ctx, err) + } + logger.Ctx(ctx).Infow("filling collections", "len_deltapaths", len(dps)) for _, c := range containers { diff --git a/src/internal/m365/collection/groups/backup_test.go b/src/internal/m365/collection/groups/backup_test.go index d7ed5c368..b52d5644d 100644 --- a/src/internal/m365/collection/groups/backup_test.go +++ b/src/internal/m365/collection/groups/backup_test.go @@ -149,6 +149,13 @@ func (bh mockBackupHandler) getItemMetadata( return nil, 0, errMetadataFilesNotSupported } +//lint:ignore U1000 false linter issue due to generics +func (bh mockBackupHandler) makeTombstones( + dps metadata.DeltaPaths, +) (map[string]string, error) { + return makeTombstones(dps), nil +} + // --------------------------------------------------------------------------- // Unit Suite // --------------------------------------------------------------------------- diff --git a/src/internal/m365/collection/groups/channel_handler.go b/src/internal/m365/collection/groups/channel_handler.go index 1ed528a5c..7894295ac 100644 --- a/src/internal/m365/collection/groups/channel_handler.go +++ b/src/internal/m365/collection/groups/channel_handler.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -131,6 +132,12 @@ func (bh channelsBackupHandler) supportsItemMetadata() bool { return false } +func (bh channelsBackupHandler) makeTombstones( + dps metadata.DeltaPaths, +) (map[string]string, error) { + return makeTombstones(dps), nil +} + func channelContainer(ch models.Channelable) container[models.Channelable] { return container[models.Channelable]{ storageDirFolders: path.Elements{ptr.Val(ch.GetId())}, diff --git a/src/internal/m365/collection/groups/conversation_handler.go b/src/internal/m365/collection/groups/conversation_handler.go index 9cadee5ed..89a20d995 100644 --- a/src/internal/m365/collection/groups/conversation_handler.go +++ b/src/internal/m365/collection/groups/conversation_handler.go @@ -12,6 +12,7 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata" "github.com/alcionai/corso/src/pkg/backup/details" + deltaPath "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -175,6 +176,36 @@ func (bh conversationsBackupHandler) supportsItemMetadata() bool { return true } +func (bh conversationsBackupHandler) makeTombstones( + dps deltaPath.DeltaPaths, +) (map[string]string, error) { + r := make(map[string]string, len(dps)) + + for id, v := range dps { + // ID is of format conversationID/threadID. Tombstones are looked up + // by conversationID only, so remove the threadID part. This is safe + // because every conversation has only one thread. + elems := path.Split(id) + if len(elems) != 2 { + return nil, clues.New("invalid prev path") + } + + r[elems[0]] = v.Path + } + + // We are assuming a 1:1 mapping between conversations and threads. While + // this is true today, graph behavior may change in future. Throw an error + // if the assumption is violated. + // + // We cannot catch this error with tests because creating conversations + // requires delegated access. + if len(dps) != len(r) { + return nil, clues.New("multiple threads exist for a conversation") + } + + return r, nil +} + func conversationThreadContainer( c models.Conversationable, t models.ConversationThreadable, diff --git a/src/internal/m365/collection/groups/conversation_handler_test.go b/src/internal/m365/collection/groups/conversation_handler_test.go index 8f86db5f4..b75bebd5c 100644 --- a/src/internal/m365/collection/groups/conversation_handler_test.go +++ b/src/internal/m365/collection/groups/conversation_handler_test.go @@ -14,6 +14,7 @@ import ( "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/m365/collection/groups/metadata" "github.com/alcionai/corso/src/internal/tester" + deltaPath "github.com/alcionai/corso/src/pkg/backup/metadata" ) const ( @@ -64,3 +65,72 @@ func (suite *ConversationHandlerUnitSuite) TestGetItemMetadata() { assert.Equal(t, []string{resourceEmail}, meta.Recipients, "incorrect recipients") assert.Equal(t, ptr.Val(conv.GetTopic()), meta.Topic, "incorrect topic") } + +func (suite *ConversationHandlerUnitSuite) TestMakeTombstones() { + table := []struct { + name string + dps deltaPath.DeltaPaths + expected map[string]string + expectedErr require.ErrorAssertionFunc + }{ + { + name: "valid", + dps: deltaPath.DeltaPaths{ + "c1/t1": deltaPath.DeltaPath{ + Path: "p1", + }, + "c2/t2": deltaPath.DeltaPath{ + Path: "p2", + }, + }, + expected: map[string]string{ + "c1": "p1", + "c2": "p2", + }, + expectedErr: require.NoError, + }, + { + name: "invalid prev path", + dps: deltaPath.DeltaPaths{ + "c1": deltaPath.DeltaPath{ + Path: "p1", + }, + }, + expected: nil, + expectedErr: require.Error, + }, + { + name: "invalid prev path 2", + dps: deltaPath.DeltaPaths{ + "c1/t1/a1": deltaPath.DeltaPath{ + Path: "p1", + }, + }, + expected: nil, + expectedErr: require.Error, + }, + { + name: "multiple threads exist for a conversation", + dps: deltaPath.DeltaPaths{ + "c1/t1": deltaPath.DeltaPath{ + Path: "p1", + }, + "c1/t2": deltaPath.DeltaPath{ + Path: "p2", + }, + }, + expected: nil, + expectedErr: require.Error, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + bh := conversationsBackupHandler{} + + result, err := bh.makeTombstones(test.dps) + test.expectedErr(t, err) + assert.Equal(t, test.expected, result) + }) + } +} diff --git a/src/internal/m365/collection/groups/handlers.go b/src/internal/m365/collection/groups/handlers.go index 8c7090d2f..82279ebd3 100644 --- a/src/internal/m365/collection/groups/handlers.go +++ b/src/internal/m365/collection/groups/handlers.go @@ -7,6 +7,7 @@ import ( "github.com/microsoft/kiota-abstractions-go/serialization" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" @@ -30,6 +31,7 @@ type backupHandler[C graph.GetIDer, I groupsItemer] interface { includeContainerer[C] canonicalPather canMakeDeltaQuerieser + makeTombstoneser } type getItemAndAugmentInfoer[C graph.GetIDer, I groupsItemer] interface { @@ -107,6 +109,13 @@ type canMakeDeltaQuerieser interface { canMakeDeltaQueries() bool } +// produces a set of id:path pairs from the deltapaths map. +// Each entry in the set will, if not removed, produce a collection +// that will delete the tombstone by path. +type makeTombstoneser interface { + makeTombstones(dps metadata.DeltaPaths) (map[string]string, error) +} + // --------------------------------------------------------------------------- // Container management // ---------------------------------------------------------------------------