From eb0219de6e498ca913f3fc8938a10786f4dbbdca Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Mon, 9 Oct 2023 16:53:05 -0600 Subject: [PATCH] replace channel creation with pager interface instead of having callers pass in channels, create channels when starting an api enumeration and return a pager to the caller which allows them to retrieve each page at a time. --- .../m365/collection/drive/handlers.go | 7 +- .../m365/collection/drive/item_handler.go | 2 +- .../m365/collection/drive/library_handler.go | 2 +- .../m365/service/onedrive/mock/handlers.go | 6 +- src/pkg/services/m365/api/drive_pager.go | 16 ++- src/pkg/services/m365/api/drive_pager_test.go | 21 +-- src/pkg/services/m365/api/item_pager.go | 125 ++++++++++++------ 7 files changed, 111 insertions(+), 68 deletions(-) diff --git a/src/internal/m365/collection/drive/handlers.go b/src/internal/m365/collection/drive/handlers.go index 9c803b93f..4d0b3fc4e 100644 --- a/src/internal/m365/collection/drive/handlers.go +++ b/src/internal/m365/collection/drive/handlers.go @@ -86,12 +86,7 @@ type EnumerateDriveItemsDeltaer interface { EnumerateDriveItemsDelta( ctx context.Context, driveID, prevDeltaLink string, - selectProps []string, - ) ( - []models.DriveItemable, - api.DeltaUpdate, - error, - ) + ) api.NextPageResulter[models.DriveItemable] } // --------------------------------------------------------------------------- diff --git a/src/internal/m365/collection/drive/item_handler.go b/src/internal/m365/collection/drive/item_handler.go index 4804db187..1f7bf328d 100644 --- a/src/internal/m365/collection/drive/item_handler.go +++ b/src/internal/m365/collection/drive/item_handler.go @@ -138,7 +138,7 @@ func (h itemBackupHandler) EnumerateDriveItemsDelta( ctx context.Context, driveID, prevDeltaLink string, selectProps []string, -) ([]models.DriveItemable, api.DeltaUpdate, error) { +) (api.NextPageResulter[models.DriveItemable], error) { return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) } diff --git a/src/internal/m365/collection/drive/library_handler.go b/src/internal/m365/collection/drive/library_handler.go index b64eaaee7..42aefa507 100644 --- a/src/internal/m365/collection/drive/library_handler.go +++ b/src/internal/m365/collection/drive/library_handler.go @@ -141,7 +141,7 @@ func (h libraryBackupHandler) EnumerateDriveItemsDelta( ctx context.Context, driveID, prevDeltaLink string, selectProps []string, -) ([]models.DriveItemable, api.DeltaUpdate, error) { +) api.NextPageResulter[models.DriveItemable] { return h.ac.EnumerateDriveItemsDelta(ctx, driveID, prevDeltaLink, selectProps) } diff --git a/src/internal/m365/service/onedrive/mock/handlers.go b/src/internal/m365/service/onedrive/mock/handlers.go index 568644d98..c0bdd81f0 100644 --- a/src/internal/m365/service/onedrive/mock/handlers.go +++ b/src/internal/m365/service/onedrive/mock/handlers.go @@ -288,11 +288,7 @@ func (edi EnumeratesDriveItemsDelta) EnumerateDriveItemsDelta( _ context.Context, driveID, _ string, _ []string, -) ( - []models.DriveItemable, - api.DeltaUpdate, - error, -) { +) api.NextPageResulter[models.DriveItemable] { return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID] } diff --git a/src/pkg/services/m365/api/drive_pager.go b/src/pkg/services/m365/api/drive_pager.go index 4e6ee2845..1637b347d 100644 --- a/src/pkg/services/m365/api/drive_pager.go +++ b/src/pkg/services/m365/api/drive_pager.go @@ -201,23 +201,29 @@ func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool { // by page, along with the delta update and any errors, to the provided channel. func (c Drives) EnumerateDriveItemsDelta( ctx context.Context, - ch chan<- NextPage[models.DriveItemable], driveID string, prevDeltaLink string, selectProps []string, -) (DeltaUpdate, error) { +) NextPageResulter[models.DriveItemable] { deltaPager := c.newDriveItemDeltaPager( driveID, prevDeltaLink, selectProps...) - du, err := deltaEnumerateItems[models.DriveItemable]( + npr := &nextPageResults[models.DriveItemable]{ + pages: make(chan nextPage[models.DriveItemable]), + } + + // asynchronously enumerate pages on the caller's behalf. + // they only need to consume the pager and call Results at + // the end. + go deltaEnumerateItems[models.DriveItemable]( ctx, deltaPager, - ch, + npr, prevDeltaLink) - return du, clues.Stack(err).OrNil() + return npr } // --------------------------------------------------------------------------- diff --git a/src/pkg/services/m365/api/drive_pager_test.go b/src/pkg/services/m365/api/drive_pager_test.go index 4b5eae57c..b06ad20f7 100644 --- a/src/pkg/services/m365/api/drive_pager_test.go +++ b/src/pkg/services/m365/api/drive_pager_test.go @@ -186,21 +186,22 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() { ctx, flush := tester.NewContext(t) defer flush() - ch := make(chan api.NextPage[models.DriveItemable], 1) items := []models.DriveItemable{} - go func() { - for np := range ch { - items = append(items, np.Items...) - assert.False(t, np.Reset, "should not reset") - } - }() - - du, err := suite.its. + pager := suite.its. ac. Drives(). EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps()) + + for page, reset, done := pager.NextPage(); !done; { + items = append(items, page...) + + assert.False(t, reset, "should not reset") + } + + du, err := pager.Results() + require.NoError(t, err, clues.ToCore(err)) - require.NotEmpty(t, items, "no items found in user's drive") + require.NotEmpty(t, items, "should find items in user's drive") assert.NotEmpty(t, du.URL, "should have a delta link") } diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index fb55add33..d12b368f3 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -27,13 +27,55 @@ type DeltaUpdate struct { Reset bool } -type NextPage[T any] struct { - Items []T - // Reset is only true on the iteration where the delta pager's Reset() +type NextPager[T any] interface { + NextPage() (items []T, reset, done bool) +} + +type nextPage[T any] struct { + items []T + // reset is only true on the iteration where the delta pager's Reset() // is called. Callers can use it to reset any data aggregation they // currently use. After that loop, it will be false again, though the // DeltaUpdate will still contain the expected value. - Reset bool + reset bool +} + +type NextPageResulter[T any] interface { + NextPager[T] + + Results() (DeltaUpdate, error) +} + +var _ NextPageResulter[any] = &nextPageResults[any]{} + +type nextPageResults[T any] struct { + pages chan nextPage[T] + du DeltaUpdate + err error +} + +func (npr *nextPageResults[T]) NextPage() ([]T, bool, bool) { + if npr.pages == nil { + return nil, false, true + } + + np := <-npr.pages + + return np.items, np.reset, false +} + +func (npr *nextPageResults[T]) Results() (DeltaUpdate, error) { + // if the pager hasn't closed yet, drain out the pages iterator + // to avoid leaking routines, and to ensure we get results. + for npr.pages != nil { + <-npr.pages + } + + return npr.du, npr.err +} + +func (npr *nextPageResults[T]) close() { + npr.pages = nil } // --------------------------------------------------------------------------- @@ -109,9 +151,9 @@ type Pager[T any] interface { func enumerateItems[T any]( ctx context.Context, pager Pager[T], - ch chan<- NextPage[T], -) error { - defer close(ch) + npr *nextPageResults[T], +) { + defer npr.close() var ( result = make([]T, 0) @@ -123,10 +165,11 @@ func enumerateItems[T any]( // get the next page of data, check for standard errors page, err := pager.GetPage(ctx) if err != nil { - return graph.Stack(ctx, err) + npr.err = graph.Stack(ctx, err) + return } - ch <- NextPage[T]{Items: page.GetValue()} + npr.pages <- nextPage[T]{items: page.GetValue()} nextLink = NextLink(page) @@ -134,8 +177,6 @@ func enumerateItems[T any]( } logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result)) - - return nil } func batchEnumerateItems[T any]( @@ -143,19 +184,21 @@ func batchEnumerateItems[T any]( pager Pager[T], ) ([]T, error) { var ( - ch = make(chan NextPage[T]) - results = []T{} + npr = nextPageResults[T]{ + pages: make(chan nextPage[T]), + } + items = []T{} ) - go func() { - for np := range ch { - results = append(results, np.Items...) - } - }() + go enumerateItems[T](ctx, pager, &npr) - err := enumerateItems[T](ctx, pager, ch) + for is, _, done := npr.NextPage(); !done; { + items = append(items, is...) + } - return results, clues.Stack(err).OrNil() + _, err := npr.Results() + + return items, clues.Stack(err).OrNil() } // --------------------------------------------------------------------------- @@ -177,10 +220,10 @@ type DeltaPager[T any] interface { func deltaEnumerateItems[T any]( ctx context.Context, pager DeltaPager[T], - ch chan<- NextPage[T], + npr *nextPageResults[T], prevDeltaLink string, -) (DeltaUpdate, error) { - defer close(ch) +) { + defer npr.close() var ( result = make([]T, 0) @@ -203,9 +246,10 @@ func deltaEnumerateItems[T any]( logger.Ctx(ctx).Infow("delta queries not supported") pager.Reset(ctx) - ch <- NextPage[T]{Reset: true} + npr.pages <- nextPage[T]{reset: true} + npr.err = clues.Stack(err) - return DeltaUpdate{}, clues.Stack(err) + return } if graph.IsErrInvalidDelta(err) { @@ -218,16 +262,17 @@ func deltaEnumerateItems[T any]( // Reset tells the pager to try again after ditching its delta history. pager.Reset(ctx) - ch <- NextPage[T]{Reset: true} + npr.pages <- nextPage[T]{reset: true} continue } if err != nil { - return DeltaUpdate{}, clues.Stack(err) + npr.err = clues.Stack(err) + return } - ch <- NextPage[T]{Items: page.GetValue()} + npr.pages <- nextPage[T]{items: page.GetValue()} nl, deltaLink := NextAndDeltaLink(page) if len(deltaLink) > 0 { @@ -240,12 +285,10 @@ func deltaEnumerateItems[T any]( logger.Ctx(ctx).Debugw("completed delta item enumeration", "result_count", len(result)) - du := DeltaUpdate{ + npr.du = DeltaUpdate{ URL: newDeltaLink, Reset: invalidPrevDelta, } - - return du, nil } func batchDeltaEnumerateItems[T any]( @@ -254,21 +297,23 @@ func batchDeltaEnumerateItems[T any]( prevDeltaLink string, ) ([]T, DeltaUpdate, error) { var ( - ch = make(chan NextPage[T]) + npr = nextPageResults[T]{ + pages: make(chan nextPage[T]), + } results = []T{} ) - go func() { - for np := range ch { - if np.Reset { - results = []T{} - } + go deltaEnumerateItems[T](ctx, pager, &npr, prevDeltaLink) - results = append(results, np.Items...) + for is, reset, done := npr.NextPage(); !done; { + if reset { + results = []T{} } - }() - du, err := deltaEnumerateItems[T](ctx, pager, ch, prevDeltaLink) + results = append(results, is...) + } + + du, err := npr.Results() return results, du, clues.Stack(err).OrNil() }