diff --git a/src/internal/m365/collection/drive/url_cache_test.go b/src/internal/m365/collection/drive/url_cache_test.go
index 37fb9c5a4..42f565cdb 100644
--- a/src/internal/m365/collection/drive/url_cache_test.go
+++ b/src/internal/m365/collection/drive/url_cache_test.go
@@ -55,6 +55,8 @@ func (suite *URLCacheIntegrationSuite) SetupSuite() {
 	ctx, flush := tester.NewContext(t)
 	defer flush()
 
+	graph.InitializeConcurrencyLimiter(ctx, true, 4)
+
 	suite.user = tconfig.SecondaryM365UserID(t)
 	acct := tconfig.NewM365Account(t)
 
@@ -111,9 +113,14 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
 			Select: api.URLCacheDriveItemProps(),
 		})
 
-	// normally we'd page through all the pager.NextPage
-	// enumerations first. But Results should make sure
-	// that we don't need to drain lower-level communication first.
+	// We need to go through all the pages of results so we don't get stuck. This
+	// is the only way to get a delta token since getting one requires going
+	// through all request results.
+	//
+	//revive:disable-next-line:empty-block
+	for _, _, done := pager.NextPage(); !done; _, _, done = pager.NextPage() {
+	}
+
 	du, err := pager.Results()
 	require.NoError(t, err, clues.ToCore(err))
 	require.NotEmpty(t, du.URL)
diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go
index 107651e24..a822ac4da 100644
--- a/src/pkg/services/m365/api/channels_pager.go
+++ b/src/pkg/services/m365/api/channels_pager.go
@@ -193,6 +193,7 @@ func (c Channels) GetChannelMessageIDs(
 		c.NewChannelMessageDeltaPager(teamID, channelID, prevDeltaLink),
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByDeletedDateTime[models.ChatMessageable],
 		filterOutSystemMessages)
diff --git a/src/pkg/services/m365/api/contacts_pager.go b/src/pkg/services/m365/api/contacts_pager.go
index 50dde9117..a369d368f 100644
--- a/src/pkg/services/m365/api/contacts_pager.go
+++ b/src/pkg/services/m365/api/contacts_pager.go
@@ -275,5 +275,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
 		deltaPager,
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByAddtlData[models.Contactable])
 }
diff --git a/src/pkg/services/m365/api/conversations_pager.go b/src/pkg/services/m365/api/conversations_pager.go
index a946470f8..c892abce2 100644
--- a/src/pkg/services/m365/api/conversations_pager.go
+++ b/src/pkg/services/m365/api/conversations_pager.go
@@ -245,6 +245,7 @@ func (c Conversations) GetConversationThreadPostIDs(
 		nil,
 		"",
 		canMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedAddAll[models.Postable],
 		pagers.FilterIncludeAll[models.Postable])
diff --git a/src/pkg/services/m365/api/events_pager.go b/src/pkg/services/m365/api/events_pager.go
index 6304042ad..9ce2490ef 100644
--- a/src/pkg/services/m365/api/events_pager.go
+++ b/src/pkg/services/m365/api/events_pager.go
@@ -270,5 +270,6 @@ func (c Events) GetAddedAndRemovedItemIDs(
 		deltaPager,
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByAddtlData[models.Eventable])
 }
diff --git a/src/pkg/services/m365/api/mail_pager.go b/src/pkg/services/m365/api/mail_pager.go
index b734c7018..366812c6e 100644
--- a/src/pkg/services/m365/api/mail_pager.go
+++ b/src/pkg/services/m365/api/mail_pager.go
@@ -272,5 +272,6 @@ func (c Mail) GetAddedAndRemovedItemIDs(
 		deltaPager,
 		prevDeltaLink,
 		cc.CanMakeDeltaQueries,
+		0,
 		pagers.AddedAndRemovedByAddtlData[models.Messageable])
 }
diff --git a/src/pkg/services/m365/api/pagers/pagers.go b/src/pkg/services/m365/api/pagers/pagers.go
index decfdf0cb..153506da3 100644
--- a/src/pkg/services/m365/api/pagers/pagers.go
+++ b/src/pkg/services/m365/api/pagers/pagers.go
@@ -2,6 +2,7 @@ package pagers
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -18,6 +19,8 @@ import (
 // common structs
 // ---------------------------------------------------------------------------
 
+var errCancelled = clues.New("enumeration cancelled")
+
 // DeltaUpdate holds the results of a current delta token. It normally
 // gets produced when aggregating the addition and removal of items in
 // a delta-queryable folder.
@@ -54,14 +57,18 @@ type NextPageResulter[T any] interface {
 var _ NextPageResulter[any] = &nextPageResults[any]{}
 
 type nextPageResults[T any] struct {
-	pages chan nextPage[T]
-	du    DeltaUpdate
-	err   error
+	pages  chan nextPage[T]
+	cancel chan struct{}
+	done   chan struct{}
+	du     DeltaUpdate
+	err    error
 }
 
 func NewNextPageResults[T any]() *nextPageResults[T] {
 	return &nextPageResults[T]{
-		pages: make(chan nextPage[T]),
+		pages:  make(chan nextPage[T]),
+		cancel: make(chan struct{}),
+		done:   make(chan struct{}),
 	}
 }
 
@@ -76,7 +83,13 @@ func (npr *nextPageResults[T]) writeNextPage(
 
 	select {
 	case <-ctx.Done():
-		return clues.Wrap(context.Canceled, "writing next page")
+		return clues.Wrap(
+			clues.Stack(ctx.Err(), context.Cause(ctx)),
+			"writing next page")
+
+	case <-npr.cancel:
+		return clues.Stack(errCancelled)
+
 	case npr.pages <- nextPage[T]{
 		items: items,
 		reset: reset,
@@ -95,15 +108,17 @@ func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
 	return np.items, np.reset, !ok
 }
 
-func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
-	if npr.pages != nil {
-		//nolint:revive
-		for range npr.pages {
-			// if the pager hasn't closed yet, drain out the pages iterator
-			// to avoid leaking routines, and to ensure we get results.
-		}
-	}
+// Cancel stops the current pager enumeration. Must be called at most once per
+// pager.
+func (npr *nextPageResults[T]) Cancel() {
+	close(npr.cancel)
+}
 
+// Results returns the final status of the pager. This is a blocking call. To
+// avoid deadlocking, either call Cancel before this or iterate through all
+// pager results using NextPage.
+func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
+	<-npr.done
 	return npr.du, npr.err
 }
 
@@ -111,6 +126,10 @@ func (npr *nextPageResults[T]) close() {
 	if npr.pages != nil {
 		close(npr.pages)
 	}
+
+	if npr.done != nil {
+		close(npr.done)
+	}
 }
 
 // ---------------------------------------------------------------------------
@@ -211,7 +230,10 @@ func EnumerateItems[T any](
 		pageCount++
 
 		if err := npr.writeNextPage(ctx, pageResults, false); err != nil {
-			npr.err = clues.Stack(err)
+			if !errors.Is(err, errCancelled) {
+				npr.err = clues.Stack(err)
+			}
+
 			return
 		}
 
@@ -231,13 +253,11 @@ func BatchEnumerateItems[T any](
 	pager NonDeltaHandler[T],
 ) ([]T, error) {
 	var (
-		npr = nextPageResults[T]{
-			pages: make(chan nextPage[T]),
-		}
+		npr   = NewNextPageResults[T]()
 		items = []T{}
 	)
 
-	go EnumerateItems[T](ctx, pager, &npr)
+	go EnumerateItems[T](ctx, pager, npr)
 
 	for page, _, done := npr.NextPage(); !done; page, _, done = npr.NextPage() {
 		items = append(items, page...)
@@ -280,6 +300,15 @@ func DeltaEnumerateItems[T any](
 		consume = graph.SingleGetOrDeltaLC
 	)
 
+	// Ensure we always populate info about the delta token even if we exit
+	// before going through all pages of results.
+	defer func() {
+		npr.du = DeltaUpdate{
+			URL:   newDeltaLink,
+			Reset: invalidPrevDelta,
+		}
+	}()
+
 	if invalidPrevDelta {
 		// Delta queries with no previous token cost more.
 		consume = graph.DeltaNoTokenLC
@@ -294,7 +323,10 @@ func DeltaEnumerateItems[T any](
 		pager.Reset(ctx)
 
 		if err := npr.writeNextPage(ctx, nil, true); err != nil {
-			npr.err = clues.Stack(err)
+			if !errors.Is(err, errCancelled) {
+				npr.err = clues.Stack(err)
+			}
+
 			return
 		}
 
@@ -318,7 +350,10 @@ func DeltaEnumerateItems[T any](
 			itemCount = 0
 
 			if err := npr.writeNextPage(ctx, nil, true); err != nil {
-				npr.err = clues.Stack(err)
+				if !errors.Is(err, errCancelled) {
+					npr.err = clues.Stack(err)
+				}
+
 				return
 			}
 
@@ -336,7 +371,10 @@ func DeltaEnumerateItems[T any](
 		pageCount++
 
 		if err := npr.writeNextPage(ctx, pageResults, false); err != nil {
-			npr.err = clues.Stack(err)
+			if !errors.Is(err, errCancelled) {
+				npr.err = clues.Stack(err)
+			}
+
 			return
 		}
 
@@ -353,38 +391,6 @@ func DeltaEnumerateItems[T any](
 		"completed delta item enumeration",
 		"item_count", itemCount,
 		"page_count", pageCount)
-
-	npr.du = DeltaUpdate{
-		URL:   newDeltaLink,
-		Reset: invalidPrevDelta,
-	}
-}
-
-func batchDeltaEnumerateItems[T any](
-	ctx context.Context,
-	pager DeltaHandler[T],
-	prevDeltaLink string,
-) ([]T, DeltaUpdate, error) {
-	var (
-		npr = nextPageResults[T]{
-			pages: make(chan nextPage[T]),
-		}
-		results = []T{}
-	)
-
-	go DeltaEnumerateItems[T](ctx, pager, &npr, prevDeltaLink)
-
-	for page, reset, done := npr.NextPage(); !done; page, reset, done = npr.NextPage() {
-		if reset {
-			results = []T{}
-		}
-
-		results = append(results, page...)
-	}
-
-	du, err := npr.Results()
-
-	return results, du, clues.Stack(err).OrNil()
 }
 
 // ---------------------------------------------------------------------------
@@ -415,45 +421,138 @@ type addedAndRemovedHandler[T any] func(
 	error,
 )
 
+// batchWithMaxItemCount attempts to get itemLimit added items from the
+// underlying pager. If itemLimit is 0 then it gets all items from the pager.
+// The item count is the actual number of added items, not just the number of
+// added items seen. This means that duplicate items and items that are
+// filtered out don't count towards the limit.
+//
+// Also respects context cancellations.
+func batchWithMaxItemCount[T any](
+	ctx context.Context,
+	resultsPager *nextPageResults[T],
+	itemLimit int,
+	getAddedAndRemoved addedAndRemovedHandler[T],
+	filters ...func(T) bool,
+) (map[string]time.Time, []string, DeltaUpdate, error) {
+	var (
+		done  bool
+		reset bool
+		page  []T
+
+		removed []string
+		added   = map[string]time.Time{}
+	)
+
+	// Can't use a for-loop variable declaration because the line ends up too
+	// long.
+	for !done && (itemLimit <= 0 || len(added) < itemLimit) {
+		// If the context was cancelled then exit early. This keeps us from
+		// accidentally reading more from the pager, since the select there could
+		// pick either sending another page or seeing the context cancellation. We
+		// don't need to cancel the pager because it should see the context
+		// cancellation once we stop attempting to fetch the next page.
+		if ctx.Err() != nil {
+			return nil, nil, DeltaUpdate{}, clues.Stack(ctx.Err(), context.Cause(ctx)).
+				WithClues(ctx)
+		}
+
+		// Get the next page first thing in the loop instead of last thing so we
+		// don't fetch an extra page we then discard when we've reached the item
+		// limit. That wouldn't affect correctness but would consume more tokens
+		// in our rate limiter.
+		page, reset, done = resultsPager.NextPage()
+
+		if reset {
+			added = map[string]time.Time{}
+			removed = nil
+		}
+
+		pageAdded, pageRemoved, err := getAddedAndRemoved(page, filters...)
+		if err != nil {
+			resultsPager.Cancel()
+			return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
+		}
+
+		removed = append(removed, pageRemoved...)
+
+		for k, v := range pageAdded {
+			added[k] = v
+
+			if itemLimit > 0 && len(added) >= itemLimit {
+				break
+			}
+		}
+	}
+
+	// Cancel the pager so we don't fetch the rest of the data it may have.
+	resultsPager.Cancel()
+
+	du, err := resultsPager.Results()
+	if err != nil {
+		return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
+	}
+
+	// We processed all the results from the pager.
+	return added, removed, du, nil
+}
+
+// GetAddedAndRemovedItemIDs returns the set of item IDs that were added or
+// removed from this resource since the last delta query. If no delta query is
+// passed in then it returns all items found, not just changed/removed ones.
+//
+// Use itemLimit to stop enumeration partway through and filters to pick which
+// items should be returned.
 func GetAddedAndRemovedItemIDs[T any](
 	ctx context.Context,
 	pager NonDeltaHandler[T],
 	deltaPager DeltaHandler[T],
 	prevDeltaLink string,
 	canMakeDeltaQueries bool,
+	itemLimit int,
 	aarh addedAndRemovedHandler[T],
 	filters ...func(T) bool,
 ) (AddedAndRemoved, error) {
 	if canMakeDeltaQueries {
-		ts, du, err := batchDeltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
-		if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
-			return AddedAndRemoved{}, graph.Stack(ctx, err)
+		npr := NewNextPageResults[T]()
+
+		go DeltaEnumerateItems[T](ctx, deltaPager, npr, prevDeltaLink)
+
+		added, removed, du, err := batchWithMaxItemCount(
+			ctx,
+			npr,
+			itemLimit,
+			aarh,
+			filters...)
+		aar := AddedAndRemoved{
+			Added:         added,
+			Removed:       removed,
+			DU:            du,
+			ValidModTimes: deltaPager.ValidModTimes(),
 		}
 
-		if err == nil {
-			a, r, err := aarh(ts, filters...)
-			aar := AddedAndRemoved{
-				Added:         a,
-				Removed:       r,
-				DU:            du,
-				ValidModTimes: deltaPager.ValidModTimes(),
-			}
-
+		if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
+			return AddedAndRemoved{}, graph.Stack(ctx, err)
+		} else if err == nil {
 			return aar, graph.Stack(ctx, err).OrNil()
 		}
 	}
 
 	du := DeltaUpdate{Reset: true}
+	npr := NewNextPageResults[T]()
 
-	ts, err := BatchEnumerateItems(ctx, pager)
-	if err != nil {
-		return AddedAndRemoved{}, graph.Stack(ctx, err)
-	}
+	go EnumerateItems[T](ctx, pager, npr)
+
+	added, removed, _, err := batchWithMaxItemCount(
+		ctx,
+		npr,
+		itemLimit,
+		aarh,
+		filters...)
 
-	a, r, err := aarh(ts, filters...)
 	aar := AddedAndRemoved{
-		Added:         a,
-		Removed:       r,
+		Added:         added,
+		Removed:       removed,
 		DU:            du,
 		ValidModTimes: pager.ValidModTimes(),
 	}
diff --git a/src/pkg/services/m365/api/pagers/pagers_test.go b/src/pkg/services/m365/api/pagers/pagers_test.go
index bcd1fa5bd..121f830bc 100644
--- a/src/pkg/services/m365/api/pagers/pagers_test.go
+++ b/src/pkg/services/m365/api/pagers/pagers_test.go
@@ -11,7 +11,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
-	"golang.org/x/exp/maps"
 
 	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/tester"
@@ -326,25 +325,11 @@ func assertSliceEmptyOr[S ~[]E, E any](
 	assertionFunc(t, expect, got, msgAndArgs)
 }
 
-func assertMapEmptyOr[M ~map[K]V, K comparable, V any](
-	t *testing.T,
-	expect M,
-	got M,
-	assertionFunc assert.ComparisonAssertionFunc,
-	msgAndArgs ...any,
-) {
-	if len(expect) == 0 {
-		assert.Empty(t, got, msgAndArgs)
-		return
-	}
-
-	assertionFunc(t, expect, got, msgAndArgs)
-}
-
 func assertAddedAndRemoved(
 	t *testing.T,
 	validModTimes bool,
 	wantAdded []testItem,
+	partialAdded []testItem,
 	gotAdded map[string]time.Time,
 	wantRemoved []testItem,
 	gotRemoved []string,
@@ -352,32 +337,42 @@ func assertAddedAndRemoved(
 	epoch, err := time.Parse(time.DateOnly, "1970-01-01")
 	require.NoError(t, err, clues.ToCore(err))
 
-	expectAdded := map[string]time.Time{}
+	requireAdded := map[string]time.Time{}
 	for _, item := range wantAdded {
-		expectAdded[item.id] = item.modTime
+		requireAdded[item.id] = item.modTime
 	}
 
-	if validModTimes {
-		assertMapEmptyOr(
-			t,
-			expectAdded,
-			gotAdded,
-			assert.Equal,
-			"added item IDs and mod times")
-	} else {
-		assertSliceEmptyOr(
-			t,
-			maps.Keys(expectAdded),
-			maps.Keys(gotAdded),
-			assert.ElementsMatch,
-			"added item IDs")
+	maybeAdded := map[string]time.Time{}
+	for _, item := range partialAdded {
+		maybeAdded[item.id] = item.modTime
+	}
 
-		for _, modtime := range gotAdded {
-			assert.True(t, modtime.After(epoch), "mod time after epoch")
-			assert.False(t, modtime.IsZero(), "non-zero mod time")
+	for id, mt := range gotAdded {
+		var (
+			wantMT time.Time
+			found  bool
+		)
+
+		if wantMT, found = requireAdded[id]; found {
+			delete(requireAdded, id)
+		} else if wantMT, found = maybeAdded[id]; found {
+			delete(maybeAdded, id)
+		}
+
+		if !assert.True(t, found, "unexpected added item with ID %v", id) {
+			continue
+		}
+
+		if validModTimes {
+			assert.Equal(t, wantMT, mt, "mod time for item with ID %v", id)
+		} else {
+			assert.True(t, mt.After(epoch), "mod time after epoch for item with ID %v", id)
+			assert.False(t, mt.IsZero(), "non-zero mod time for item with ID %v", id)
 		}
 	}
 
+	assert.Empty(t, requireAdded, "required items not added")
+
 	expectRemoved := []string{}
 	for _, item := range wantRemoved {
 		expectRemoved = append(expectRemoved, item.id)
@@ -399,6 +394,10 @@ type modTimeTest struct {
 var (
 	addedItem1 = addedItem("a_uno", time.Now())
 	addedItem2 = addedItem("a_dos", time.Now())
+	addedItem3 = addedItem("a_tres", time.Now())
+	addedItem4 = addedItem("a_quatro", time.Now())
+	addedItem5 = addedItem("a_cinco", time.Now())
+	addedItem6 = addedItem("a_seis", time.Now())
 
 	removedItem1 = removedItem("r_uno")
 	removedItem2 = removedItem("r_dos")
@@ -427,7 +426,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			p *testIDsNonDeltaMultiPager,
 		) (NonDeltaHandler[testItem], DeltaHandler[testItem])
 		expectDeltaReset bool
-		expectDeltaLink  assert.ValueAssertionFunc
+		expectNoDelta    bool
 	}{
 		{
 			name: "NoPrevDelta",
@@ -438,7 +437,6 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				return nil, newDeltaPager(p)
 			},
 			expectDeltaReset: true,
-			expectDeltaLink:  assert.NotEmpty,
 		},
 		{
 			name: "PrevDelta",
@@ -449,7 +447,6 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			) (NonDeltaHandler[testItem], DeltaHandler[testItem]) {
 				return nil, newDeltaPager(p)
 			},
-			expectDeltaLink: assert.NotEmpty,
 		},
 		{
 			name: "DeltaNotAllowed",
@@ -460,14 +457,26 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				return p, nil
 			},
 			expectDeltaReset: true,
-			expectDeltaLink:  assert.Empty,
+			expectNoDelta:    true,
 		},
 	}
 
 	type expected struct {
 		errCheck assert.ErrorAssertionFunc
 		added    []testItem
+		// finalPageAdded is the set of items on the last page for queries that
+		// limit results. Some of these items may not be returned, but we don't
+		// make guarantees about which ones those will be.
+		finalPageAdded []testItem
+		// numAdded is the total number of added items that should be returned.
+		numAdded int
 		removed  []testItem
+		// maxGetterIdx is the maximum index for the item page getter. Helps
+		// ensure we're stopping enumeration when we reach the item limit.
+		maxGetterIdx int
+		// noDelta should be set if we exit enumeration early due to the item
+		// limit so we don't get a delta token.
+		noDelta bool
 	}
 
 	table := []struct {
@@ -476,8 +485,10 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			t *testing.T,
 			validModTimes bool,
 		) *testIDsNonDeltaMultiPager
-		filter func(a testItem) bool
-		expect expected
+		filter       func(a testItem) bool
+		expect       expected
+		limit        int
+		ctxCancelled bool
 	}{
 		{
 			name: "OnePage",
@@ -503,7 +514,9 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 					addedItem1,
 					addedItem2,
 				},
-				removed: []testItem{removedItem1, removedItem2},
+				numAdded:     2,
+				removed:      []testItem{removedItem1, removedItem2},
+				maxGetterIdx: 1,
 			},
 		},
 		{
@@ -534,7 +547,9 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 					addedItem1,
 					addedItem2,
 				},
-				removed: []testItem{removedItem1, removedItem2},
+				numAdded:     2,
+				removed:      []testItem{removedItem1, removedItem2},
+				maxGetterIdx: 2,
 			},
 		},
 		{
@@ -557,7 +572,8 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			},
 			filter: func(testItem) bool { return false },
 			expect: expected{
-				errCheck: assert.NoError,
+				errCheck:     assert.NoError,
+				maxGetterIdx: 1,
 			},
 		},
 		{
@@ -584,7 +600,8 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 			},
 			filter: func(testItem) bool { return false },
 			expect: expected{
-				errCheck: assert.NoError,
+				errCheck:     assert.NoError,
+				maxGetterIdx: 2,
 			},
 		},
 		{
@@ -604,6 +621,194 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				errCheck: assert.Error,
 			},
 		},
+		{
+			name: "FourValidPages OnlyPartOfSecondPage",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) *testIDsNonDeltaMultiPager {
+				return &testIDsNonDeltaMultiPager{
+					t:             t,
+					validModTimes: validModTimes,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								addedItem1,
+								addedItem2,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem3,
+								addedItem4,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem("cinco", time.Now()),
+								addedItem("seis", time.Now()),
+							},
+						},
+						{
+							items: []testItem{
+								addedItem("siete", time.Now()),
+								addedItem("ocho", time.Now()),
+							},
+						},
+					},
+				}
+			},
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem1,
+					addedItem2,
+				},
+				finalPageAdded: []testItem{
+					addedItem3,
+					addedItem4,
+				},
+				numAdded:     3,
+				maxGetterIdx: 3,
+				noDelta:      true,
+			},
+			limit: 3,
+		},
+ { + name: "FourValidPages OnlyPartOfSecondPage SomeItemsFiltered", + pagerGetter: func( + t *testing.T, + validModTimes bool, + ) *testIDsNonDeltaMultiPager { + return &testIDsNonDeltaMultiPager{ + t: t, + validModTimes: validModTimes, + pages: []pageResult{ + { + items: []testItem{ + addedItem1, + addedItem2, + }, + }, + { + items: []testItem{ + addedItem3, + addedItem4, + }, + }, + { + items: []testItem{ + addedItem5, + addedItem6, + }, + }, + { + items: []testItem{ + addedItem("siete", time.Now()), + addedItem("ocho", time.Now()), + }, + }, + }, + } + }, + filter: func(item testItem) bool { + return item.id != addedItem2.id + }, + expect: expected{ + errCheck: assert.NoError, + added: []testItem{ + addedItem1, + }, + finalPageAdded: []testItem{ + addedItem3, + addedItem4, + }, + numAdded: 2, + maxGetterIdx: 3, + noDelta: true, + }, + limit: 2, + }, + { + name: "ThreeValidPages OnlyPartOfThirdPage RepeatItem", + pagerGetter: func( + t *testing.T, + validModTimes bool, + ) *testIDsNonDeltaMultiPager { + return &testIDsNonDeltaMultiPager{ + t: t, + validModTimes: validModTimes, + pages: []pageResult{ + { + items: []testItem{ + addedItem1, + addedItem2, + removedItem1, + }, + }, + { + items: []testItem{ + addedItem1, + addedItem4, + addedItem3, + removedItem2, + }, + }, + { + items: []testItem{ + addedItem5, + addedItem6, + }, + }, + }, + } + }, + expect: expected{ + errCheck: assert.NoError, + added: []testItem{ + addedItem1, + addedItem2, + addedItem3, + addedItem4, + }, + finalPageAdded: []testItem{ + addedItem5, + addedItem6, + }, + removed: []testItem{ + removedItem1, + removedItem2, + }, + numAdded: 5, + maxGetterIdx: 3, + }, + limit: 5, + }, + { + name: "ParentContextCancelled", + pagerGetter: func( + t *testing.T, + validModTimes bool, + ) *testIDsNonDeltaMultiPager { + return &testIDsNonDeltaMultiPager{ + t: t, + validModTimes: true, + pages: []pageResult{ + { + items: []testItem{ + addedItem("uno", time.Now()), + addedItem("dos", time.Now()), + removedItem("siete"), + }, + }, + }, + } + }, + expect: expected{ + errCheck: assert.Error, + }, + ctxCancelled: true, + }, } for _, modTimeTest := range modTimeTests { @@ -617,6 +822,14 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() { ctx, flush := tester.NewContext(t) defer flush() + ictx, cancel := context.WithCancel(ctx) + + if test.ctxCancelled { + cancel() + } else { + defer cancel() + } + filters := []func(testItem) bool{} if test.filter != nil { filters = append(filters, test.filter) @@ -626,11 +839,12 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() { getter, deltaGetter := pagerTypeTest.pagersFunc(basePager) addRemoved, err := GetAddedAndRemovedItemIDs[testItem]( - ctx, + ictx, getter, deltaGetter, pagerTypeTest.prevDelta, pagerTypeTest.canUseDelta, + test.limit, AddedAndRemovedByAddtlData[testItem], filters...) 
 			test.expect.errCheck(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err))
@@ -639,14 +853,23 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
 				return
 			}
 
+			assert.Len(t, addRemoved.Added, test.expect.numAdded, "number of added items")
+			assert.GreaterOrEqual(t, test.expect.maxGetterIdx, basePager.pageIdx, "number of pager calls")
+
 			assert.Equal(t, modTimeTest.validModTimes, addRemoved.ValidModTimes, "valid mod times")
-			assert.Equal(t, pagerTypeTest.expectDeltaReset, addRemoved.DU.Reset, "delta update")
-			pagerTypeTest.expectDeltaLink(t, addRemoved.DU.URL, "delta link")
+			assert.Equal(t, pagerTypeTest.expectDeltaReset, addRemoved.DU.Reset, "delta reset")
+
+			if pagerTypeTest.expectNoDelta || test.expect.noDelta {
+				assert.Empty(t, addRemoved.DU.URL, "delta link")
+			} else {
+				assert.NotEmpty(t, addRemoved.DU.URL, "delta link")
+			}
 
 			assertAddedAndRemoved(
 				t,
 				modTimeTest.validModTimes,
 				test.expect.added,
+				test.expect.finalPageAdded,
 				addRemoved.Added,
 				test.expect.removed,
 				addRemoved.Removed)
@@ -683,6 +906,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 			t *testing.T,
 			validModTimes bool,
 		) DeltaHandler[testItem]
+		limit  int
 		expect expected
 	}{
@@ -867,6 +1091,130 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 				deltaLink: assert.Empty,
 			},
 		},
+		{
+			name:        "FivePages DeltaReset LimitedItems",
+			pagerGetter: nilPager,
+			deltaPagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) DeltaHandler[testItem] {
+				return newDeltaPager(
+					&testIDsNonDeltaMultiPager{
+						t: t,
+						pages: []pageResult{
+							{
+								items: []testItem{
+									addedItem1,
+									removedItem1,
+								},
+							},
+							{
+								errCode:    "SyncStateNotFound",
+								needsReset: true,
+							},
+							{
+								items: []testItem{
+									removedItem2,
+									addedItem2,
+								},
+							},
+							{
+								items: []testItem{
+									addedItem3,
+									addedItem4,
+								},
+							},
+							{
+								items: []testItem{
+									addedItem5,
+									addedItem6,
+								},
+							},
+						},
+						validModTimes: validModTimes,
+					})
+			},
+			limit: 3,
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem2,
+					addedItem3,
+					addedItem4,
+				},
+				removed: []testItem{
+					removedItem2,
+				},
+				deltaLink: assert.Empty,
+			},
+		},
+		{
+			name: "FivePages DeltaNotSupported LimitedItems",
+			pagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) NonDeltaHandler[testItem] {
+				return &testIDsNonDeltaMultiPager{
+					t: t,
+					pages: []pageResult{
+						{
+							items: []testItem{
+								removedItem2,
+								addedItem2,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem3,
+								addedItem4,
+							},
+						},
+						{
+							items: []testItem{
+								addedItem5,
+								addedItem6,
+							},
+						},
+					},
+					validModTimes: validModTimes,
+				}
+			},
+			deltaPagerGetter: func(
+				t *testing.T,
+				validModTimes bool,
+			) DeltaHandler[testItem] {
+				return newDeltaPager(
+					&testIDsNonDeltaMultiPager{
+						t: t,
+						pages: []pageResult{
+							{
+								items: []testItem{
+									addedItem1,
+									removedItem1,
+								},
+							},
+							{
+								err:        graph.ErrDeltaNotSupported,
+								needsReset: true,
+							},
+						},
+						validModTimes: validModTimes,
+					})
+			},
+			limit: 3,
+			expect: expected{
+				errCheck: assert.NoError,
+				added: []testItem{
+					addedItem2,
+					addedItem3,
+					addedItem4,
+				},
+				removed: []testItem{
+					removedItem2,
+				},
+				deltaLink: assert.Empty,
+			},
+		},
 	}
 
 	for _, modTimeTest := range modTimeTests {
@@ -884,6 +1232,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 				test.deltaPagerGetter(t, modTimeTest.validModTimes),
 				"a",
 				true,
+				test.limit,
 				AddedAndRemovedByAddtlData[testItem])
 			require.NoError(
 				t,
@@ -892,13 +1241,14 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs_FallbackPagers() {
 				clues.ToCore(err))
 
 			assert.Equal(t, modTimeTest.validModTimes, addRemoved.ValidModTimes, "valid mod times")
-			assert.True(t, addRemoved.DU.Reset, "delta update")
+			assert.True(t, addRemoved.DU.Reset, "delta reset")
 			test.expect.deltaLink(t, addRemoved.DU.URL, "delta link")
 
 			assertAddedAndRemoved(
 				t,
 				modTimeTest.validModTimes,
 				test.expect.added,
+				nil,
 				addRemoved.Added,
 				test.expect.removed,
 				addRemoved.Removed)
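
Note on the consumer contract this patch introduces: Results is now a blocking call that waits for the enumeration goroutine to finish, so a caller must either drain every page with NextPage or call Cancel first. The Go sketch below is illustrative only and not part of the patch; it mirrors the deleted batchDeltaEnumerateItems helper under the new contract, and the generic T plus the deltaPager argument are placeholders for any concrete DeltaHandler.

package example

import (
	"context"

	"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
)

// drainAll consumes every page before calling Results, which keeps Results
// from blocking forever. Sketch only: it mirrors the removed
// batchDeltaEnumerateItems helper; T and deltaPager are hypothetical
// stand-ins, not part of the patch.
func drainAll[T any](
	ctx context.Context,
	deltaPager pagers.DeltaHandler[T],
	prevDeltaLink string,
) ([]T, pagers.DeltaUpdate, error) {
	npr := pagers.NewNextPageResults[T]()

	// The producer writes pages into npr and unblocks Results when it finishes.
	go pagers.DeltaEnumerateItems[T](ctx, deltaPager, npr, prevDeltaLink)

	var results []T

	for page, reset, done := npr.NextPage(); !done; page, reset, done = npr.NextPage() {
		if reset {
			// The previous delta token was invalidated; drop partial results.
			results = nil
		}

		results = append(results, page...)
	}

	// Safe to block here: the loop above drained all pages, so the producer
	// has already finished and recorded its final status.
	du, err := npr.Results()

	return results, du, err
}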
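The new itemLimit argument threaded through the service call sites takes 0 to mean "no limit", which is what the channels/contacts/conversations/events/mail pagers pass. A hedged sketch of a capped call follows; the wrapper name and pager values are illustrative, not part of the patch. Per the test expectations above, a capped run may cancel enumeration early, in which case DU.URL comes back empty and the next sync should plan for a full enumeration.

package example

import (
	"context"

	"github.com/alcionai/corso/src/pkg/services/m365/api/pagers"
)

// cappedIDs stops after roughly `limit` added items. A zero limit disables
// the cap, matching the service call sites in this patch. Illustrative
// sketch; the pager arguments are placeholders for concrete handlers.
func cappedIDs[T any](
	ctx context.Context,
	pager pagers.NonDeltaHandler[T],
	deltaPager pagers.DeltaHandler[T],
	prevDeltaLink string,
	limit int,
) (pagers.AddedAndRemoved, error) {
	return pagers.GetAddedAndRemovedItemIDs[T](
		ctx,
		pager,
		deltaPager,
		prevDeltaLink,
		true, // canMakeDeltaQueries
		limit,
		pagers.AddedAndRemovedByAddtlData[T])
}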