replace channel creation with pager interface

instead of having callers pass in channels, create channels
when starting an api enumeration and return a pager to the
caller which allows them to retrieve each page at a time.
This commit is contained in:
ryanfkeepers 2023-10-09 16:53:05 -06:00
parent 4a295be9ed
commit eb0219de6e
7 changed files with 111 additions and 68 deletions

View File

@ -86,12 +86,7 @@ type EnumerateDriveItemsDeltaer interface {
EnumerateDriveItemsDelta( EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, ) api.NextPageResulter[models.DriveItemable]
) (
[]models.DriveItemable,
api.DeltaUpdate,
error,
)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -138,7 +138,7 @@ func (h itemBackupHandler) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, selectProps []string,
) ([]models.DriveItemable, api.DeltaUpdate, error) { ) (api.NextPageResulter[models.DriveItemable], error) {
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps)
} }

View File

@ -141,7 +141,7 @@ func (h libraryBackupHandler) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, selectProps []string,
) ([]models.DriveItemable, api.DeltaUpdate, error) { ) api.NextPageResulter[models.DriveItemable] {
return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps)
} }

View File

@ -288,11 +288,7 @@ func (edi EnumeratesDriveItemsDelta) EnumerateDriveItemsDelta(
_ context.Context, _ context.Context,
driveID, _ string, driveID, _ string,
_ []string, _ []string,
) ( ) api.NextPageResulter[models.DriveItemable] {
[]models.DriveItemable,
api.DeltaUpdate,
error,
) {
return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID] return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID]
} }

View File

@ -201,23 +201,29 @@ func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool {
// by page, along with the delta update and any errors, to the provided channel. // by page, along with the delta update and any errors, to the provided channel.
func (c Drives) EnumerateDriveItemsDelta( func (c Drives) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
ch chan<- NextPage[models.DriveItemable],
driveID string, driveID string,
prevDeltaLink string, prevDeltaLink string,
selectProps []string, selectProps []string,
) (DeltaUpdate, error) { ) NextPageResulter[models.DriveItemable] {
deltaPager := c.newDriveItemDeltaPager( deltaPager := c.newDriveItemDeltaPager(
driveID, driveID,
prevDeltaLink, prevDeltaLink,
selectProps...) selectProps...)
du, err := deltaEnumerateItems[models.DriveItemable]( npr := &nextPageResults[models.DriveItemable]{
pages: make(chan nextPage[models.DriveItemable]),
}
// asynchronously enumerate pages on the caller's behalf.
// they only need to consume the pager and call Results at
// the end.
go deltaEnumerateItems[models.DriveItemable](
ctx, ctx,
deltaPager, deltaPager,
ch, npr,
prevDeltaLink) prevDeltaLink)
return du, clues.Stack(err).OrNil() return npr
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -186,21 +186,22 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
ch := make(chan api.NextPage[models.DriveItemable], 1)
items := []models.DriveItemable{} items := []models.DriveItemable{}
go func() { pager := suite.its.
for np := range ch {
items = append(items, np.Items...)
assert.False(t, np.Reset, "should not reset")
}
}()
du, err := suite.its.
ac. ac.
Drives(). Drives().
EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps()) EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps())
for page, reset, done := pager.NextPage(); !done; {
items = append(items, page...)
assert.False(t, reset, "should not reset")
}
du, err := pager.Results()
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, items, "no items found in user's drive") require.NotEmpty(t, items, "should find items in user's drive")
assert.NotEmpty(t, du.URL, "should have a delta link") assert.NotEmpty(t, du.URL, "should have a delta link")
} }

View File

