api pagers use channels for page streaming

Replaces the batch processing of api pagers with stream
processing using channels.  Each page is passed to the
channel for callers to consume iteratively.  For delta pagers,
the reset notification is also streamed through the same
channel.  Channel consumers are expected to react to the
reset as needed.

For quality of life, most pagers now utilize a batching
wrapper instead of transitioning to streaming support.
Only locations that already expect stream handling will
get moved to that support in the following PRs.

This is the first of a multipart update that has been separated
for ease of review.  CI is not expected to pass until the final
PR.
This commit is contained in:
ryanfkeepers 2023-09-30 10:10:52 -06:00
parent 3656e04676
commit 4a295be9ed
8 changed files with 135 additions and 46 deletions

View File

@ -227,7 +227,7 @@ func (c Channels) GetChannelMessageReplies(
ctx context.Context, ctx context.Context,
teamID, channelID, messageID string, teamID, channelID, messageID string,
) ([]models.ChatMessageable, error) { ) ([]models.ChatMessageable, error) {
return enumerateItems[models.ChatMessageable]( return batchEnumerateItems[models.ChatMessageable](
ctx, ctx,
c.NewChannelMessageRepliesPager(teamID, channelID, messageID)) c.NewChannelMessageRepliesPager(teamID, channelID, messageID))
} }
@ -284,5 +284,5 @@ func (c Channels) GetChannels(
ctx context.Context, ctx context.Context,
teamID string, teamID string,
) ([]models.Channelable, error) { ) ([]models.Channelable, error) {
return enumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID)) return batchEnumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID))
} }

View File

@ -84,7 +84,7 @@ func (c Contacts) EnumerateContainers(
pgr = c.NewContactFoldersPager(userID, baseContainerID, immutableIDs) pgr = c.NewContactFoldersPager(userID, baseContainerID, immutableIDs)
) )
containers, err := enumerateItems(ctx, pgr) containers, err := batchEnumerateItems(ctx, pgr)
if err != nil { if err != nil {
return graph.Stack(ctx, err) return graph.Stack(ctx, err)
} }
@ -165,7 +165,7 @@ func (c Contacts) GetItemsInContainerByCollisionKey(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewContactsPager(userID, containerID, false, contactCollisionKeyProps()...) pager := c.NewContactsPager(userID, containerID, false, contactCollisionKeyProps()...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating contacts") return nil, graph.Wrap(ctx, err, "enumerating contacts")
} }
@ -186,7 +186,7 @@ func (c Contacts) GetItemIDsInContainer(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewContactsPager(userID, containerID, false, idAnd()...) pager := c.NewContactsPager(userID, containerID, false, idAnd()...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating contacts") return nil, graph.Wrap(ctx, err, "enumerating contacts")
} }

View File

@ -77,7 +77,7 @@ func (c Drives) GetItemsInContainerByCollisionKey(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewDriveItemPager(driveID, containerID, idAnd("name")...) pager := c.NewDriveItemPager(driveID, containerID, idAnd("name")...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating drive items") return nil, graph.Wrap(ctx, err, "enumerating drive items")
} }
@ -101,7 +101,7 @@ func (c Drives) GetItemIDsInContainer(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewDriveItemPager(driveID, containerID, idAnd("file", "folder")...) pager := c.NewDriveItemPager(driveID, containerID, idAnd("file", "folder")...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating contacts") return nil, graph.Wrap(ctx, err, "enumerating contacts")
} }
@ -197,26 +197,27 @@ func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool {
return true return true
} }
// EnumerateDriveItems will enumerate all items in the specified drive and hand them to the // EnumerateDriveItems will enumerate all items in the specified drive and stream them page
// provided `collector` method // by page, along with the delta update and any errors, to the provided channel.
func (c Drives) EnumerateDriveItemsDelta( func (c Drives) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
ch chan<- NextPage[models.DriveItemable],
driveID string, driveID string,
prevDeltaLink string, prevDeltaLink string,
selectProps []string, selectProps []string,
) ( ) (DeltaUpdate, error) {
[]models.DriveItemable, deltaPager := c.newDriveItemDeltaPager(
DeltaUpdate, driveID,
error, prevDeltaLink,
) { selectProps...)
pager := c.newDriveItemDeltaPager(driveID, prevDeltaLink, selectProps...)
items, du, err := deltaEnumerateItems[models.DriveItemable](ctx, pager, prevDeltaLink) du, err := deltaEnumerateItems[models.DriveItemable](
if err != nil { ctx,
return nil, du, clues.Stack(err) deltaPager,
} ch,
prevDeltaLink)
return items, du, nil return du, clues.Stack(err).OrNil()
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -354,11 +355,17 @@ func GetAllDrives(
ctx context.Context, ctx context.Context,
pager Pager[models.Driveable], pager Pager[models.Driveable],
) ([]models.Driveable, error) { ) ([]models.Driveable, error) {
ds, err := enumerateItems(ctx, pager) ds, err := batchEnumerateItems(ctx, pager)
if err != nil && (clues.HasLabel(err, graph.LabelsMysiteNotFound) ||
clues.HasLabel(err, graph.LabelsNoSharePointLicense)) { // no license or drives available.
// return a non-error and let the caller assume an empty result set.
// TODO: is this the best way to handle this?
// what about returning a ResourceNotFound error as is standard elsewhere?
if err != nil &&
(clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {
logger.CtxErr(ctx, err).Infof("resource owner does not have a drive") logger.CtxErr(ctx, err).Infof("resource owner does not have a drive")
return make([]models.Driveable, 0), nil // no license or drives.
return make([]models.Driveable, 0), nil
} }
return ds, graph.Stack(ctx, err).OrNil() return ds, graph.Stack(ctx, err).OrNil()

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -185,7 +186,17 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
items, du, err := suite.its. ch := make(chan api.NextPage[models.DriveItemable], 1)
items := []models.DriveItemable{}
go func() {
for np := range ch {
items = append(items, np.Items...)
assert.False(t, np.Reset, "should not reset")
}
}()
du, err := suite.its.
ac. ac.
Drives(). Drives().
EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps()) EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps())

View File

@ -85,7 +85,7 @@ func (c Events) EnumerateContainers(
pgr = c.NewEventCalendarsPager(userID, immutableIDs) pgr = c.NewEventCalendarsPager(userID, immutableIDs)
) )
containers, err := enumerateItems(ctx, pgr) containers, err := batchEnumerateItems(ctx, pgr)
if err != nil { if err != nil {
return graph.Stack(ctx, err) return graph.Stack(ctx, err)
} }
@ -169,7 +169,7 @@ func (c Events) GetItemsInContainerByCollisionKey(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewEventsPager(userID, containerID, false, eventCollisionKeyProps()...) pager := c.NewEventsPager(userID, containerID, false, eventCollisionKeyProps()...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating events") return nil, graph.Wrap(ctx, err, "enumerating events")
} }
@ -190,7 +190,7 @@ func (c Events) GetItemIDsInContainer(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewEventsPager(userID, containerID, false, idAnd()...) pager := c.NewEventsPager(userID, containerID, false, idAnd()...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating events") return nil, graph.Wrap(ctx, err, "enumerating events")
} }

