Allow limiting or cancelling pager enumeration (#4625)

Add the ability to cancel a pager partway through enumeration. This
will allow us to later request a specific number of items from the
pager. In those cases we should ideally fetch at most one page beyond
what's required to get the requested number of items (because the
pager runs in another goroutine).

Does not change the result set or add the ability to request a certain
number of items right now.

May be easier to review by commit.
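
Rough sketch of the consumer-side pattern this enables. The `pagers` identifiers match this PR; the surrounding function, item type, pager variable, and `want` threshold are illustrative placeholders:

```go
// Sketch only: consuming the cancellable pager.
func consumeSome(
	ctx context.Context,
	deltaPager pagers.DeltaHandler[models.Contactable],
	prevDeltaLink string,
	want int,
) (pagers.DeltaUpdate, error) {
	npr := pagers.NewNextPageResults[models.Contactable]()

	// The producer streams pages over a channel from its own goroutine.
	go pagers.DeltaEnumerateItems[models.Contactable](ctx, deltaPager, npr, prevDeltaLink)

	var seen int

	for page, reset, done := npr.NextPage(); !done; page, reset, done = npr.NextPage() {
		if reset {
			seen = 0 // delta token invalidated; prior pages are void
		}

		seen += len(page)

		if seen >= want {
			// Stop the producer early. Since it may already be fetching the
			// next page, at most one extra page gets requested.
			npr.Cancel() // at most once per pager
			break
		}
	}

	// Results blocks until the producer exits; safe here because we either
	// cancelled or drained every page.
	return npr.Results()
}
```

The key contract: `Cancel` may be called at most once, and `Results` blocks until the producer goroutine exits, so callers must either cancel or drain every page before asking for results.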

---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ] No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [ ] Unit test
- [ ] 💚 E2E
ashmrtn committed 2023-11-10 12:33:27 -08:00
commit c85d364bcd (parent 52c788eac5)
8 changed files with 585 additions and 124 deletions


@@ -55,6 +55,8 @@ func (suite *URLCacheIntegrationSuite) SetupSuite() {
 	ctx, flush := tester.NewContext(t)
 	defer flush()
+
+	graph.InitializeConcurrencyLimiter(ctx, true, 4)
 
 	suite.user = tconfig.SecondaryM365UserID(t)
 	acct := tconfig.NewM365Account(t)
@@ -111,9 +113,14 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
 		Select: api.URLCacheDriveItemProps(),
 	})
 
-	// normally we'd page through all the pager.NextPage
-	// enumerations first. But Results should make sure
-	// that we don't need to drain lower-level communication first.
+	// We need to go through all the pages of results so we don't get stuck. This
+	// is the only way to get a delta token since getting one requires going
+	// through all request results.
+	//
+	//revive:disable-next-line:empty-block
+	for _, _, done := pager.NextPage(); !done; _, _, done = pager.NextPage() {
+	}
+
 	du, err := pager.Results()
 	require.NoError(t, err, clues.ToCore(err))
 	require.NotEmpty(t, du.URL)


@@ -193,6 +193,7 @@ func (c Channels) GetChannelMessageIDs(
 		c.NewChannelMessageDeltaPager(teamID, channelID, prevDeltaLink),
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByDeletedDateTime[models.ChatMessageable],
 		filterOutSystemMessages)


@@ -275,5 +275,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
 		deltaPager,
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByAddtlData[models.Contactable])
 }


@@ -245,6 +245,7 @@ func (c Conversations) GetConversationThreadPostIDs(
 		nil,
 		"",
 		canMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedAddAll[models.Postable],
 		pagers.FilterIncludeAll[models.Postable])


@@ -270,5 +270,6 @@ func (c Events) GetAddedAndRemovedItemIDs(
 		deltaPager,
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByAddtlData[models.Eventable])
 }


@@ -272,5 +272,6 @@ func (c Mail) GetAddedAndRemovedItemIDs(
 		deltaPager,
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByAddtlData[models.Messageable])
 }


@@ -2,6 +2,7 @@ package pagers
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -18,6 +19,8 @@ import (
 // common structs
 // ---------------------------------------------------------------------------
 
+var errCancelled = clues.New("enumeration cancelled")
+
 // DeltaUpdate holds the results of a current delta token. It normally
 // gets produced when aggregating the addition and removal of items in
 // a delta-queryable folder.
@@ -55,6 +58,8 @@ var _ NextPageResulter[any] = &nextPageResults[any]{}
 type nextPageResults[T any] struct {
 	pages  chan nextPage[T]
+	cancel chan struct{}
+	done   chan struct{}
 	du     DeltaUpdate
 	err    error
 }
@@ -62,6 +67,8 @@ type nextPageResults[T any] struct {
 func NewNextPageResults[T any]() *nextPageResults[T] {
 	return &nextPageResults[T]{
 		pages:  make(chan nextPage[T]),
+		cancel: make(chan struct{}),
+		done:   make(chan struct{}),
 	}
 }
@@ -76,7 +83,13 @@ func (npr *nextPageResults[T]) writeNextPage(
 	select {
 	case <-ctx.Done():
-		return clues.Wrap(context.Canceled, "writing next page")
+		return clues.Wrap(
+			clues.Stack(ctx.Err(), context.Cause(ctx)),
+			"writing next page")
+
+	case <-npr.cancel:
+		return clues.Stack(errCancelled)
+
 	case npr.pages <- nextPage[T]{
 		items: items,
 		reset: reset,
@@ -95,15 +108,17 @@ func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
 	return np.items, np.reset, !ok
 }
 
-func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
-	if npr.pages != nil {
-		//nolint:revive
-		for range npr.pages {
-			// if the pager hasn't closed yet, drain out the pages iterator
-			// to avoid leaking routines, and to ensure we get results.
-		}
-	}
+// Cancel stops the current pager enumeration. Must only be called at most once
+// per pager.
+func (npr *nextPageResults[T]) Cancel() {
+	close(npr.cancel)
+}
 
+// Results returns the final status of the pager. This is a blocking call. To
+// avoid deadlocking, either call Cancel before this or iterate through all
+// pager results using NextPage.
+func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
+	<-npr.done
 	return npr.du, npr.err
 }
@@ -111,6 +126,10 @@ func (npr *nextPageResults[T]) close() {
 	if npr.pages != nil {
 		close(npr.pages)
 	}
+
+	if npr.done != nil {
+		close(npr.done)
+	}
 }
 
 // ---------------------------------------------------------------------------
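
The `cancel`/`done` pair above is the usual close-to-signal channel pattern: the consumer closes `cancel` to stop the producer, and the producer closes `done` so `Results` can unblock. A minimal standalone sketch of the same coordination (illustrative only, not Corso code):

```go
package main

import "fmt"

// producer sends values until told to stop via cancel, then signals done.
func producer(out chan<- int, cancel <-chan struct{}, done chan<- struct{}) {
	defer close(done) // mirrors nextPageResults.close() closing npr.done
	defer close(out)

	for i := 0; ; i++ {
		select {
		case <-cancel: // consumer called Cancel; stop without error
			return
		case out <- i:
		}
	}
}

func main() {
	var (
		out    = make(chan int)
		cancel = make(chan struct{})
		done   = make(chan struct{})
	)

	go producer(out, cancel, done)

	for v := range out {
		if v >= 3 {
			close(cancel) // Cancel: at most once, or close panics
			break
		}

		fmt.Println(v)
	}

	<-done // Results blocks here until the producer exits
}
```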
@@ -211,7 +230,10 @@ func EnumerateItems[T any](
 		pageCount++
 
 		if err := npr.writeNextPage(ctx, pageResults, false); err != nil {
-			npr.err = clues.Stack(err)
+			if !errors.Is(err, errCancelled) {
+				npr.err = clues.Stack(err)
+			}
+
 			return
 		}
@@ -231,13 +253,11 @@ func BatchEnumerateItems[T any](
 	pager NonDeltaHandler[T],
 ) ([]T, error) {
 	var (
-		npr = nextPageResults[T]{
-			pages: make(chan nextPage[T]),
-		}
+		npr   = NewNextPageResults[T]()
 		items = []T{}
 	)
 
-	go EnumerateItems[T](ctx, pager, &npr)
+	go EnumerateItems[T](ctx, pager, npr)
 
 	for page, _, done := npr.NextPage(); !done; page, _, done = npr.NextPage() {
 		items = append(items, page...)
@@ -280,6 +300,15 @@ func DeltaEnumerateItems[T any](
 		consume = graph.SingleGetOrDeltaLC
 	)
 
+	// Ensure we always populate info about the delta token even if we exit before
+	// going through all pages of results.
+	defer func() {
+		npr.du = DeltaUpdate{
+			URL:   newDeltaLink,
+			Reset: invalidPrevDelta,
+		}
+	}()
+
 	if invalidPrevDelta {
 		// Delta queries with no previous token cost more.
 		consume = graph.DeltaNoTokenLC
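
The deferred assignment above relies on deferred closures reading their captured variables when the enclosing function returns, not when the `defer` statement runs, so `npr.du` always records the latest `newDeltaLink` even on early returns. A tiny standalone illustration of that semantic (hypothetical names):

```go
package main

import "fmt"

func main() {
	link := "initial"

	// The closure captures link by reference; it prints the value link holds
	// when main returns, not the value at the time of the defer statement.
	defer func() { fmt.Println("final link:", link) }()

	link = "updated" // the deferred print outputs "final link: updated"
}
```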
@@ -294,7 +323,10 @@ func DeltaEnumerateItems[T any](
 			pager.Reset(ctx)
 
 			if err := npr.writeNextPage(ctx, nil, true); err != nil {
-				npr.err = clues.Stack(err)
+				if !errors.Is(err, errCancelled) {
+					npr.err = clues.Stack(err)
+				}
+
 				return
 			}
@@ -318,7 +350,10 @@ func DeltaEnumerateItems[T any](
 			itemCount = 0
 
 			if err := npr.writeNextPage(ctx, nil, true); err != nil {
-				npr.err = clues.Stack(err)
+				if !errors.Is(err, errCancelled) {
+					npr.err = clues.Stack(err)
+				}
+
 				return
 			}
@@ -336,7 +371,10 @@ func DeltaEnumerateItems[T any](
 		pageCount++
 
 		if err := npr.writeNextPage(ctx, pageResults, false); err != nil {
-			npr.err = clues.Stack(err)
+			if !errors.Is(err, errCancelled) {
+				npr.err = clues.Stack(err)
+			}
+
 			return
 		}
@@ -353,38 +391,6 @@ func DeltaEnumerateItems[T any](
 		"completed delta item enumeration",
 		"item_count", itemCount,
 		"page_count", pageCount)
-
-	npr.du = DeltaUpdate{
-		URL:   newDeltaLink,
-		Reset: invalidPrevDelta,
-	}
-}
-
-func batchDeltaEnumerateItems[T any](
-	ctx context.Context,
-	pager DeltaHandler[T],
-	prevDeltaLink string,
-) ([]T, DeltaUpdate, error) {
-	var (
-		npr = nextPageResults[T]{
-			pages: make(chan nextPage[T]),
-		}
-		results = []T{}
-	)
-
-	go DeltaEnumerateItems[T](ctx, pager, &npr, prevDeltaLink)
-
-	for page, reset, done := npr.NextPage(); !done; page, reset, done = npr.NextPage() {
-		if reset {
-			results = []T{}
-		}
-
-		results = append(results, page...)
-	}
-
-	du, err := npr.Results()
-
-	return results, du, clues.Stack(err).OrNil()
 }
 
 // ---------------------------------------------------------------------------
@@ -415,45 +421,138 @@ type addedAndRemovedHandler[T any] func(
 	error,
 )
 
+// batchWithMaxItemCount attempts to get itemLimit added items from the
+// underlying pager. If itemLimit is 0 then gets all items from the pager. The
+// item count is the actual number of added items, not just the number of added
+// items seen. This means that duplicate items and items that are filtered out
+// don't count towards the limit.
+//
+// Also respects context cancellations.
+func batchWithMaxItemCount[T any](
+	ctx context.Context,
+	resultsPager *nextPageResults[T],
+	itemLimit int,
+	getAddedAndRemoved addedAndRemovedHandler[T],
+	filters ...func(T) bool,
+) (map[string]time.Time, []string, DeltaUpdate, error) {
+	var (
+		done    bool
+		reset   bool
+		page    []T
+		removed []string
+		added   = map[string]time.Time{}
+	)
+
+	// Can't use a for-loop variable declaration because the line ends up too
+	// long.
+	for !done && (itemLimit <= 0 || len(added) < itemLimit) {
+		// If the context was cancelled then exit early. This will keep us from
+		// accidentally reading from the pager more since it could pick to either
+		// send another page or see the context cancellation. We don't need to
+		// cancel the pager because it should see the context cancellation once we
+		// stop attempting to fetch the next page.
+		if ctx.Err() != nil {
+			return nil, nil, DeltaUpdate{}, clues.Stack(ctx.Err(), context.Cause(ctx)).
+				WithClues(ctx)
+		}
+
+		// Get the next page first thing in the loop instead of last thing so we
+		// don't fetch an extra page we then discard when we've reached the item
+		// limit. That wouldn't affect correctness but would consume more tokens in
+		// our rate limiter.
+		page, reset, done = resultsPager.NextPage()
+
+		if reset {
+			added = map[string]time.Time{}
+			removed = nil
+		}
+
+		pageAdded, pageRemoved, err := getAddedAndRemoved(page, filters...)
+		if err != nil {
+			resultsPager.Cancel()
+			return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
+		}
+
+		removed = append(removed, pageRemoved...)
+
+		for k, v := range pageAdded {
+			added[k] = v
+
+			if itemLimit > 0 && len(added) >= itemLimit {
+				break
+			}
+		}
+	}
+
+	// Cancel the pager so we don't fetch the rest of the data it may have.
+	resultsPager.Cancel()
+
+	du, err := resultsPager.Results()
+	if err != nil {
+		return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
+	}
+
+	// We processed all the results from the pager.
+	return added, removed, du, nil
+}
+
+// GetAddedAndRemovedItemIDs returns the set of item IDs that were added or
+// removed from this resource since the last delta query. If no delta query is
+// passed in then returns all items found, not just changed/removed ones.
+//
+// Use itemLimit to stop enumeration partway through and filters to pick which
+// items should be returned.
 func GetAddedAndRemovedItemIDs[T any](
 	ctx context.Context,
 	pager NonDeltaHandler[T],
 	deltaPager DeltaHandler[T],
 	prevDeltaLink string,
 	canMakeDeltaQueries bool,
+	itemLimit int,
 	aarh addedAndRemovedHandler[T],
 	filters ...func(T) bool,
 ) (AddedAndRemoved, error) {
 	if canMakeDeltaQueries {
-		ts, du, err := batchDeltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
-		if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
-			return AddedAndRemoved{}, graph.Stack(ctx, err)
-		}
+		npr := NewNextPageResults[T]()
 
-		if err == nil {
-			a, r, err := aarh(ts, filters...)
-			aar := AddedAndRemoved{
-				Added:         a,
-				Removed:       r,
-				DU:            du,
-				ValidModTimes: deltaPager.ValidModTimes(),
-			}
+		go DeltaEnumerateItems[T](ctx, deltaPager, npr, prevDeltaLink)
+
+		added, removed, du, err := batchWithMaxItemCount(
+			ctx,
+			npr,
+			itemLimit,
+			aarh,
+			filters...)
+
+		aar := AddedAndRemoved{
+			Added:         added,
+			Removed:       removed,
+			DU:            du,
+			ValidModTimes: deltaPager.ValidModTimes(),
+		}
 
+		if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
+			return AddedAndRemoved{}, graph.Stack(ctx, err)
+		} else if err == nil {
 			return aar, graph.Stack(ctx, err).OrNil()
 		}
 	}
 
 	du := DeltaUpdate{Reset: true}
+	npr := NewNextPageResults[T]()
 
-	ts, err := BatchEnumerateItems(ctx, pager)
-	if err != nil {
-		return AddedAndRemoved{}, graph.Stack(ctx, err)
-	}
+	go EnumerateItems[T](ctx, pager, npr)
+
+	added, removed, _, err := batchWithMaxItemCount(
+		ctx,
+		npr,
+		itemLimit,
+		aarh,
+		filters...)
 
-	a, r, err := aarh(ts, filters...)
 	aar := AddedAndRemoved{
-		Added:         a,
-		Removed:       r,
+		Added:         added,
+		Removed:       removed,
 		DU:            du,
 		ValidModTimes: pager.ValidModTimes(),
 	}
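
For illustration, a caller that wants to cap enumeration would pass a non-zero limit through the new parameter. The item type and pager variables below are placeholders modeled on the Contacts caller above; passing 0, as every caller in this PR does, preserves the old fetch-everything behavior:

```go
// Request at most 50 added items. Exiting enumeration early means no delta
// token gets produced, so a limited query can't seed the next incremental run.
aar, err := pagers.GetAddedAndRemovedItemIDs[models.Contactable](
	ctx,
	pager,
	deltaPager,
	prevDeltaLink,
	cc.CanMakeDeltaQueries,
	50, // itemLimit: 0 means no limit
	pagers.AddedAndRemovedByAddtlData[models.Contactable])
```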


@@ -11,7 +11,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
-	"golang.org/x/exp/maps"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -326,25 +325,11 @@ func assertSliceEmptyOr[S ~[]E, E any](
 	assertionFunc(t, expect, got, msgAndArgs)
 }
 
-func assertMapEmptyOr[M ~map[K]V, K comparable, V any](
-	t *testing.T,
-	expect M,
-	got M,
-	assertionFunc assert.ComparisonAssertionFunc,
-	msgAndArgs ...any,
-) {
-	if len(expect) == 0 {
-		assert.Empty(t, got, msgAndArgs)
-		return
-	}
-
-	assertionFunc(t, expect, got, msgAndArgs)
-}
-
 func assertAddedAndRemoved(
 	t *testing.T,
 	validModTimes bool,
 	wantAdded []testItem,
+	partialAdded []testItem,
 	gotAdded map[string]time.Time,
 	wantRemoved []testItem,
 	gotRemoved []string,
@@ -352,31 +337,41 @@ func assertAddedAndRemoved(
 	epoch, err := time.Parse(time.DateOnly, "1970-01-01")
 	require.NoError(t, err, clues.ToCore(err))
 
-	expectAdded := map[string]time.Time{}
+	requireAdded := map[string]time.Time{}
 	for _, item := range wantAdded {
-		expectAdded[item.id] = item.modTime
+		requireAdded[item.id] = item.modTime
 	}
 
-	if validModTimes {
-		assertMapEmptyOr(
-			t,
-			expectAdded,
-			gotAdded,
-			assert.Equal,
-			"added item IDs and mod times")
-	} else {
-		assertSliceEmptyOr(
-			t,
-			maps.Keys(expectAdded),
-			maps.Keys(gotAdded),
-			assert.ElementsMatch,
-			"added item IDs")
+	maybeAdded := map[string]time.Time{}
+	for _, item := range partialAdded {
+		maybeAdded[item.id] = item.modTime
+	}
+
+	for id, mt := range gotAdded {
+		var (
+			wantMT time.Time
+			found  bool
+		)
+
+		if wantMT, found = requireAdded[id]; found {
+			delete(requireAdded, id)
+		} else if wantMT, found = maybeAdded[id]; found {
+			delete(maybeAdded, id)
+		}
+
+		if !assert.True(t, found, "unexpected added item with ID %v", id) {
+			continue
+		}
 
-		for _, modtime := range gotAdded {
-			assert.True(t, modtime.After(epoch), "mod time after epoch")
-			assert.False(t, modtime.IsZero(), "non-zero mod time")
+		if validModTimes {
+			assert.Equal(t, wantMT, mt, "mod time for item with ID %v", id)
+		} else {
+			assert.True(t, mt.After(epoch), "mod time after epoch for item with ID %v", id)
+			assert.False(t, mt.IsZero(), "non-zero mod time for item with ID %v", id)
 		}
 	}
 
+	assert.Empty(t, requireAdded, "required items not added")
+
 	expectRemoved := []string{}
 	for _, item := range wantRemoved {
@@ -399,6 +394,10 @@ type modTimeTest struct {
 var (
 	addedItem1   = addedItem("a_uno", time.Now())
 	addedItem2   = addedItem("a_dos", time.Now())
+	addedItem3   = addedItem("a_tres", time.Now())
+	addedItem4   = addedItem("a_quatro", time.Now())
+	addedItem5   = addedItem("a_cinco", time.Now())
+	addedItem6   = addedItem("a_seis", time.Now())
 
 	removedItem1 = removedItem("r_uno")
 	removedItem2 = removedItem("r_dos")
@@ -427,7 +426,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			p *testIDsNonDeltaMultiPager,
 		) (NonDeltaHandler[testItem], DeltaHandler[testItem])
 		expectDeltaReset bool
-		expectDeltaLink  assert.ValueAssertionFunc
+		expectNoDelta    bool
 	}{
 		{
 			name: "NoPrevDelta",
@@ -438,7 +437,6 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				return nil, newDeltaPager(p)
 			},
 			expectDeltaReset: true,
-			expectDeltaLink:  assert.NotEmpty,
 		},
 		{
 			name: "PrevDelta",
@@ -449,7 +447,6 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			) (NonDeltaHandler[testItem], DeltaHandler[testItem]) {
 				return nil, newDeltaPager(p)
 			},
-			expectDeltaLink: assert.NotEmpty,
 		},
 		{
 			name: "DeltaNotAllowed",
@@ -460,14 +457,26 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				return p, nil
 			},
 			expectDeltaReset: true,
-			expectDeltaLink:  assert.Empty,
+			expectNoDelta:    true,
 		},
 	}
 
 	type expected struct {
 		errCheck assert.ErrorAssertionFunc
 		added    []testItem
+		// finalPageAdded is the set of items on the last page for queries that
+		// limit results. Some of these items may not be returned, but we don't
+		// make guarantees about which ones those will be.
+		finalPageAdded []testItem
+		// numAdded is the total number of added items that should be returned.
+		numAdded int
 		removed []testItem
+		// maxGetterIdx is the maximum index for the item page getter. Helps ensure
+		// we're stopping enumeration when we reach the item limit.
+		maxGetterIdx int
+		// noDelta should be set if we exit enumeration early due to the item
+		// limit so we don't get a delta token.
+		noDelta bool
 	}
 
 	table := []struct {
@@ -478,6 +487,8 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 		) *testIDsNonDeltaMultiPager
 		filter func(a testItem) bool
 		expect expected
+		limit        int
+		ctxCancelled bool
 	}{
 		{
 			name: "OnePage",
@@ -503,7 +514,9 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 					addedItem1,
 					addedItem2,
 				},
+				numAdded:     2,
 				removed:      []testItem{removedItem1, removedItem2},
+				maxGetterIdx: 1,
 			},
 		},
 		{
@@ -534,7 +547,9 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 					addedItem1,
 					addedItem2,
 				},
+				numAdded:     2,
 				removed:      []testItem{removedItem1, removedItem2},
+				maxGetterIdx: 2,
 			},
 		},
 		{
@@ -558,6 +573,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			filter: func(testItem) bool { return false },
 			expect: expected{
 				errCheck:     assert.NoError,
+				maxGetterIdx: 1,
 			},
 		},
 		{
@@ -585,6 +601,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			filter: func(testItem) bool { return false },
 			expect: expected{
 				errCheck:     assert.NoError,
+				maxGetterIdx: 2,
 			},
 		},
 		{
@@ -604,6 +621,194 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				errCheck: assert.Error,
 			},
 		},
+		{
+			name: "FourValidPages OnlyPartOfSecondPage",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) *testIDsNonDeltaMultiPager {
+				return &testIDsNonDeltaMultiPager{
+					t:             t,
+					validModTimes: validModTimes,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								addedItem1,
+								addedItem2,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem3,
+								addedItem4,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem("cinco", time.Now()),
+								addedItem("seis", time.Now()),
+							},
+						},
+						{
+							items: []testItem{
+								addedItem("siete", time.Now()),
+								addedItem("ocho", time.Now()),
+							},
+						},
+					},
+				}
+			},
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem1,
+					addedItem2,
+				},
+				finalPageAdded: []testItem{
+					addedItem3,
+					addedItem4,
+				},
+				numAdded:     3,
+				maxGetterIdx: 3,
+				noDelta:      true,
+			},
+			limit: 3,
+		},
+		{
+			name: "FourValidPages OnlyPartOfSecondPage SomeItemsFiltered",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) *testIDsNonDeltaMultiPager {
+				return &testIDsNonDeltaMultiPager{
+					t:             t,
+					validModTimes: validModTimes,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								addedItem1,
+								addedItem2,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem3,
+								addedItem4,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem5,
+								addedItem6,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem("siete", time.Now()),
+								addedItem("ocho", time.Now()),
+							},
+						},
+					},
+				}
+			},
+			filter: func(item testItem) bool {
+				return item.id != addedItem2.id
+			},
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem1,
+				},
+				finalPageAdded: []testItem{
+					addedItem3,
+					addedItem4,
+				},
+				numAdded:     2,
+				maxGetterIdx: 3,
+				noDelta:      true,
+			},
+			limit: 2,
+		},
+		{
+			name: "ThreeValidPages OnlyPartOfThirdPage RepeatItem",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) *testIDsNonDeltaMultiPager {
+				return &testIDsNonDeltaMultiPager{
+					t:             t,
+					validModTimes: validModTimes,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								addedItem1,
+								addedItem2,
+								removedItem1,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem1,
+								addedItem4,
+								addedItem3,
+								removedItem2,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem5,
+								addedItem6,
+							},
+						},
+					},
+				}
+			},
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem1,
+					addedItem2,
+					addedItem3,
+					addedItem4,
+				},
+				finalPageAdded: []testItem{
+					addedItem5,
+					addedItem6,
+				},
+				removed: []testItem{
+					removedItem1,
+					removedItem2,
+				},
+				numAdded:     5,
+				maxGetterIdx: 3,
+			},
+			limit: 5,
+		},
+		{
+			name: "ParentContextCancelled",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) *testIDsNonDeltaMultiPager {
+				return &testIDsNonDeltaMultiPager{
+					t:             t,
+					validModTimes: true,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								addedItem("uno", time.Now()),
+								addedItem("dos", time.Now()),
+								removedItem("siete"),
+							},
+						},
+					},
+				}
+			},
+			expect: expected{
+				errCheck: assert.Error,
+			},
+			ctxCancelled: true,
+		},
 	}
 
 	for _, modTimeTest := range modTimeTests {
@@ -617,6 +822,14 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			ctx, flush := tester.NewContext(t)
 			defer flush()
 
+			ictx, cancel := context.WithCancel(ctx)
+
+			if test.ctxCancelled {
+				cancel()
+			} else {
+				defer cancel()
+			}
+
 			filters := []func(testItem) bool{}
 			if test.filter != nil {
 				filters = append(filters, test.filter)
@@ -626,11 +839,12 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			getter, deltaGetter := pagerTypeTest.pagersFunc(basePager)
 
 			addRemoved, err := GetAddedAndRemovedItemIDs[testItem](
-				ctx,
+				ictx,
 				getter,
 				deltaGetter,
 				pagerTypeTest.prevDelta,
 				pagerTypeTest.canUseDelta,
+				test.limit,
 				AddedAndRemovedByAddtlData[testItem],
 				filters...)
 			test.expect.errCheck(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err))
@@ -639,14 +853,23 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				return
 			}
 
+			assert.Len(t, addRemoved.Added, test.expect.numAdded, "number of added items")
+			assert.GreaterOrEqual(t, test.expect.maxGetterIdx, basePager.pageIdx, "number of pager calls")
 			assert.Equal(t, modTimeTest.validModTimes, addRemoved.ValidModTimes, "valid mod times")
-			assert.Equal(t, pagerTypeTest.expectDeltaReset, addRemoved.DU.Reset, "delta update")
-			pagerTypeTest.expectDeltaLink(t, addRemoved.DU.URL, "delta link")
+			assert.Equal(t, pagerTypeTest.expectDeltaReset, addRemoved.DU.Reset, "delta reset")
+
+			if pagerTypeTest.expectNoDelta || test.expect.noDelta {
+				assert.Empty(t, addRemoved.DU.URL, "delta link")
+			} else {
+				assert.NotEmpty(t, addRemoved.DU.URL, "delta link")
+			}
 
 			assertAddedAndRemoved(
 				t,
 				modTimeTest.validModTimes,
 				test.expect.added,
+				test.expect.finalPageAdded,
 				addRemoved.Added,
 				test.expect.removed,
 				addRemoved.Removed)
@@ -683,6 +906,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 			t *testing.T,
 			validModTimes bool,
 		) DeltaHandler[testItem]
+		limit  int
 		expect expected
 	}{
 		{
@@ -867,6 +1091,130 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 				deltaLink: assert.Empty,
 			},
 		},
+		{
+			name:        "FivePages DeltaReset LimitedItems",
+			pagerGetter: nilPager,
+			deltaPagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) DeltaHandler[testItem] {
+				return newDeltaPager(
+					&testIDsNonDeltaMultiPager{
+						t: t,
+						pages: []pageResult{
+							{
+								items: []testItem{
+									addedItem1,
+									removedItem1,
+								},
+							},
+							{
+								errCode:    "SyncStateNotFound",
+								needsReset: true,
+							},
+							{
+								items: []testItem{
+									removedItem2,
+									addedItem2,
+								},
+							},
+							{
+								items: []testItem{
+									addedItem3,
+									addedItem4,
+								},
+							},
+							{
+								items: []testItem{
+									addedItem5,
+									addedItem6,
+								},
+							},
+						},
+						validModTimes: validModTimes,
+					})
+			},
+			limit: 3,
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem2,
+					addedItem3,
+					addedItem4,
+				},
+				removed: []testItem{
+					removedItem2,
+				},
+				deltaLink: assert.Empty,
+			},
+		},
+		{
+			name: "FivePages DeltaNoSupported LimitedItems",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) NonDeltaHandler[testItem] {
+				return &testIDsNonDeltaMultiPager{
+					t: t,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								removedItem2,
+								addedItem2,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem3,
+								addedItem4,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem5,
+								addedItem6,
+							},
+						},
+					},
+					validModTimes: validModTimes,
+				}
+			},
+			deltaPagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) DeltaHandler[testItem] {
+				return newDeltaPager(
+					&testIDsNonDeltaMultiPager{
+						t: t,
+						pages: []pageResult{
+							{
+								items: []testItem{
+									addedItem1,
+									removedItem1,
+								},
+							},
+							{
+								err:        graph.ErrDeltaNotSupported,
+								needsReset: true,
+							},
+						},
+						validModTimes: validModTimes,
+					})
+			},
+			limit: 3,
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem2,
+					addedItem3,
+					addedItem4,
+				},
+				removed: []testItem{
+					removedItem2,
+				},
+				deltaLink: assert.Empty,
+			},
+		},
 	}
 
 	for _, modTimeTest := range modTimeTests {
@@ -884,6 +1232,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 				test.deltaPagerGetter(t, modTimeTest.validModTimes),
 				"a",
 				true,
+				test.limit,
 				AddedAndRemovedByAddtlData[testItem])
 			require.NoError(
 				t,
@@ -892,13 +1241,14 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 				clues.ToCore(err))
 
 			assert.Equal(t, modTimeTest.validModTimes, addRemoved.ValidModTimes, "valid mod times")
-			assert.True(t, addRemoved.DU.Reset, "delta update")
+			assert.True(t, addRemoved.DU.Reset, "delta reset")
 			test.expect.deltaLink(t, addRemoved.DU.URL, "delta link")
 
 			assertAddedAndRemoved(
 				t,
 				modTimeTest.validModTimes,
 				test.expect.added,
+				nil,
 				addRemoved.Added,
 				test.expect.removed,
 				addRemoved.Removed)