@ -27,13 +27,55 @@ type DeltaUpdate struct {
Reset bool Reset bool
} }
type NextPage[T any] struct { type NextPager[T any] interface {
Items []T NextPage() (items []T, reset, done bool)
// Reset is only true on the iteration where the delta pager's Reset() }
type nextPage[T any] struct {
items []T
// reset is only true on the iteration where the delta pager's Reset()
// is called. Callers can use it to reset any data aggregation they // is called. Callers can use it to reset any data aggregation they
// currently use. After that loop, it will be false again, though the // currently use. After that loop, it will be false again, though the
// DeltaUpdate will still contain the expected value. // DeltaUpdate will still contain the expected value.
Reset bool reset bool
}
type NextPageResulter[T any] interface {
NextPager[T]
Results() (DeltaUpdate, error)
}
var _ NextPageResulter[any] = &nextPageResults[any]{}
type nextPageResults[T any] struct {
pages chan nextPage[T]
du DeltaUpdate
err error
}
func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
if npr.pages == nil {
return nil, false, true
}
np := <-npr.pages
return np.items, np.reset, false
}
func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
// if the pager hasn't closed yet, drain out the pages iterator
// to avoid leaking routines, and to ensure we get results.
for npr.pages != nil {
<-npr.pages
}
return npr.du, npr.err
}
func (npr *nextPageResults[T]) close() {
npr.pages = nil
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -109,9 +151,9 @@ type Pager[T any] interface {
func enumerateItems[T any]( func enumerateItems[T any](
ctx context.Context, ctx context.Context,
pager Pager[T], pager Pager[T],
ch chan<- NextPage[T], npr *nextPageResults[T],
) error { ) {
defer close(ch) defer npr.close()
var ( var (
result = make([]T, 0) result = make([]T, 0)
@ -123,10 +165,11 @@ func enumerateItems[T any](
// get the next page of data, check for standard errors // get the next page of data, check for standard errors
page, err := pager.GetPage(ctx) page, err := pager.GetPage(ctx)
if err != nil { if err != nil {
return graph.Stack(ctx, err) npr.err = graph.Stack(ctx, err)
return
} }
ch <- NextPage[T]{Items: page.GetValue()} npr.pages <- nextPage[T]{items: page.GetValue()}
nextLink = NextLink(page) nextLink = NextLink(page)
@ -134,8 +177,6 @@ func enumerateItems[T any](
} }
logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result)) logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result))
return nil
} }
func batchEnumerateItems[T any]( func batchEnumerateItems[T any](
@ -143,19 +184,21 @@ func batchEnumerateItems[T any](
pager Pager[T], pager Pager[T],
) ([]T, error) { ) ([]T, error) {
var ( var (
ch = make(chan NextPage[T]) npr = nextPageResults[T]{
results = []T{} pages: make(chan nextPage[T]),
}
items = []T{}
) )
go func() { go enumerateItems[T](ctx, pager, &npr)
for np := range ch {
results = append(results, np.Items...)
}
}()
err := enumerateItems[T](ctx, pager, ch) for is, _, done := npr.NextPage(); !done; {
items = append(items, is...)
}
return results, clues.Stack(err).OrNil() _, err := npr.Results()
return items, clues.Stack(err).OrNil()
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -177,10 +220,10 @@ type DeltaPager[T any] interface {
func deltaEnumerateItems[T any]( func deltaEnumerateItems[T any](
ctx context.Context, ctx context.Context,
pager DeltaPager[T], pager DeltaPager[T],
ch chan<- NextPage[T], npr *nextPageResults[T],
prevDeltaLink string, prevDeltaLink string,
) (DeltaUpdate, error) { ) {
defer close(ch) defer npr.close()
var ( var (
result = make([]T, 0) result = make([]T, 0)
@ -203,9 +246,10 @@ func deltaEnumerateItems[T any](
logger.Ctx(ctx).Infow("delta queries not supported") logger.Ctx(ctx).Infow("delta queries not supported")
pager.Reset(ctx) pager.Reset(ctx)
ch <- NextPage[T]{Reset: true} npr.pages <- nextPage[T]{reset: true}
npr.err = clues.Stack(err)
return DeltaUpdate{}, clues.Stack(err) return
} }
if graph.IsErrInvalidDelta(err) { if graph.IsErrInvalidDelta(err) {
@ -218,16 +262,17 @@ func deltaEnumerateItems[T any](
// Reset tells the pager to try again after ditching its delta history. // Reset tells the pager to try again after ditching its delta history.
pager.Reset(ctx) pager.Reset(ctx)
ch <- NextPage[T]{Reset: true} npr.pages <- nextPage[T]{reset: true}
continue continue
} }
if err != nil { if err != nil {
return DeltaUpdate{}, clues.Stack(err) npr.err = clues.Stack(err)
return
} }
ch <- NextPage[T]{Items: page.GetValue()} npr.pages <- nextPage[T]{items: page.GetValue()}
nl, deltaLink := NextAndDeltaLink(page) nl, deltaLink := NextAndDeltaLink(page)
if len(deltaLink) > 0 { if len(deltaLink) > 0 {
@ -240,12 +285,10 @@ func deltaEnumerateItems[T any](
logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result)) logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result))
du := DeltaUpdate{ npr.du = DeltaUpdate{
URL: newDeltaLink, URL: newDeltaLink,
Reset: invalidPrevDelta, Reset: invalidPrevDelta,
} }
return du, nil
} }
func batchDeltaEnumerateItems[T any]( func batchDeltaEnumerateItems[T any](
@ -254,21 +297,23 @@ func batchDeltaEnumerateItems[T any](
prevDeltaLink string, prevDeltaLink string,
) ([]T, DeltaUpdate, error) { ) ([]T, DeltaUpdate, error) {
var ( var (
ch = make(chan NextPage[T]) npr = nextPageResults[T]{
pages: make(chan nextPage[T]),
}
results = []T{} results = []T{}
) )
go func() { go deltaEnumerateItems[T](ctx, pager, &npr, prevDeltaLink)
for np := range ch {
if np.Reset {
results = []T{}
}
results = append(results, np.Items...) for is, reset, done := npr.NextPage(); !done; {
if reset {
results = []T{}
} }
}()
du, err := deltaEnumerateItems[T](ctx, pager, ch, prevDeltaLink) results = append(results, is...)
}
du, err := npr.Results()
return results, du, clues.Stack(err).OrNil() return results, du, clues.Stack(err).OrNil()
} }