View File

@ -27,6 +27,15 @@ type DeltaUpdate struct {
Reset bool Reset bool
} }
type NextPage[T any] struct {
Items []T
// Reset is only true on the iteration where the delta pager's Reset()
// is called. Callers can use it to reset any data aggregation they
// currently use. After that loop, it will be false again, though the
// DeltaUpdate will still contain the expected value.
Reset bool
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// common interfaces // common interfaces
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -100,7 +109,10 @@ type Pager[T any] interface {
func enumerateItems[T any]( func enumerateItems[T any](
ctx context.Context, ctx context.Context,
pager Pager[T], pager Pager[T],
) ([]T, error) { ch chan<- NextPage[T],
) error {
defer close(ch)
var ( var (
result = make([]T, 0) result = make([]T, 0)
// stubbed initial value to ensure we enter the loop. // stubbed initial value to ensure we enter the loop.
@ -111,10 +123,11 @@ func enumerateItems[T any](
// get the next page of data, check for standard errors // get the next page of data, check for standard errors
page, err := pager.GetPage(ctx) page, err := pager.GetPage(ctx)
if err != nil { if err != nil {
return nil, graph.Stack(ctx, err) return graph.Stack(ctx, err)
} }
result = append(result, page.GetValue()...) ch <- NextPage[T]{Items: page.GetValue()}
nextLink = NextLink(page) nextLink = NextLink(page)
pager.SetNextLink(nextLink) pager.SetNextLink(nextLink)
@ -122,7 +135,27 @@ func enumerateItems[T any](
logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result)) logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result))
return result, nil return nil
}
func batchEnumerateItems[T any](
ctx context.Context,
pager Pager[T],
) ([]T, error) {
var (
ch = make(chan NextPage[T])
results = []T{}
)
go func() {
for np := range ch {
results = append(results, np.Items...)
}
}()
err := enumerateItems[T](ctx, pager, ch)
return results, clues.Stack(err).OrNil()
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -136,11 +169,19 @@ type DeltaPager[T any] interface {
ValidModTimer ValidModTimer
} }
// enumerates pages of items, streaming each page to the provided channel.
// the DeltaUpdate, reset notifications, and any errors are also fed to the
// same channel.
// Returns false if conditions disallow making delta calls for the provided
// pager. Returns true otherwise, even in the event of an error.
func deltaEnumerateItems[T any]( func deltaEnumerateItems[T any](
ctx context.Context, ctx context.Context,
pager DeltaPager[T], pager DeltaPager[T],
ch chan<- NextPage[T],
prevDeltaLink string, prevDeltaLink string,
) ([]T, DeltaUpdate, error) { ) (DeltaUpdate, error) {
defer close(ch)
var ( var (
result = make([]T, 0) result = make([]T, 0)
// stubbed initial value to ensure we enter the loop. // stubbed initial value to ensure we enter the loop.
@ -160,28 +201,33 @@ func deltaEnumerateItems[T any](
page, err := pager.GetPage(graph.ConsumeNTokens(ctx, consume)) page, err := pager.GetPage(graph.ConsumeNTokens(ctx, consume))
if graph.IsErrDeltaNotSupported(err) { if graph.IsErrDeltaNotSupported(err) {
logger.Ctx(ctx).Infow("delta queries not supported") logger.Ctx(ctx).Infow("delta queries not supported")
return nil, DeltaUpdate{}, clues.Stack(graph.ErrDeltaNotSupported, err)
pager.Reset(ctx)
ch <- NextPage[T]{Reset: true}
return DeltaUpdate{}, clues.Stack(err)
} }
if graph.IsErrInvalidDelta(err) { if graph.IsErrInvalidDelta(err) {
logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink) logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink)
invalidPrevDelta = true invalidPrevDelta = true
// Reset limiter consumption since we don't have a valid delta token.
// Set limiter consumption rate to non-delta.
consume = graph.DeltaNoTokenLC consume = graph.DeltaNoTokenLC
result = make([]T, 0)
// Reset tells the pager to try again after ditching its delta history. // Reset tells the pager to try again after ditching its delta history.
pager.Reset(ctx) pager.Reset(ctx)
ch <- NextPage[T]{Reset: true}
continue continue
} }
if err != nil { if err != nil {
return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "retrieving page") return DeltaUpdate{}, clues.Stack(err)
} }
result = append(result, page.GetValue()...) ch <- NextPage[T]{Items: page.GetValue()}
nl, deltaLink := NextAndDeltaLink(page) nl, deltaLink := NextAndDeltaLink(page)
if len(deltaLink) > 0 { if len(deltaLink) > 0 {
@ -199,7 +245,32 @@ func deltaEnumerateItems[T any](
Reset: invalidPrevDelta, Reset: invalidPrevDelta,
} }
return result, du, nil return du, nil
}
func batchDeltaEnumerateItems[T any](
ctx context.Context,
pager DeltaPager[T],
prevDeltaLink string,
) ([]T, DeltaUpdate, error) {
var (
ch = make(chan NextPage[T])
results = []T{}
)
go func() {
for np := range ch {
if np.Reset {
results = []T{}
}
results = append(results, np.Items...)
}
}()
du, err := deltaEnumerateItems[T](ctx, pager, ch, prevDeltaLink)
return results, du, clues.Stack(err).OrNil()
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -217,7 +288,7 @@ func getAddedAndRemovedItemIDs[T any](
aarh addedAndRemovedHandler[T], aarh addedAndRemovedHandler[T],
) (map[string]time.Time, bool, []string, DeltaUpdate, error) { ) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
if canMakeDeltaQueries { if canMakeDeltaQueries {
ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink) ts, du, err := batchDeltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) { if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) {
return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err) return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
} }
@ -230,7 +301,7 @@ func getAddedAndRemovedItemIDs[T any](
du := DeltaUpdate{Reset: true} du := DeltaUpdate{Reset: true}
ts, err := enumerateItems(ctx, pager) ts, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err) return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
} }

View File

@ -245,7 +245,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
pageErr: assert.AnError, pageErr: assert.AnError,
} }
}, },
expect: nil, expect: []any{},
expectErr: require.Error, expectErr: require.Error,
}, },
} }
@ -257,7 +257,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
result, err := enumerateItems(ctx, test.getPager(t, ctx)) result, err := batchEnumerateItems(ctx, test.getPager(t, ctx))
test.expectErr(t, err, clues.ToCore(err)) test.expectErr(t, err, clues.ToCore(err))
require.EqualValues(t, test.expect, result) require.EqualValues(t, test.expect, result)

