add non-delta message paging path (#4229)

similar to exchange, there are conditions under
which channel messages are unable to use the
delta api.  In particular, when the channel has no valid email property.  This change ensures that
we enumerate the entire channel under those
conditions, instead of failing out.

Technically, the only situation we know of where
the email property is missing, and thus the channel cannot make delta queries, is when there are
zero messages in the channel already.  However,
there may be unknown cases that we haven't caught, so instead of skipping the channel we're going to
handle non-delta enumeration for future safeguarding.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🐛 Bugfix

#### Issue(s)

* #3989 

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-09-12 19:48:44 -06:00 committed by GitHub
parent 1257570d09
commit e3bde06457
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 161 additions and 67 deletions

View File

@ -80,3 +80,13 @@ func Preview(s string, size int) string {
return ss return ss
} }
func SliceToMap(ss []string) map[string]struct{} {
m := map[string]struct{}{}
for _, s := range ss {
m[s] = struct{}{}
}
return m
}

View File

@ -8,6 +8,7 @@ import (
"github.com/alcionai/corso/src/internal/common/pii" "github.com/alcionai/corso/src/internal/common/pii"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/common/str"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/internal/m365/support" "github.com/alcionai/corso/src/internal/m365/support"
@ -154,12 +155,19 @@ func populateCollections(
ictx = clues.Add(ictx, "previous_path", prevPath) ictx = clues.Add(ictx, "previous_path", prevPath)
added, removed, du, err := bh.getChannelMessageIDsDelta(ctx, cID, prevDelta) // if the channel has no email property, it is unable to process delta tokens
// and will return an error if a delta token is queried.
canMakeDeltaQueries := len(ptr.Val(c.GetEmail())) > 0
add, rem, du, err := bh.getChannelMessageIDs(ctx, cID, prevDelta, canMakeDeltaQueries)
if err != nil { if err != nil {
el.AddRecoverable(ctx, clues.Stack(err)) el.AddRecoverable(ctx, clues.Stack(err))
continue continue
} }
added := str.SliceToMap(add)
removed := str.SliceToMap(rem)
if len(du.URL) > 0 { if len(du.URL) > 0 {
deltaURLs[cID] = du.URL deltaURLs[cID] = du.URL
} else if !du.Reset { } else if !du.Reset {

View File

@ -40,8 +40,8 @@ var _ backupHandler = &mockBackupHandler{}
type mockBackupHandler struct { type mockBackupHandler struct {
channels []models.Channelable channels []models.Channelable
channelsErr error channelsErr error
messageIDs map[string]struct{} messageIDs []string
deletedMsgIDs map[string]struct{} deletedMsgIDs []string
messagesErr error messagesErr error
messages map[string]models.ChatMessageable messages map[string]models.ChatMessageable
info map[string]*details.GroupsInfo info map[string]*details.GroupsInfo
@ -53,10 +53,11 @@ func (bh mockBackupHandler) getChannels(context.Context) ([]models.Channelable,
return bh.channels, bh.channelsErr return bh.channels, bh.channelsErr
} }
func (bh mockBackupHandler) getChannelMessageIDsDelta( func (bh mockBackupHandler) getChannelMessageIDs(
_ context.Context, _ context.Context,
_, _ string, _, _ string,
) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) { _ bool,
) ([]string, []string, api.DeltaUpdate, error) {
return bh.messageIDs, bh.deletedMsgIDs, api.DeltaUpdate{}, bh.messagesErr return bh.messageIDs, bh.deletedMsgIDs, api.DeltaUpdate{}, bh.messagesErr
} }
@ -131,7 +132,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
name: "happy path, one container", name: "happy path, one container",
mock: mockBackupHandler{ mock: mockBackupHandler{
channels: testdata.StubChannels("one"), channels: testdata.StubChannels("one"),
messageIDs: map[string]struct{}{"msg-one": {}}, messageIDs: []string{"msg-one"},
}, },
expectErr: require.NoError, expectErr: require.NoError,
expectColls: 2, expectColls: 2,
@ -142,7 +143,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
name: "happy path, one container, only deleted messages", name: "happy path, one container, only deleted messages",
mock: mockBackupHandler{ mock: mockBackupHandler{
channels: testdata.StubChannels("one"), channels: testdata.StubChannels("one"),
deletedMsgIDs: map[string]struct{}{"msg-one": {}}, deletedMsgIDs: []string{"msg-one"},
}, },
expectErr: require.NoError, expectErr: require.NoError,
expectColls: 2, expectColls: 2,
@ -153,7 +154,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections() {
name: "happy path, many containers", name: "happy path, many containers",
mock: mockBackupHandler{ mock: mockBackupHandler{
channels: testdata.StubChannels("one", "two"), channels: testdata.StubChannels("one", "two"),
messageIDs: map[string]struct{}{"msg-one": {}}, messageIDs: []string{"msg-one"},
}, },
expectErr: require.NoError, expectErr: require.NoError,
expectColls: 3, expectColls: 3,
@ -291,7 +292,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() {
name: "non incremental", name: "non incremental",
mock: mockBackupHandler{ mock: mockBackupHandler{
channels: testdata.StubChannels("chan"), channels: testdata.StubChannels("chan"),
messageIDs: map[string]struct{}{"msg": {}}, messageIDs: []string{"msg"},
}, },
deltaPaths: metadata.DeltaPaths{}, deltaPaths: metadata.DeltaPaths{},
expectErr: require.NoError, expectErr: require.NoError,
@ -304,7 +305,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() {
name: "incremental", name: "incremental",
mock: mockBackupHandler{ mock: mockBackupHandler{
channels: testdata.StubChannels("chan"), channels: testdata.StubChannels("chan"),
deletedMsgIDs: map[string]struct{}{"msg": {}}, deletedMsgIDs: []string{"msg"},
}, },
deltaPaths: metadata.DeltaPaths{ deltaPaths: metadata.DeltaPaths{
"chan": { "chan": {
@ -356,7 +357,7 @@ func (suite *BackupUnitSuite) TestPopulateCollections_incremental() {
name: "incremental new and deleted channel", name: "incremental new and deleted channel",
mock: mockBackupHandler{ mock: mockBackupHandler{
channels: testdata.StubChannels("chan2"), channels: testdata.StubChannels("chan2"),
messageIDs: map[string]struct{}{"msg": {}}, messageIDs: []string{"msg"},
}, },
deltaPaths: metadata.DeltaPaths{ deltaPaths: metadata.DeltaPaths{
"chan": { "chan": {

View File

@ -36,11 +36,12 @@ func (bh channelsBackupHandler) getChannels(
return bh.ac.GetChannels(ctx, bh.protectedResource) return bh.ac.GetChannels(ctx, bh.protectedResource)
} }
func (bh channelsBackupHandler) getChannelMessageIDsDelta( func (bh channelsBackupHandler) getChannelMessageIDs(
ctx context.Context, ctx context.Context,
channelID, prevDelta string, channelID, prevDelta string,
) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) { canMakeDeltaQueries bool,
return bh.ac.GetChannelMessageIDsDelta(ctx, bh.protectedResource, channelID, prevDelta) ) ([]string, []string, api.DeltaUpdate, error) {
return bh.ac.GetChannelMessageIDs(ctx, bh.protectedResource, channelID, prevDelta, canMakeDeltaQueries)
} }
func (bh channelsBackupHandler) includeContainer( func (bh channelsBackupHandler) includeContainer(

View File

@ -20,11 +20,12 @@ type backupHandler interface {
ctx context.Context, ctx context.Context,
) ([]models.Channelable, error) ) ([]models.Channelable, error)
// gets all message IDs by delta in the channel // gets all message IDs (by delta, if possible) in the channel
getChannelMessageIDsDelta( getChannelMessageIDs(
ctx context.Context, ctx context.Context,
channelID, prevDelta string, channelID, prevDelta string,
) (map[string]struct{}, map[string]struct{}, api.DeltaUpdate, error) canMakeDeltaQueries bool,
) ([]string, []string, api.DeltaUpdate, error)
// includeContainer evaluates whether the channel is included // includeContainer evaluates whether the channel is included
// in the provided scope. // in the provided scope.

View File

@ -3,18 +3,72 @@ package api
import ( import (
"context" "context"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/microsoftgraph/msgraph-sdk-go/teams" "github.com/microsoftgraph/msgraph-sdk-go/teams"
"github.com/alcionai/corso/src/internal/common/ptr" "github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/m365/graph" "github.com/alcionai/corso/src/internal/m365/graph"
"github.com/alcionai/corso/src/pkg/logger"
) )
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// channel message pager // channel message pager
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
var _ Pager[models.ChatMessageable] = &channelMessagePageCtrl{}
type channelMessagePageCtrl struct {
resourceID, channelID string
gs graph.Servicer
builder *teams.ItemChannelsItemMessagesRequestBuilder
options *teams.ItemChannelsItemMessagesRequestBuilderGetRequestConfiguration
}
func (p *channelMessagePageCtrl) SetNextLink(nextLink string) {
p.builder = teams.NewItemChannelsItemMessagesRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *channelMessagePageCtrl) GetPage(
ctx context.Context,
) (NextLinkValuer[models.ChatMessageable], error) {
resp, err := p.builder.Get(ctx, p.options)
return resp, graph.Stack(ctx, err).OrNil()
}
func (c Channels) NewChannelMessagePager(
teamID, channelID string,
selectProps ...string,
) *channelMessagePageCtrl {
builder := c.Stable.
Client().
Teams().
ByTeamIdString(teamID).
Channels().
ByChannelIdString(channelID).
Messages()
options := &teams.ItemChannelsItemMessagesRequestBuilderGetRequestConfiguration{
QueryParameters: &teams.ItemChannelsItemMessagesRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)),
}
if len(selectProps) > 0 {
options.QueryParameters.Select = selectProps
}
return &channelMessagePageCtrl{
resourceID: teamID,
channelID: channelID,
builder: builder,
gs: c.Stable,
options: options,
}
}
// ---------------------------------------------------------------------------
// channel message delta pager
// ---------------------------------------------------------------------------
var _ DeltaPager[models.ChatMessageable] = &channelMessageDeltaPageCtrl{} var _ DeltaPager[models.ChatMessageable] = &channelMessageDeltaPageCtrl{}
type channelMessageDeltaPageCtrl struct { type channelMessageDeltaPageCtrl struct {
@ -65,7 +119,7 @@ func (c Channels) NewChannelMessageDeltaPager(
options := &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration{ options := &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetQueryParameters{}, QueryParameters: &teams.ItemChannelsItemMessagesDeltaRequestBuilderGetQueryParameters{},
Headers: newPreferHeaders(preferPageSize(maxNonDeltaPageSize)), Headers: newPreferHeaders(preferPageSize(maxDeltaPageSize)),
} }
if len(selectProps) > 0 { if len(selectProps) > 0 {
@ -83,44 +137,20 @@ func (c Channels) NewChannelMessageDeltaPager(
// GetChannelMessageIDsDelta fetches a delta of all messages in the channel. // GetChannelMessageIDsDelta fetches a delta of all messages in the channel.
// returns two maps: addedItems, deletedItems // returns two maps: addedItems, deletedItems
func (c Channels) GetChannelMessageIDsDelta( func (c Channels) GetChannelMessageIDs(
ctx context.Context, ctx context.Context,
teamID, channelID, prevDelta string, teamID, channelID, prevDeltaLink string,
) (map[string]struct{}, map[string]struct{}, DeltaUpdate, error) { canMakeDeltaQueries bool,
var ( ) ([]string, []string, DeltaUpdate, error) {
added = map[string]struct{}{} added, removed, du, err := getAddedAndRemovedItemIDs(
deleted = map[string]struct{}{} ctx,
// select is not currently allowed on messages c.NewChannelMessagePager(teamID, channelID),
// this func will still isolate to the ID, however, c.NewChannelMessageDeltaPager(teamID, channelID, prevDeltaLink),
// because we need the follow-up get request to gather prevDeltaLink,
// all replies to the message. canMakeDeltaQueries,
// selectProps = idAnd() addedAndRemovedByDeletedDateTime)
pager = c.NewChannelMessageDeltaPager(teamID, channelID, prevDelta)
)
results, du, err := deltaEnumerateItems[models.ChatMessageable](ctx, pager, prevDelta) return added, removed, du, clues.Stack(err).OrNil()
if graph.IsErrInvalidDelta(err) {
logger.Ctx(ctx).Infow("delta token not supported", "delta_link", prevDelta)
added = map[string]struct{}{}
deleted = map[string]struct{}{}
return added, deleted, du, nil
}
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "extracting channel messages from response")
}
for _, r := range results {
if r.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
added[ptr.Val(r.GetId())] = struct{}{}
} else {
deleted[ptr.Val(r.GetId())] = struct{}{}
}
}
return added, deleted, du, nil
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -56,28 +56,30 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
addedIDs, _, du, err := ac.GetChannelMessageIDsDelta( addedIDs, _, du, err := ac.GetChannelMessageIDs(
ctx, ctx,
suite.its.group.id, suite.its.group.id,
suite.its.group.testContainerID, suite.its.group.testContainerID,
"") "",
true)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, addedIDs) require.NotEmpty(t, addedIDs)
require.NotZero(t, du.URL, "delta link") require.NotZero(t, du.URL, "delta link")
require.True(t, du.Reset, "reset due to empty prev delta link") require.True(t, du.Reset, "reset due to empty prev delta link")
addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDsDelta( addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDs(
ctx, ctx,
suite.its.group.id, suite.its.group.id,
suite.its.group.testContainerID, suite.its.group.testContainerID,
du.URL) du.URL,
true)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
require.Empty(t, addedIDs, "should have no new messages from delta") require.Empty(t, addedIDs, "should have no new messages from delta")
require.Empty(t, deletedIDs, "should have no deleted messages from delta") require.Empty(t, deletedIDs, "should have no deleted messages from delta")
require.NotZero(t, du.URL, "delta link") require.NotZero(t, du.URL, "delta link")
require.False(t, du.Reset, "prev delta link should be valid") require.False(t, du.Reset, "prev delta link should be valid")
for id := range addedIDs { for _, id := range addedIDs {
suite.Run(id+"-replies", func() { suite.Run(id+"-replies", func() {
testEnumerateChannelMessageReplies( testEnumerateChannelMessageReplies(
suite.T(), suite.T(),

View File

@ -278,5 +278,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
pager, pager,
deltaPager, deltaPager,
prevDeltaLink, prevDeltaLink,
canMakeDeltaQueries) canMakeDeltaQueries,
addedAndRemovedByAddtlData)
} }

View File

@ -277,5 +277,6 @@ func (c Events) GetAddedAndRemovedItemIDs(
pager, pager,
deltaPager, deltaPager,
prevDeltaLink, prevDeltaLink,
canMakeDeltaQueries) canMakeDeltaQueries,
addedAndRemovedByAddtlData)
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
@ -172,12 +173,15 @@ func deltaEnumerateItems[T any](
// shared enumeration runner funcs // shared enumeration runner funcs
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type addedAndRemovedHandler[T any] func(items []T) ([]string, []string, error)
func getAddedAndRemovedItemIDs[T any]( func getAddedAndRemovedItemIDs[T any](
ctx context.Context, ctx context.Context,
pager Pager[T], pager Pager[T],
deltaPager DeltaPager[T], deltaPager DeltaPager[T],
prevDeltaLink string, prevDeltaLink string,
canMakeDeltaQueries bool, canMakeDeltaQueries bool,
aarh addedAndRemovedHandler[T],
) ([]string, []string, DeltaUpdate, error) { ) ([]string, []string, DeltaUpdate, error) {
if canMakeDeltaQueries { if canMakeDeltaQueries {
ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink) ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
@ -186,7 +190,7 @@ func getAddedAndRemovedItemIDs[T any](
} }
if err == nil { if err == nil {
a, r, err := addedAndRemovedByAddtlData(ts) a, r, err := aarh(ts)
return a, r, du, graph.Stack(ctx, err).OrNil() return a, r, du, graph.Stack(ctx, err).OrNil()
} }
} }
@ -198,13 +202,19 @@ func getAddedAndRemovedItemIDs[T any](
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
} }
a, r, err := addedAndRemovedByAddtlData[T](ts) a, r, err := aarh(ts)
return a, r, du, graph.Stack(ctx, err).OrNil() return a, r, du, graph.Stack(ctx, err).OrNil()
} }
type getIDAndAddtler interface { type getIDer interface {
GetId() *string GetId() *string
}
// for added and removed by additionalData[@removed]
type getIDAndAddtler interface {
getIDer
GetAdditionalData() map[string]any GetAdditionalData() map[string]any
} }
@ -230,3 +240,30 @@ func addedAndRemovedByAddtlData[T any](items []T) ([]string, []string, error) {
return added, removed, nil return added, removed, nil
} }
// for added and removed by GetDeletedDateTime()
type getIDAndDeletedDateTimer interface {
getIDer
GetDeletedDateTime() *time.Time
}
func addedAndRemovedByDeletedDateTime[T any](items []T) ([]string, []string, error) {
added, removed := []string{}, []string{}
for _, item := range items {
giaddt, ok := any(item).(getIDAndDeletedDateTimer)
if !ok {
return nil, nil, clues.New("item does not provide id and deleted date time getters").
With("item_type", fmt.Sprintf("%T", item))
}
if giaddt.GetDeletedDateTime() == nil {
added = append(added, ptr.Val(giaddt.GetId()))
} else {
removed = append(removed, ptr.Val(giaddt.GetId()))
}
}
return added, removed, nil
}

View File

@ -363,7 +363,8 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
test.pagerGetter(t), test.pagerGetter(t),
test.deltaPagerGetter(t), test.deltaPagerGetter(t),
test.prevDelta, test.prevDelta,
test.canDelta) test.canDelta,
addedAndRemovedByAddtlData)
require.NoErrorf(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err)) require.NoErrorf(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err))
require.EqualValues(t, test.expect.added, added, "added item IDs") require.EqualValues(t, test.expect.added, added, "added item IDs")

View File

@ -308,5 +308,6 @@ func (c Mail) GetAddedAndRemovedItemIDs(
pager, pager,
deltaPager, deltaPager,
prevDeltaLink, prevDeltaLink,
canMakeDeltaQueries) canMakeDeltaQueries,
addedAndRemovedByAddtlData)
} }