fixing up npe possibilities in pager channel handling

This commit is contained in:
ryanfkeepers 2023-10-10 10:37:39 -06:00
parent eb0219de6e
commit 41130a2d5f
2 changed files with 55 additions and 15 deletions

View File

@ -365,7 +365,7 @@ func GetAllDrives(
// no license or drives available. // no license or drives available.
// return a non-error and let the caller assume an empty result set. // return a non-error and let the caller assume an empty result set.
// TODO: is this the best wayy to handle this? // TODO: is this the best way to handle this?
// what about returning a ResourceNotFound error as is standard elsewhere? // what about returning a ResourceNotFound error as is standard elsewhere?
if err != nil && if err != nil &&
(clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) { (clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) {

View File

@ -28,15 +28,19 @@ type DeltaUpdate struct {
} }
type NextPager[T any] interface { type NextPager[T any] interface {
// reset should only true on the iteration where the delta pager's Reset()
// is called. Callers can use it to reset any data aggregation they
// currently use. After that loop it can be false again, though the
// DeltaUpdate will eventually contain the expected reset value.
// Items may or may not be >0 when reset == true. In that case, the
// items should always represent the next page of data following a reset.
// Callers should always handle the reset first, and follow-up with
// item population.
NextPage() (items []T, reset, done bool) NextPage() (items []T, reset, done bool)
} }
type nextPage[T any] struct { type nextPage[T any] struct {
items []T items []T
// reset is only true on the iteration where the delta pager's Reset()
// is called. Callers can use it to reset any data aggregation they
// currently use. After that loop, it will be false again, though the
// DeltaUpdate will still contain the expected value.
reset bool reset bool
} }
@ -54,6 +58,26 @@ type nextPageResults[T any] struct {
err error err error
} }
func (npr *nextPageResults[T]) writeNextPage(
ctx context.Context,
items []T,
reset bool,
) error {
if npr.pages == nil {
return clues.New("pager already closed")
}
select {
case <-ctx.Done():
return clues.Wrap(context.Canceled, "writing next page")
case npr.pages <- nextPage[T]{
items: items,
reset: reset,
}:
return nil
}
}
func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) { func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
if npr.pages == nil { if npr.pages == nil {
return nil, false, true return nil, false, true
@ -65,16 +89,19 @@ func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) {
} }
func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) { func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) {
// if the pager hasn't closed yet, drain out the pages iterator if npr.pages != nil {
// to avoid leaking routines, and to ensure we get results. //nolint:revive
for npr.pages != nil { for range npr.pages {
<-npr.pages // if the pager hasn't closed yet, drain out the pages iterator
// to avoid leaking routines, and to ensure we get results.
}
} }
return npr.du, npr.err return npr.du, npr.err
} }
func (npr *nextPageResults[T]) close() { func (npr *nextPageResults[T]) close() {
close(npr.pages)
npr.pages = nil npr.pages = nil
} }
@ -169,7 +196,10 @@ func enumerateItems[T any](
return return
} }
npr.pages <- nextPage[T]{items: page.GetValue()} if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
npr.err = clues.Stack(err)
return
}
nextLink = NextLink(page) nextLink = NextLink(page)
@ -215,8 +245,6 @@ type DeltaPager[T any] interface {
// enumerates pages of items, streaming each page to the provided channel. // enumerates pages of items, streaming each page to the provided channel.
// the DeltaUpdate, reset notifications, and any errors are also fed to the // the DeltaUpdate, reset notifications, and any errors are also fed to the
// same channel. // same channel.
// Returns false if conditions disallow making delta calls for the provided
// pager. Returns true otherwise, even in the event of an error.
func deltaEnumerateItems[T any]( func deltaEnumerateItems[T any](
ctx context.Context, ctx context.Context,
pager DeltaPager[T], pager DeltaPager[T],
@ -246,7 +274,12 @@ func deltaEnumerateItems[T any](
logger.Ctx(ctx).Infow("delta queries not supported") logger.Ctx(ctx).Infow("delta queries not supported")
pager.Reset(ctx) pager.Reset(ctx)
npr.pages <- nextPage[T]{reset: true}
if err := npr.writeNextPage(ctx, nil, true); err != nil {
npr.err = clues.Stack(err)
return
}
npr.err = clues.Stack(err) npr.err = clues.Stack(err)
return return
@ -262,7 +295,11 @@ func deltaEnumerateItems[T any](
// Reset tells the pager to try again after ditching its delta history. // Reset tells the pager to try again after ditching its delta history.
pager.Reset(ctx) pager.Reset(ctx)
npr.pages <- nextPage[T]{reset: true}
if err := npr.writeNextPage(ctx, nil, true); err != nil {
npr.err = clues.Stack(err)
return
}
continue continue
} }
@ -272,7 +309,10 @@ func deltaEnumerateItems[T any](
return return
} }
npr.pages <- nextPage[T]{items: page.GetValue()} if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil {
npr.err = clues.Stack(err)
return
}
nl, deltaLink := NextAndDeltaLink(page) nl, deltaLink := NextAndDeltaLink(page)
if len(deltaLink) > 0 { if len(deltaLink) > 0 {