View File

@ -81,7 +81,7 @@ func (c Mail) EnumerateContainers(
pgr = c.NewMailFoldersPager(userID, immutableIDs) pgr = c.NewMailFoldersPager(userID, immutableIDs)
) )
containers, err := enumerateItems(ctx, pgr) containers, err := batchEnumerateItems(ctx, pgr)
if err != nil { if err != nil {
return graph.Stack(ctx, err) return graph.Stack(ctx, err)
} }
@ -162,7 +162,7 @@ func (c Mail) GetItemsInContainerByCollisionKey(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewMailPager(userID, containerID, false, mailCollisionKeyProps()...) pager := c.NewMailPager(userID, containerID, false, mailCollisionKeyProps()...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating mails") return nil, graph.Wrap(ctx, err, "enumerating mails")
} }
@ -183,7 +183,7 @@ func (c Mail) GetItemIDsInContainer(
ctx = clues.Add(ctx, "container_id", containerID) ctx = clues.Add(ctx, "container_id", containerID)
pager := c.NewMailPager(userID, containerID, false, idAnd()...) pager := c.NewMailPager(userID, containerID, false, idAnd()...)
items, err := enumerateItems(ctx, pager) items, err := batchEnumerateItems(ctx, pager)
if err != nil { if err != nil {
return nil, graph.Wrap(ctx, err, "enumerating mails") return nil, graph.Wrap(ctx, err, "enumerating mails")
} }