diff --git a/src/internal/common/str/str.go b/src/internal/common/str/str.go index 59f36933d..ac9caf7d3 100644 --- a/src/internal/common/str/str.go +++ b/src/internal/common/str/str.go @@ -80,3 +80,13 @@ func Preview(s string, size int) string { return ss } + +func SliceToMap(ss []string) map[string]struct{} { + m := map[string]struct{}{} + + for _, s := range ss { + m[s] = struct{}{} + } + + return m +} diff --git a/src/internal/m365/collection/groups/backup.go b/src/internal/m365/collection/groups/backup.go index 82ab67daa..864bed697 100644 --- a/src/internal/m365/collection/groups/backup.go +++ b/src/internal/m365/collection/groups/backup.go @@ -8,6 +8,7 @@ import ( "github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/common/ptr" + "github.com/alcionai/corso/src/internal/common/str" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/support" @@ -154,12 +155,19 @@ func populateCollections( ictx = clues.Add(ictx, "previous_path", prevPath) - added, removed, du, err := bh.getChannelMessageIDsDelta(ctx, cID, prevDelta) + // if the channel has no email property, it is unable to process delta tokens + // and will return an error if a delta token is queried. + canMakeDeltaQueries := len(ptr.Val(c.GetEmail())) > 0 + + add, rem, du, err := bh.getChannelMessageIDs(ctx, cID, prevDelta, canMakeDeltaQueries) if err != nil { el.AddRecoverable(ctx, clues.Stack(err)) continue } + added := str.SliceToMap(add) + removed := str.SliceToMap(rem) + if len(du.URL) > 0 { deltaURLs[cID] = du.URL } else if !du.Reset { diff --git a/src/internal/m365/collection/groups/backup_test.go b/src/internal/m365/collection/groups/backup_test.go index 8492abb06..60c6b4bf3 100644 --- a/src/internal/m365/collection/groups/backup_test.go +++ b/src/internal/m365/collection/groups/backup_test.go @@ -40,8 +40,8 @@ var _ backupHandler = &mockBackupHandler{} type mockBackupHandler struct { channels []models.Channelable channelsErr error - messageIDs map[string]struct{} - deletedMsgIDs map[string]struct{} + messageIDs []string + deletedMsgIDs []string messagesErr error messages map[string]models.ChatMessageable info map[string]*details.GroupsInfo @@ -53,10 +53,11 @@ func (bh mockBackupHandler) getChannels(context.Context) ([]models.Channelable, return bh.channels, bh.channelsErr } -func (bh mockBackupHandler) getChannelMessageIDsDelta( +func (bh mockBackupHandler) getChannelMessageIDs( _ context.Context, _, _ string, -) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) { + _ bool, +) ([]string, []string, api.DeltaUpdate, error) { return bh.messageIDs, bh.deletedMsgIDs, api.DeltaUpdate{}, bh.messagesErr } @@ -131,7 +132,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { name: "happy path, one container", mock: mockBackupHandler{ channels: testdata.StubChannels("one"), - messageIDs: map[string]struct{}{"msg-one": {}}, + messageIDs: []string{"msg-one"}, }, expectErr: require.NoError, expectColls: 2, @@ -142,7 +143,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { name: "happy path, one container, only deleted messages", mock: mockBackupHandler{ channels: testdata.StubChannels("one"), - deletedMsgIDs: map[string]struct{}{"msg-one": {}}, + deletedMsgIDs: []string{"msg-one"}, }, expectErr: require.NoError, expectColls: 2, @@ -153,7 +154,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() { name: "happy path, many containers", mock: mockBackupHandler{ channels: testdata.StubChannels("one", "two"), - messageIDs: map[string]struct{}{"msg-one": {}}, + messageIDs: []string{"msg-one"}, }, expectErr: require.NoError, expectColls: 3, @@ -291,7 +292,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { name: "non incremental", mock: mockBackupHandler{ channels: testdata.StubChannels("chan"), - messageIDs: map[string]struct{}{"msg": {}}, + messageIDs: []string{"msg"}, }, deltaPaths: metadata.DeltaPaths{}, expectErr: require.NoError, @@ -304,7 +305,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { name: "incremental", mock: mockBackupHandler{ channels: testdata.StubChannels("chan"), - deletedMsgIDs: map[string]struct{}{"msg": {}}, + deletedMsgIDs: []string{"msg"}, }, deltaPaths: metadata.DeltaPaths{ "chan": { @@ -356,7 +357,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() { name: "incremental new and deleted channel", mock: mockBackupHandler{ channels: testdata.StubChannels("chan2"), - messageIDs: map[string]struct{}{"msg": {}}, + messageIDs: []string{"msg"}, }, deltaPaths: metadata.DeltaPaths{ "chan": { diff --git a/src/internal/m365/collection/groups/channel_handler.go b/src/internal/m365/collection/groups/channel_handler.go index 895f57baa..1512e2dbf 100644 --- a/src/internal/m365/collection/groups/channel_handler.go +++ b/src/internal/m365/collection/groups/channel_handler.go @@ -36,11 +36,12 @@ func (bh channelsBackupHandler) getChannels( return bh.ac.GetChannels(ctx, bh.protectedResource) } -func (bh channelsBackupHandler) getChannelMessageIDsDelta( +func (bh channelsBackupHandler) getChannelMessageIDs( ctx context.Context, channelID, prevDelta string, -) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) { - return bh.ac.GetChannelMessageIDsDelta(ctx, bh.protectedResource, channelID, prevDelta) + canMakeDeltaQueries bool, +) ([]string, []string, api.DeltaUpdate, error) { + return bh.ac.GetChannelMessageIDs(ctx, bh.protectedResource, channelID, prevDelta, canMakeDeltaQueries) } func (bh channelsBackupHandler) includeContainer( diff --git a/src/internal/m365/collection/groups/handlers.go b/src/internal/m365/collection/groups/handlers.go index de00dca66..f46b3a099 100644 --- a/src/internal/m365/collection/groups/handlers.go +++ b/src/internal/m365/collection/groups/handlers.go @@ -20,11 +20,12 @@ type backupHandler interface { ctx context.Context, ) ([]models.Channelable, error) - // gets all message IDs by delta in the channel - getChannelMessageIDsDelta( + // gets all message IDs (by delta, if possible) in the channel + getChannelMessageIDs( ctx context.Context, channelID, prevDelta string, - ) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) + canMakeDeltaQueries bool, + ) ([]string, []string, api.DeltaUpdate, error) // includeContainer evaluates whether the channel is included // in the provided scope. diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go index 1e26c458f..ae6b0ee19 100644 --- a/src/pkg/services/m365/api/channels_pager.go +++ b/src/pkg/services/m365/api/channels_pager.go @@ -3,18 +3,72 @@ package api import ( "context" + "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/teams" "github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/m365/graph" - "github.com/alcionai/corso/src/pkg/logger" ) // --------------------------------------------------------------------------- // channel message pager // --------------------------------------------------------------------------- +var _ Pager[models.ChatMessageable] = &channelMessagePageCtrl{} + +type channelMessagePageCtrl struct { + resourceID, channelID string + gs graph.Servicer + builder *teams.ItemChannelsItemMessagesRequestBuilder + options *teams.ItemChannelsItemMessagesRequestBuilderGetRequestConfiguration +} + +func (p *channelMessagePageCtrl) SetNextLink(nextLink string) { + p.builder = teams.NewItemChannelsItemMessagesRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *channelMessagePageCtrl) GetPage( + ctx context.Context, +) (NextLinkValuer[models.ChatMessageable], error) { + resp, err := p.builder.Get(ctx, p.options) + return resp, graph.Stack(ctx, err).OrNil() +} + +func (c Channels) NewChannelMessagePager( + teamID, channelID string, + selectProps ...string, +) *channelMessagePageCtrl { + builder := c.Stable. + Client(). + Teams(). + ByTeamIdString(teamID). + Channels(). + ByChannelIdString(channelID). + Messages() + + options := &teams.ItemChannelsItemMessagesRequestBuilderGetRequestConfiguration{ + QueryParameters: &teams.ItemChannelsItemMessagesRequestBuilderGetQueryParameters{}, + Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + } + + if len(selectProps) > 0 { + options.QueryParameters.Select = selectProps + } + + return &channelMessagePageCtrl{ + resourceID: teamID, + channelID: channelID, + builder: builder, + gs: c.Stable, + options: options, + } +} + +// --------------------------------------------------------------------------- +// channel message delta pager +// --------------------------------------------------------------------------- + var _ DeltaPager[models.ChatMessageable] = &channelMessageDeltaPageCtrl{} type channelMessageDeltaPageCtrl struct { @@ -65,7 +119,7 @@ func (c Channels) NewChannelMessageDeltaPager( options := &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration{ QueryParameters: &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetQueryParameters{}, - Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), + Headers: newPreferHeaders(preferPageSize(maxDeltaPageSize)), } if len(selectProps) > 0 { @@ -83,44 +137,20 @@ func (c Channels) NewChannelMessageDeltaPager( // GetChannelMessageIDsDelta fetches a delta of all messages in the channel. // returns two maps: addedItems, deletedItems -func (c Channels) GetChannelMessageIDsDelta( +func (c Channels) GetChannelMessageIDs( ctx context.Context, - teamID, channelID, prevDelta string, -) (map[string]struct{}, map[string]struct{}, DeltaUpdate, error) { - var ( - added = map[string]struct{}{} - deleted = map[string]struct{}{} - // select is not currently allowed on messages - // this func will still isolate to the ID, however, - // because we need the follow-up get request to gather - // all replies to the message. - // selectProps = idAnd() - pager = c.NewChannelMessageDeltaPager(teamID, channelID, prevDelta) - ) + teamID, channelID, prevDeltaLink string, + canMakeDeltaQueries bool, +) ([]string, []string, DeltaUpdate, error) { + added, removed, du, err := getAddedAndRemovedItemIDs( + ctx, + c.NewChannelMessagePager(teamID, channelID), + c.NewChannelMessageDeltaPager(teamID, channelID, prevDeltaLink), + prevDeltaLink, + canMakeDeltaQueries, + addedAndRemovedByDeletedDateTime) - results, du, err := deltaEnumerateItems[models.ChatMessageable](ctx, pager, prevDelta) - if graph.IsErrInvalidDelta(err) { - logger.Ctx(ctx).Infow("delta token not supported", "delta_link", prevDelta) - - added = map[string]struct{}{} - deleted = map[string]struct{}{} - - return added, deleted, du, nil - } - - if err != nil { - return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "extracting channel messages from response") - } - - for _, r := range results { - if r.GetAdditionalData()[graph.AddtlDataRemoved] == nil { - added[ptr.Val(r.GetId())] = struct{}{} - } else { - deleted[ptr.Val(r.GetId())] = struct{}{} - } - } - - return added, deleted, du, nil + return added, removed, du, clues.Stack(err).OrNil() } // --------------------------------------------------------------------------- diff --git a/src/pkg/services/m365/api/channels_pager_test.go b/src/pkg/services/m365/api/channels_pager_test.go index b0bbc0bf5..488ac8e0d 100644 --- a/src/pkg/services/m365/api/channels_pager_test.go +++ b/src/pkg/services/m365/api/channels_pager_test.go @@ -56,28 +56,30 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() { ctx, flush := tester.NewContext(t) defer flush() - addedIDs, _, du, err := ac.GetChannelMessageIDsDelta( + addedIDs, _, du, err := ac.GetChannelMessageIDs( ctx, suite.its.group.id, suite.its.group.testContainerID, - "") + "", + true) require.NoError(t, err, clues.ToCore(err)) require.NotEmpty(t, addedIDs) require.NotZero(t, du.URL, "delta link") require.True(t, du.Reset, "reset due to empty prev delta link") - addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDsDelta( + addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDs( ctx, suite.its.group.id, suite.its.group.testContainerID, - du.URL) + du.URL, + true) require.NoError(t, err, clues.ToCore(err)) require.Empty(t, addedIDs, "should have no new messages from delta") require.Empty(t, deletedIDs, "should have no deleted messages from delta") require.NotZero(t, du.URL, "delta link") require.False(t, du.Reset, "prev delta link should be valid") - for id := range addedIDs { + for _, id := range addedIDs { suite.Run(id+"-replies", func() { testEnumerateChannelMessageReplies( suite.T(), diff --git a/src/pkg/services/m365/api/contacts_pager.go b/src/pkg/services/m365/api/contacts_pager.go index 33b83db6e..711cb6948 100644 --- a/src/pkg/services/m365/api/contacts_pager.go +++ b/src/pkg/services/m365/api/contacts_pager.go @@ -278,5 +278,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs( pager, deltaPager, prevDeltaLink, - canMakeDeltaQueries) + canMakeDeltaQueries, + addedAndRemovedByAddtlData) } diff --git a/src/pkg/services/m365/api/events_pager.go b/src/pkg/services/m365/api/events_pager.go index 542479add..4a2f1adaf 100644 --- a/src/pkg/services/m365/api/events_pager.go +++ b/src/pkg/services/m365/api/events_pager.go @@ -277,5 +277,6 @@ func (c Events) GetAddedAndRemovedItemIDs( pager, deltaPager, prevDeltaLink, - canMakeDeltaQueries) + canMakeDeltaQueries, + addedAndRemovedByAddtlData) } diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index 4182b3c99..1f0ac17b2 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/alcionai/clues" @@ -172,12 +173,15 @@ func deltaEnumerateItems[T any]( // shared enumeration runner funcs // --------------------------------------------------------------------------- +type addedAndRemovedHandler[T any] func(items []T) ([]string, []string, error) + func getAddedAndRemovedItemIDs[T any]( ctx context.Context, pager Pager[T], deltaPager DeltaPager[T], prevDeltaLink string, canMakeDeltaQueries bool, + aarh addedAndRemovedHandler[T], ) ([]string, []string, DeltaUpdate, error) { if canMakeDeltaQueries { ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink) @@ -186,7 +190,7 @@ func getAddedAndRemovedItemIDs[T any]( } if err == nil { - a, r, err := addedAndRemovedByAddtlData(ts) + a, r, err := aarh(ts) return a, r, du, graph.Stack(ctx, err).OrNil() } } @@ -198,13 +202,19 @@ func getAddedAndRemovedItemIDs[T any]( return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) } - a, r, err := addedAndRemovedByAddtlData[T](ts) + a, r, err := aarh(ts) return a, r, du, graph.Stack(ctx, err).OrNil() } -type getIDAndAddtler interface { +type getIDer interface { GetId() *string +} + +// for added and removed by additionalData[@removed] + +type getIDAndAddtler interface { + getIDer GetAdditionalData() map[string]any } @@ -230,3 +240,30 @@ func addedAndRemovedByAddtlData[T any](items []T) ([]string, []string, error) { return added, removed, nil } + +// for added and removed by GetDeletedDateTime() + +type getIDAndDeletedDateTimer interface { + getIDer + GetDeletedDateTime() *time.Time +} + +func addedAndRemovedByDeletedDateTime[T any](items []T) ([]string, []string, error) { + added, removed := []string{}, []string{} + + for _, item := range items { + giaddt, ok := any(item).(getIDAndDeletedDateTimer) + if !ok { + return nil, nil, clues.New("item does not provide id and deleted date time getters"). + With("item_type", fmt.Sprintf("%T", item)) + } + + if giaddt.GetDeletedDateTime() == nil { + added = append(added, ptr.Val(giaddt.GetId())) + } else { + removed = append(removed, ptr.Val(giaddt.GetId())) + } + } + + return added, removed, nil +} diff --git a/src/pkg/services/m365/api/item_pager_test.go b/src/pkg/services/m365/api/item_pager_test.go index f7f053c33..e072e1b85 100644 --- a/src/pkg/services/m365/api/item_pager_test.go +++ b/src/pkg/services/m365/api/item_pager_test.go @@ -363,7 +363,8 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() { test.pagerGetter(t), test.deltaPagerGetter(t), test.prevDelta, - test.canDelta) + test.canDelta, + addedAndRemovedByAddtlData) require.NoErrorf(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err)) require.EqualValues(t, test.expect.added, added, "added item IDs") diff --git a/src/pkg/services/m365/api/mail_pager.go b/src/pkg/services/m365/api/mail_pager.go index c15e89dd1..1b4263b88 100644 --- a/src/pkg/services/m365/api/mail_pager.go +++ b/src/pkg/services/m365/api/mail_pager.go @@ -308,5 +308,6 @@ func (c Mail) GetAddedAndRemovedItemIDs( pager, deltaPager, prevDeltaLink, - canMakeDeltaQueries) + canMakeDeltaQueries, + addedAndRemovedByAddtlData) }