From 41130a2d5fba2d32e37bfb3368885121a6507b68 Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Tue, 10 Oct 2023 10:37:39 -0600 Subject: [PATCH] fixing up npe possibilities in pager channel handling --- src/pkg/services/m365/api/drive_pager.go | 2 +- src/pkg/services/m365/api/item_pager.go | 68 +++++++++++++++++++----- 2 files changed, 55 insertions(+), 15 deletions(-) diff --git a/src/pkg/services/m365/api/drive_pager.go b/src/pkg/services/m365/api/drive_pager.go index 1637b347d..a91bdff4f 100644 --- a/src/pkg/services/m365/api/drive_pager.go +++ b/src/pkg/services/m365/api/drive_pager.go @@ -365,7 +365,7 @@ func GetAllDrives( // no license or drives available. // return a non-error and let the caller assume an empty result set. - // TODO: is this the best wayy to handle this? + // TODO: is this the best way to handle this? // what about returning a ResourceNotFound error as is standard elsewhere? if err != nil && (clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) { diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index d12b368f3..6cd0c26eb 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -28,15 +28,19 @@ type DeltaUpdate struct { } type NextPager[T any] interface { + // reset should only true on the iteration where the delta pager's Reset() + // is called. Callers can use it to reset any data aggregation they + // currently use. After that loop it can be false again, though the + // DeltaUpdate will eventually contain the expected reset value. + // Items may or may not be >0 when reset == true. In that case, the + // items should always represent the next page of data following a reset. + // Callers should always handle the reset first, and follow-up with + // item population. NextPage() (items []T, reset, done bool) } type nextPage[T any] struct { items []T - // reset is only true on the iteration where the delta pager's Reset() - // is called. Callers can use it to reset any data aggregation they - // currently use. After that loop, it will be false again, though the - // DeltaUpdate will still contain the expected value. reset bool } @@ -54,6 +58,26 @@ type nextPageResults[T any] struct { err error } +func (npr *nextPageResults[T]) writeNextPage( + ctx context.Context, + items []T, + reset bool, +) error { + if npr.pages == nil { + return clues.New("pager already closed") + } + + select { + case <-ctx.Done(): + return clues.Wrap(context.Canceled, "writing next page") + case npr.pages <- nextPage[T]{ + items: items, + reset: reset, + }: + return nil + } +} + func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) { if npr.pages == nil { return nil, false, true @@ -65,16 +89,19 @@ func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) { } func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) { - // if the pager hasn't closed yet, drain out the pages iterator - // to avoid leaking routines, and to ensure we get results. - for npr.pages != nil { - <-npr.pages + if npr.pages != nil { + //nolint:revive + for range npr.pages { + // if the pager hasn't closed yet, drain out the pages iterator + // to avoid leaking routines, and to ensure we get results. + } } return npr.du, npr.err } func (npr *nextPageResults[T]) close() { + close(npr.pages) npr.pages = nil } @@ -169,7 +196,10 @@ func enumerateItems[T any]( return } - npr.pages <- nextPage[T]{items: page.GetValue()} + if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil { + npr.err = clues.Stack(err) + return + } nextLink = NextLink(page) @@ -215,8 +245,6 @@ type DeltaPager[T any] interface { // enumerates pages of items, streaming each page to the provided channel. // the DeltaUpdate, reset notifications, and any errors are also fed to the // same channel. -// Returns false if conditions disallow making delta calls for the provided -// pager. Returns true otherwise, even in the event of an error. func deltaEnumerateItems[T any]( ctx context.Context, pager DeltaPager[T], @@ -246,7 +274,12 @@ func deltaEnumerateItems[T any]( logger.Ctx(ctx).Infow("delta queries not supported") pager.Reset(ctx) - npr.pages <- nextPage[T]{reset: true} + + if err := npr.writeNextPage(ctx, nil, true); err != nil { + npr.err = clues.Stack(err) + return + } + npr.err = clues.Stack(err) return @@ -262,7 +295,11 @@ func deltaEnumerateItems[T any]( // Reset tells the pager to try again after ditching its delta history. pager.Reset(ctx) - npr.pages <- nextPage[T]{reset: true} + + if err := npr.writeNextPage(ctx, nil, true); err != nil { + npr.err = clues.Stack(err) + return + } continue } @@ -272,7 +309,10 @@ func deltaEnumerateItems[T any]( return } - npr.pages <- nextPage[T]{items: page.GetValue()} + if err := npr.writeNextPage(ctx, page.GetValue(), false); err != nil { + npr.err = clues.Stack(err) + return + } nl, deltaLink := NextAndDeltaLink(page) if len(deltaLink) > 0 {