diff --git a/src/pkg/services/m365/api/channels_pager.go b/src/pkg/services/m365/api/channels_pager.go index e9fb76637..10bfaff4a 100644 --- a/src/pkg/services/m365/api/channels_pager.go +++ b/src/pkg/services/m365/api/channels_pager.go @@ -227,7 +227,7 @@ func (c Channels) GetChannelMessageReplies( ctx context.Context, teamID, channelID, messageID string, ) ([]models.ChatMessageable, error) { - return enumerateItems[models.ChatMessageable]( + return batchEnumerateItems[models.ChatMessageable]( ctx, c.NewChannelMessageRepliesPager(teamID, channelID, messageID)) } @@ -284,5 +284,5 @@ func (c Channels) GetChannels( ctx context.Context, teamID string, ) ([]models.Channelable, error) { - return enumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID)) + return batchEnumerateItems[models.Channelable](ctx, c.NewChannelPager(teamID)) } diff --git a/src/pkg/services/m365/api/contacts_pager.go b/src/pkg/services/m365/api/contacts_pager.go index 5d43a5172..068e7406c 100644 --- a/src/pkg/services/m365/api/contacts_pager.go +++ b/src/pkg/services/m365/api/contacts_pager.go @@ -84,7 +84,7 @@ func (c Contacts) EnumerateContainers( pgr = c.NewContactFoldersPager(userID, baseContainerID, immutableIDs) ) - containers, err := enumerateItems(ctx, pgr) + containers, err := batchEnumerateItems(ctx, pgr) if err != nil { return graph.Stack(ctx, err) } @@ -165,7 +165,7 @@ func (c Contacts) GetItemsInContainerByCollisionKey( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewContactsPager(userID, containerID, false, contactCollisionKeyProps()...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating contacts") } @@ -186,7 +186,7 @@ func (c Contacts) GetItemIDsInContainer( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewContactsPager(userID, containerID, false, idAnd()...) 
- items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating contacts") } diff --git a/src/pkg/services/m365/api/drive_pager.go b/src/pkg/services/m365/api/drive_pager.go index 2a9527712..4e6ee2845 100644 --- a/src/pkg/services/m365/api/drive_pager.go +++ b/src/pkg/services/m365/api/drive_pager.go @@ -77,7 +77,7 @@ func (c Drives) GetItemsInContainerByCollisionKey( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewDriveItemPager(driveID, containerID, idAnd("name")...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating drive items") } @@ -101,7 +101,7 @@ func (c Drives) GetItemIDsInContainer( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewDriveItemPager(driveID, containerID, idAnd("file", "folder")...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { - return nil, graph.Wrap(ctx, err, "enumerating contacts") + return nil, graph.Wrap(ctx, err, "enumerating drive items") } @@ -197,26 +197,27 @@ func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool { return true } -// EnumerateDriveItems will enumerate all items in the specified drive and hand them to the -// provided `collector` method +// EnumerateDriveItemsDelta enumerates all items in the specified drive and streams them page +// by page to the provided channel. The delta update and any error are returned to the caller. func (c Drives) EnumerateDriveItemsDelta( ctx context.Context, + ch chan<- NextPage[models.DriveItemable], driveID string, prevDeltaLink string, selectProps []string, -) ( - []models.DriveItemable, - DeltaUpdate, - error, -) { - pager := c.newDriveItemDeltaPager(driveID, prevDeltaLink, selectProps...) +) (DeltaUpdate, error) { + deltaPager := c.newDriveItemDeltaPager( + driveID, + prevDeltaLink, + selectProps...)
- items, du, err := deltaEnumerateItems[models.DriveItemable](ctx, pager, prevDeltaLink) - if err != nil { - return nil, du, clues.Stack(err) - } + du, err := deltaEnumerateItems[models.DriveItemable]( + ctx, + deltaPager, + ch, + prevDeltaLink) - return items, du, nil + return du, clues.Stack(err).OrNil() } // --------------------------------------------------------------------------- @@ -354,11 +355,17 @@ func GetAllDrives( ctx context.Context, pager Pager[models.Driveable], ) ([]models.Driveable, error) { - ds, err := enumerateItems(ctx, pager) - if err != nil && (clues.HasLabel(err, graph.LabelsMysiteNotFound) || - clues.HasLabel(err, graph.LabelsNoSharePointLicense)) { + ds, err := batchEnumerateItems(ctx, pager) + + // no license or drives available. + // return a non-error and let the caller assume an empty result set. + // TODO: is this the best way to handle this? + // what about returning a ResourceNotFound error as is standard elsewhere? + if err != nil && + (clues.HasLabel(err, graph.LabelsMysiteNotFound) || clues.HasLabel(err, graph.LabelsNoSharePointLicense)) { logger.CtxErr(ctx, err).Infof("resource owner does not have a drive") - return make([]models.Driveable, 0), nil // no license or drives. + + return make([]models.Driveable, 0), nil } return ds, graph.Stack(ctx, err).OrNil() diff --git a/src/pkg/services/m365/api/drive_pager_test.go b/src/pkg/services/m365/api/drive_pager_test.go index fad440ee0..4b5eae57c 100644 --- a/src/pkg/services/m365/api/drive_pager_test.go +++ b/src/pkg/services/m365/api/drive_pager_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/alcionai/clues" + "github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -185,7 +186,17 @@ func (suite *DrivePagerIntgSuite) TestEnumerateDriveItems() { ctx, flush := tester.NewContext(t) defer flush() - items, du, err := suite.its.
+ ch := make(chan api.NextPage[models.DriveItemable], 1) + items := []models.DriveItemable{} + + go func() { + for np := range ch { + items = append(items, np.Items...) + assert.False(t, np.Reset, "should not reset") + } + }() + + du, err := suite.its. ac. Drives(). - EnumerateDriveItemsDelta(ctx, suite.its.user.driveID, "", api.DefaultDriveItemProps()) + EnumerateDriveItemsDelta(ctx, ch, suite.its.user.driveID, "", api.DefaultDriveItemProps()) diff --git a/src/pkg/services/m365/api/events_pager.go b/src/pkg/services/m365/api/events_pager.go index 38c985770..05474367f 100644 --- a/src/pkg/services/m365/api/events_pager.go +++ b/src/pkg/services/m365/api/events_pager.go @@ -85,7 +85,7 @@ func (c Events) EnumerateContainers( pgr = c.NewEventCalendarsPager(userID, immutableIDs) ) - containers, err := enumerateItems(ctx, pgr) + containers, err := batchEnumerateItems(ctx, pgr) if err != nil { return graph.Stack(ctx, err) } @@ -169,7 +169,7 @@ func (c Events) GetItemsInContainerByCollisionKey( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewEventsPager(userID, containerID, false, eventCollisionKeyProps()...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating events") } @@ -190,7 +190,7 @@ func (c Events) GetItemIDsInContainer( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewEventsPager(userID, containerID, false, idAnd()...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating events") } diff --git a/src/pkg/services/m365/api/item_pager.go b/src/pkg/services/m365/api/item_pager.go index a64ae71b7..fb55add33 100644 --- a/src/pkg/services/m365/api/item_pager.go +++ b/src/pkg/services/m365/api/item_pager.go @@ -27,6 +27,15 @@ type DeltaUpdate struct { Reset bool } +type NextPage[T any] struct { + Items []T + // Reset is only true on the iteration where the delta pager's Reset() + // is called.
Callers can use it to reset any data aggregation they + // currently use. After that loop, it will be false again, though the + // DeltaUpdate will still contain the expected value. + Reset bool +} + // --------------------------------------------------------------------------- // common interfaces // --------------------------------------------------------------------------- @@ -100,7 +109,10 @@ type Pager[T any] interface { func enumerateItems[T any]( ctx context.Context, pager Pager[T], -) ([]T, error) { + ch chan<- NextPage[T], +) error { + defer close(ch) + var ( result = make([]T, 0) // stubbed initial value to ensure we enter the loop. @@ -111,10 +123,11 @@ func enumerateItems[T any]( // get the next page of data, check for standard errors page, err := pager.GetPage(ctx) if err != nil { - return nil, graph.Stack(ctx, err) + return graph.Stack(ctx, err) } - result = append(result, page.GetValue()...) + ch <- NextPage[T]{Items: page.GetValue()} + nextLink = NextLink(page) pager.SetNextLink(nextLink) @@ -122,7 +135,36 @@ func enumerateItems[T any]( logger.Ctx(ctx).Infow("completed delta item enumeration", "result_count", len(result)) - return result, nil + return nil +} + +// batchEnumerateItems runs enumerateItems to completion and returns every +// streamed page aggregated into a single slice. +func batchEnumerateItems[T any]( + ctx context.Context, + pager Pager[T], +) ([]T, error) { + var ( + ch = make(chan NextPage[T]) + done = make(chan struct{}) + results = []T{} + ) + + go func() { + defer close(done) + + for np := range ch { + results = append(results, np.Items...) + } + }() + + err := enumerateItems[T](ctx, pager, ch) + + // enumerateItems closes ch on return; wait for the collector to drain + // it so that results is complete and no longer being written. + <-done + + return results, clues.Stack(err).OrNil() } // --------------------------------------------------------------------------- @@ -136,11 +178,19 @@ type DeltaPager[T any] interface { ValidModTimer } +// deltaEnumerateItems enumerates pages of items, streaming each page to the +// provided channel. Reset notifications are fed to the same channel, which is +// closed when enumeration returns. +// The DeltaUpdate and any error are returned to the caller; they are never +// sent over the channel.
func deltaEnumerateItems[T any]( ctx context.Context, pager DeltaPager[T], + ch chan<- NextPage[T], prevDeltaLink string, -) ([]T, DeltaUpdate, error) { +) (DeltaUpdate, error) { + defer close(ch) + var ( result = make([]T, 0) // stubbed initial value to ensure we enter the loop. @@ -160,28 +210,33 @@ func deltaEnumerateItems[T any]( page, err := pager.GetPage(graph.ConsumeNTokens(ctx, consume)) if graph.IsErrDeltaNotSupported(err) { logger.Ctx(ctx).Infow("delta queries not supported") - return nil, DeltaUpdate{}, clues.Stack(graph.ErrDeltaNotSupported, err) + + pager.Reset(ctx) + ch <- NextPage[T]{Reset: true} + + return DeltaUpdate{}, clues.Stack(err) } if graph.IsErrInvalidDelta(err) { logger.Ctx(ctx).Infow("invalid previous delta", "delta_link", prevDeltaLink) invalidPrevDelta = true - // Reset limiter consumption since we don't have a valid delta token. + + // Set limiter consumption rate to non-delta. consume = graph.DeltaNoTokenLC - result = make([]T, 0) // Reset tells the pager to try again after ditching its delta history. pager.Reset(ctx) + ch <- NextPage[T]{Reset: true} continue } if err != nil { - return nil, DeltaUpdate{}, graph.Wrap(ctx, err, "retrieving page") + return DeltaUpdate{}, clues.Stack(err) } - result = append(result, page.GetValue()...) + ch <- NextPage[T]{Items: page.GetValue()} nl, deltaLink := NextAndDeltaLink(page) if len(deltaLink) > 0 { @@ -199,7 +254,42 @@ func deltaEnumerateItems[T any]( Reset: invalidPrevDelta, } - return result, du, nil + return du, nil +} + +// batchDeltaEnumerateItems runs deltaEnumerateItems to completion and returns +// every streamed page aggregated into a single slice. Items collected before +// a reset notification are discarded. +func batchDeltaEnumerateItems[T any]( + ctx context.Context, + pager DeltaPager[T], + prevDeltaLink string, +) ([]T, DeltaUpdate, error) { + var ( + ch = make(chan NextPage[T]) + done = make(chan struct{}) + results = []T{} + ) + + go func() { + defer close(done) + + for np := range ch { + if np.Reset { + results = []T{} + } + + results = append(results, np.Items...)
+ } + }() + + du, err := deltaEnumerateItems[T](ctx, pager, ch, prevDeltaLink) + + // deltaEnumerateItems closes ch on return; wait for the collector to + // drain it so that results is complete and no longer being written. + <-done + + return results, du, clues.Stack(err).OrNil() } // --------------------------------------------------------------------------- @@ -217,7 +307,7 @@ func getAddedAndRemovedItemIDs[T any]( aarh addedAndRemovedHandler[T], ) (map[string]time.Time, bool, []string, DeltaUpdate, error) { if canMakeDeltaQueries { - ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink) + ts, du, err := batchDeltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink) if err != nil && !graph.IsErrInvalidDelta(err) && !graph.IsErrDeltaNotSupported(err) { return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err) } @@ -230,7 +320,7 @@ func getAddedAndRemovedItemIDs[T any]( du := DeltaUpdate{Reset: true} - ts, err := enumerateItems(ctx, pager) + ts, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err) } diff --git a/src/pkg/services/m365/api/item_pager_test.go b/src/pkg/services/m365/api/item_pager_test.go index 5644fbfc2..0e12f7d9a 100644 --- a/src/pkg/services/m365/api/item_pager_test.go +++ b/src/pkg/services/m365/api/item_pager_test.go @@ -245,7 +245,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() { pageErr: assert.AnError, } }, - expect: nil, + expect: []any{}, expectErr: require.Error, }, } @@ -257,7 +257,7 @@ func (suite *PagerUnitSuite) TestEnumerateItems() { ctx, flush := tester.NewContext(t) defer flush() - result, err := enumerateItems(ctx, test.getPager(t, ctx)) + result, err := batchEnumerateItems(ctx, test.getPager(t, ctx)) test.expectErr(t, err, clues.ToCore(err)) require.EqualValues(t, test.expect, result) diff --git a/src/pkg/services/m365/api/mail_pager.go b/src/pkg/services/m365/api/mail_pager.go index 1aa6c8414..7882de7cd 100644 --- a/src/pkg/services/m365/api/mail_pager.go +++ b/src/pkg/services/m365/api/mail_pager.go @@ -81,7 +81,7 @@ func (c Mail) EnumerateContainers( pgr = c.NewMailFoldersPager(userID,
immutableIDs) ) - containers, err := enumerateItems(ctx, pgr) + containers, err := batchEnumerateItems(ctx, pgr) if err != nil { return graph.Stack(ctx, err) } @@ -162,7 +162,7 @@ func (c Mail) GetItemsInContainerByCollisionKey( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewMailPager(userID, containerID, false, mailCollisionKeyProps()...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating mails") } @@ -183,7 +183,7 @@ func (c Mail) GetItemIDsInContainer( ctx = clues.Add(ctx, "container_id", containerID) pager := c.NewMailPager(userID, containerID, false, idAnd()...) - items, err := enumerateItems(ctx, pager) + items, err := batchEnumerateItems(ctx, pager) if err != nil { return nil, graph.Wrap(ctx, err, "enumerating mails") }