From e3bde064575822169c60d5ce0122d4c09ff7ba6c Mon Sep 17 00:00:00 2001 From: Keepers Date: Tue, 12 Sep 2023 19:48:44 -0600 Subject: [PATCH] add non-delta message paging path (#4229) similar to exchange, there are conditions under which channel messages are unable to use the delta api. In particular, when the channel has no valid email property. This change ensures that we enumerate the entire channel under those conditions, instead of failing out. Technically, the only situation we know of where the email property is missing, and thus the channel cannot make delta queries, is when there are zero messages in the channel already. However, there may be unknown cases that we haven't caught, so instead of skipping the channel we're going to handle non-delta enumeration for future safeguarding. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :bug: Bugfix #### Issue(s) * #3989 #### Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- src/internal/common/str/str.go | 10 ++ src/internal/m365/collection/groups/backup.go | 10 +- .../m365/collection/groups/backup_test.go | 21 ++-- .../m365/collection/groups/channel_handler.go | 7 +- .../m365/collection/groups/handlers.go | 7 +- src/pkg/services/m365/api/channels_pager.go | 106 +++++++++++------- .../services/m365/api/channels_pager_test.go | 12 +- src/pkg/services/m365/api/contacts_pager.go | 3 +- src/pkg/services/m365/api/events_pager.go | 3 +- src/pkg/services/m365/api/item_pager.go | 43 ++++++- src/pkg/services/m365/api/item_pager_test.go | 3 +- src/pkg/services/m365/api/mail_pager.go | 3 +- 12 files changed, 161 insertions(+), 67 deletions(-) diff --git a/src/internal/common/str/str.go b/src/internal/common/str/str.go index 59f36933d..ac9caf7d3 100644 --- a/src/internal/common/str/str.go +++ b/src/internal/common/str/str.go @@ -80,3 +80,13 @@ func Preview(s string, size int) string { return ss } + +func SliceToMap(ss []string) map[string]struct{} { + m := map[string]struct{}{} + + for _, s := range ss { + m[s] = struct{}{} + } + + return m +} diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index 82ab67daa..864bed697 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/support" @@ -154,12 +155,19 @@ func populateCollections( ictx = clues.Add(ictx, "previous_path", prevPath) - added, removed, du, err := bh.getChannelMessageIDsDelta(ctx, cID, prevDelta) + // if the channel has no email property, it is unable to process delta tokens + // and will return an error if a delta token is queried. + canMakeDeltaQueries := len(ptr.Val(c.GetEmail())) > 0 + + add, rem, du, err := bh.getChannelMessageIDs(ctx, cID, prevDelta, canMakeDeltaQueries) if err != nil { el.AddRecoverable(ctx, clues.Stack(err)) continue } + added := str.SliceToMap(add) + removed := str.SliceToMap(rem) + if len(du.URL) > 0 { deltaURLs[cID] = du.URL } else if !du.Reset { diff --git a/src/internal/m365/collection/groups/backup_test.go b/src/internal/m365/collection/groups/backup_test.go index 8492abb06..60c6b4bf3 100644 --- a/src/internal/m365/collection/groups/backup_test.go +++ b/src/internal/m365/collection/groups/backup_test.go @@ -40,8 +40,8 @@ var _ backupHandler = &mockBackupHandler{} type mockBackupHandler struct { channels []models.Channelable channelsErr error - messageIDs map[string]struct{} - deletedMsgIDs map[string]struct{} + messageIDs []string + deletedMsgIDs []string messagesErr error messages map[string]models.ChatMessageable info map[string]*details.GroupsInfo @@ -53,10 +53,11 @@ func (bh mockBackupHandler) getChannels(context.Context) ([]models.Channelable, return bh.channels, bh.channelsErr } -func (bh mockBackupHandler) getChannelMessageIDsDelta( +func (bh mockBackupHandler) getChannelMessageIDs( _ context.Context, _, _ string, -) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) { + _ bool, +) ([]string, []string, api.DeltaUpdate, error) { return bh.messageIDs, bh.deletedMsgIDs, api.DeltaUpdate{}, bh.messagesErr } @@ -131,7 +132,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { name: "happy path, one container", mock: mockBackupHandler{ channels: testdata.StubChannels("one"), - messageIDs: map[string]struct{}{"msg-one": {}}, + messageIDs: []string{"msg-one"}, }, expectErr: require.NoError, expectColls: 2, @@ -142,7 +143,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { name: "happy path, one container, only deleted messages", mock: mockBackupHandler{ channels: testdata.StubChannels("one"), - deletedMsgIDs: map[string]struct{}{"msg-one": {}}, + deletedMsgIDs: []string{"msg-one"}, }, expectErr: require.NoError, expectColls: 2, @@ -153,7 +154,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { name: "happy path, many containers", mock: mockBackupHandler{ channels: testdata.StubChannels("one", "two"), - messageIDs: map[string]struct{}{"msg-one": {}}, + messageIDs: []string{"msg-one"}, }, expectErr: require.NoError, expectColls: 3, @@ -291,7 +292,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { name: "non incremental", mock: mockBackupHandler{ channels: testdata.StubChannels("chan"), - messageIDs: map[string]struct{}{"msg": {}}, + messageIDs: []string{"msg"}, }, deltaPaths: metadata.DeltaPaths{}, expectErr: require.NoError, @@ -304,7 +305,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { name: "incremental", mock: mockBackupHandler{ channels: testdata.StubChannels("chan"), - deletedMsgIDs: map[string]struct{}{"msg": {}}, + deletedMsgIDs: []string{"msg"}, }, deltaPaths: metadata.DeltaPaths{ "chan": { @@ -356,7 +357,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { name: "incremental new and deleted channel", mock: mockBackupHandler{ channels: testdata.StubChannels("chan2"), - messageIDs: map[string]struct{}{"msg": {}}, + messageIDs: []string{"msg"}, }, deltaPaths: metadata.DeltaPaths{ "chan": { diff --git a/src/internal/m365/collection/groups/channel_handler.go b/src/internal/m365/collection/groups/channel_handler.go index 895f57baa..1512e2dbf 100644 --- a/src/internal/m365/collection/groups/channel_handler.go +++ b/src/internal/m365/collection/groups/channel_handler.go @@ -36,11 +36,12 @@ func (bh channelsBackupHandler) getChannels( return bh.ac.GetChannels(ctx, bh.protectedResource) } -func (bh channelsBackupHandler) getChannelMessageIDsDelta( +func (bh channelsBackupHandler) getChannelMessageIDs( ctx context.Context, channelID, prevDelta string, -) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) { - return bh.ac.GetChannelMessageIDsDelta(ctx, bh.protectedResource, channelID, prevDelta) + canMakeDeltaQueries bool, +) ([]string, []string, api.DeltaUpdate, error) { + return bh.ac.GetChannelMessageIDs(ctx, bh.protectedResource, channelID, prevDelta, canMakeDeltaQueries) } func (bh channelsBackupHandler) includeContainer( diff --git a/src/internal/m365/collection/groups/handlers.go b/src/internal/m365/collection/groups/handlers.go index de00dca66..f46b3a099 100644 --- a/src/internal/m365/collection/groups/handlers.go +++ b/src/internal/m365/collection/groups/handlers.go @@ -20,11 +20,12 @@ type backupHandler interface { ctx context.Context, ) ([]models.Channelable, error) - // gets all message IDs by delta in the channel - getChannelMessageIDsDelta( + // gets all message IDs (by delta, if possible) in the channel + getChannelMessageIDs( ctx context.Context, channelID, prevDelta string, - ) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) + canMakeDeltaQueries bool, + ) ([]string, []string, api.DeltaUpdate, error) // includeContainer evaluates whether the channel is included // in the provided scope. diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go index 1e26c458f..ae6b0ee19 100644 --- a/src/pkg/services/m365/api/channels_pager.go +++ b/src/pkg/services/m365/api/channels_pager.go @@ -3,18 +3,72 @@ package api import ( "context" + "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/teams" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/m365/graph" - "github.com/alcionai/corso/src/pkg/logger" ) // --------------------------------------------------------------------------- // channel message pager // --------------------------------------------------------------------------- +var _ Pager[models.ChatMessageable] = &channelMessagePageCtrl{} + +type channelMessagePageCtrl struct { + resourceID, channelID string + gs graph.Servicer + builder *teams.ItemChannelsItemMessagesRequestBuilder + options *teams.ItemChannelsItemMessagesRequestBuilderGetRequestConfiguration +} + +func (p *channelMessagePageCtrl) SetNextLink(nextLink string) { + p.builder = teams.NewItemChannelsItemMessagesRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *channelMessagePageCtrl) GetPage( + ctx context.Context, +) (NextLinkValuer[models.ChatMessageable], error) { + resp, err := p.builder.Get(ctx, p.options) + return resp, graph.Stack(ctx, err).OrNil() +} + +func (c Channels) NewChannelMessagePager( + teamID, channelID string, + selectProps ...string, +) *channelMessagePageCtrl { + builder := c.Stable. + Client(). + Teams(). + ByTeamIdString(teamID). + Channels(). + ByChannelIdString(channelID). + Messages() + + options := &teams.ItemChannelsItemMessagesRequestBuilderGetRequestConfiguration{ + QueryParameters: &teams.ItemChannelsItemMessagesRequestBuilderGetQueryParameters{}, + Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + } + + if len(selectProps) > 0 { + options.QueryParameters.Select = selectProps + } + + return &channelMessagePageCtrl{ + resourceID: teamID, + channelID: channelID, + builder: builder, + gs: c.Stable, + options: options, + } +} + +// --------------------------------------------------------------------------- +// channel message delta pager +// --------------------------------------------------------------------------- + var _ DeltaPager[models.ChatMessageable] = &channelMessageDeltaPageCtrl{} type channelMessageDeltaPageCtrl struct { @@ -65,7 +119,7 @@ func (c Channels) NewChannelMessageDeltaPager( options := &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration{ QueryParameters: &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetQueryParameters{}, - Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + Headers: newPreferHeaders(preferPageSize(maxDeltaPageSize)), } if len(selectProps) > 0 { @@ -83,44 +137,20 @@ func (c Channels) NewChannelMessageDeltaPager( // GetChannelMessageIDsDelta fetches a delta of all messages in the channel. // returns two maps: addedItems, deletedItems -func (c Channels) GetChannelMessageIDsDelta( +func (c Channels) GetChannelMessageIDs( ctx context.Context, - teamID, channelID, prevDelta string, -) (map[string]struct{}, map[string]struct{}, DeltaUpdate, error) { - var ( - added = map[string]struct{}{} - deleted = map[string]struct{}{} - // select is not currently allowed on messages - // this func will still isolate to the ID, however, - // because we need the follow-up get request to gather - // all replies to the message. - // selectProps = idAnd() - pager = c.NewChannelMessageDeltaPager(teamID, channelID, prevDelta) - ) + teamID, channelID, prevDeltaLink string, + canMakeDeltaQueries bool, +) ([]string, []string, DeltaUpdate, error) { + added, removed, du, err := getAddedAndRemovedItemIDs( + ctx, + c.NewChannelMessagePager(teamID, channelID), + c.NewChannelMessageDeltaPager(teamID, channelID, prevDeltaLink), + prevDeltaLink, + canMakeDeltaQueries, + addedAndRemovedByDeletedDateTime) - results, du, err := deltaEnumerateItems[models.ChatMessageable](ctx, pager, prevDelta) - if graph.IsErrInvalidDelta(err) { - logger.Ctx(ctx).Infow("delta token not supported", "delta_link", prevDelta) - - added = map[string]struct{}{} - deleted = map[string]struct{}{} - - return added, deleted, du, nil - } - - if err != nil { - return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "extracting channel messages from response") - } - - for _, r := range results { - if r.GetAdditionalData()[graph.AddtlDataRemoved] == nil { - added[ptr.Val(r.GetId())] = struct{}{} - } else { - deleted[ptr.Val(r.GetId())] = struct{}{} - } - } - - return added, deleted, du, nil + return added, removed, du, clues.Stack(err).OrNil() } // --------------------------------------------------------------------------- diff --git a/src/pkg/services/m365/api/channels_pager_test.go b/src/pkg/services/m365/api/channels_pager_test.go index b0bbc0bf5..488ac8e0d 100644 --- a/src/pkg/services/m365/api/channels_pager_test.go +++ b/src/pkg/services/m365/api/channels_pager_test.go @@ -56,28 +56,30 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() { ctx, flush := tester.NewContext(t) defer flush() - addedIDs, _, du, err := ac.GetChannelMessageIDsDelta( + addedIDs, _, du, err := ac.GetChannelMessageIDs( ctx, suite.its.group.id, suite.its.group.testContainerID, - "") + "", + true) require.NoError(t, err, clues.ToCore(err)) require.NotEmpty(t, addedIDs) require.NotZero(t, du.URL, "delta link") require.True(t, du.Reset, "reset due to empty prev delta link") - addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDsDelta( + addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDs( ctx, suite.its.group.id, suite.its.group.testContainerID, - du.URL) + du.URL, + true) require.NoError(t, err, clues.ToCore(err)) require.Empty(t, addedIDs, "should have no new messages from delta") require.Empty(t, deletedIDs, "should have no deleted messages from delta") require.NotZero(t, du.URL, "delta link") require.False(t, du.Reset, "prev delta link should be valid") - for id := range addedIDs { + for _, id := range addedIDs { suite.Run(id+"-replies", func() { testEnumerateChannelMessageReplies( suite.T(), diff --git a/src/pkg/services/m365/api/contacts_pager.go b/src/pkg/services/m365/api/contacts_pager.go index 33b83db6e..711cb6948 100644 --- a/src/pkg/services/m365/api/contacts_pager.go +++ b/src/pkg/services/m365/api/contacts_pager.go @@ -278,5 +278,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs( pager, deltaPager, prevDeltaLink, - canMakeDeltaQueries) + canMakeDeltaQueries, + addedAndRemovedByAddtlData) } diff --git a/src/pkg/services/m365/api/events_pager.go b/src/pkg/services/m365/api/events_pager.go index 542479add..4a2f1adaf 100644 --- a/src/pkg/services/m365/api/events_pager.go +++ b/src/pkg/services/m365/api/events_pager.go @@ -277,5 +277,6 @@ func (c Events) GetAddedAndRemovedItemIDs( pager, deltaPager, prevDeltaLink, - canMakeDeltaQueries) + canMakeDeltaQueries, + addedAndRemovedByAddtlData) } diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index 4182b3c99..1f0ac17b2 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/alcionai/clues" @@ -172,12 +173,15 @@ func deltaEnumerateItems[T any]( // shared enumeration runner funcs // --------------------------------------------------------------------------- +type addedAndRemovedHandler[T any] func(items []T) ([]string, []string, error) + func getAddedAndRemovedItemIDs[T any]( ctx context.Context, pager Pager[T], deltaPager DeltaPager[T], prevDeltaLink string, canMakeDeltaQueries bool, + aarh addedAndRemovedHandler[T], ) ([]string, []string, DeltaUpdate, error) { if canMakeDeltaQueries { ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink) @@ -186,7 +190,7 @@ func getAddedAndRemovedItemIDs[T any]( } if err == nil { - a, r, err := addedAndRemovedByAddtlData(ts) + a, r, err := aarh(ts) return a, r, du, graph.Stack(ctx, err).OrNil() } } @@ -198,13 +202,19 @@ func getAddedAndRemovedItemIDs[T any]( return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) } - a, r, err := addedAndRemovedByAddtlData[T](ts) + a, r, err := aarh(ts) return a, r, du, graph.Stack(ctx, err).OrNil() } -type getIDAndAddtler interface { +type getIDer interface { GetId() *string +} + +// for added and removed by additionalData[@removed] + +type getIDAndAddtler interface { + getIDer GetAdditionalData() map[string]any } @@ -230,3 +240,30 @@ func addedAndRemovedByAddtlData[T any](items []T) ([]string, []string, error) { return added, removed, nil } + +// for added and removed by GetDeletedDateTime() + +type getIDAndDeletedDateTimer interface { + getIDer + GetDeletedDateTime() *time.Time +} + +func addedAndRemovedByDeletedDateTime[T any](items []T) ([]string, []string, error) { + added, removed := []string{}, []string{} + + for _, item := range items { + giaddt, ok := any(item).(getIDAndDeletedDateTimer) + if !ok { + return nil, nil, clues.New("item does not provide id and deleted date time getters"). + With("item_type", fmt.Sprintf("%T", item)) + } + + if giaddt.GetDeletedDateTime() == nil { + added = append(added, ptr.Val(giaddt.GetId())) + } else { + removed = append(removed, ptr.Val(giaddt.GetId())) + } + } + + return added, removed, nil +} diff --git a/src/pkg/services/m365/api/item_pager_test.go b/src/pkg/services/m365/api/item_pager_test.go index f7f053c33..e072e1b85 100644 --- a/src/pkg/services/m365/api/item_pager_test.go +++ b/src/pkg/services/m365/api/item_pager_test.go @@ -363,7 +363,8 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() { test.pagerGetter(t), test.deltaPagerGetter(t), test.prevDelta, - test.canDelta) + test.canDelta, + addedAndRemovedByAddtlData) require.NoErrorf(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err)) require.EqualValues(t, test.expect.added, added, "added item IDs") diff --git a/src/pkg/services/m365/api/mail_pager.go b/src/pkg/services/m365/api/mail_pager.go index c15e89dd1..1b4263b88 100644 --- a/src/pkg/services/m365/api/mail_pager.go +++ b/src/pkg/services/m365/api/mail_pager.go @@ -308,5 +308,6 @@ func (c Mail) GetAddedAndRemovedItemIDs( pager, deltaPager, prevDeltaLink, - canMakeDeltaQueries) + canMakeDeltaQueries, + addedAndRemovedByAddtlData) }