Return delta tokens from fetch functions (#1717)

## Description

When fetching Exchange items, return the resulting delta token from the query so it can be reused later.

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* #1685 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2022-12-06 14:12:08 -08:00 committed by GitHub
parent ff715f4d6b
commit db020305f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -19,7 +19,10 @@ import (
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
) )
const nextLinkKey = "@odata.nextLink" const (
nextLinkKey = "@odata.nextLink"
deltaLinkKey = "@odata.deltaLink"
)
// getAdditionalDataString gets a string value from the AdditionalData map. If // getAdditionalDataString gets a string value from the AdditionalData map. If
// the value is not in the map returns an empty string. // the value is not in the map returns an empty string.
@ -100,7 +103,7 @@ func FilterContainersAndFillCollections(
continue continue
} }
jobs, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId()) jobs, _, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId())
if err != nil { if err != nil {
errs = support.WrapAndAppend( errs = support.WrapAndAppend(
qp.ResourceOwner, qp.ResourceOwner,
@ -163,9 +166,14 @@ func IterativeCollectCalendarContainers(
} }
} }
// FetchIDFunc collection of helper functions which return a list of strings // FetchIDFunc collection of helper functions which return a list of all item
// from a response. // IDs in the given container and a delta token for future requests if the
type FetchIDFunc func(ctx context.Context, gs graph.Service, user, containerID string) ([]string, error) // container supports fetching delta records.
type FetchIDFunc func(
ctx context.Context,
gs graph.Service,
user, containerID string,
) ([]string, string, error)
func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) { func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) {
switch category { switch category {
@ -185,7 +193,7 @@ func FetchEventIDsFromCalendar(
ctx context.Context, ctx context.Context,
gs graph.Service, gs graph.Service,
user, calendarID string, user, calendarID string,
) ([]string, error) { ) ([]string, string, error) {
var ( var (
errs *multierror.Error errs *multierror.Error
ids []string ids []string
@ -193,7 +201,7 @@ func FetchEventIDsFromCalendar(
options, err := optionsForCalendarEvents([]string{"id"}) options, err := optionsForCalendarEvents([]string{"id"})
if err != nil { if err != nil {
return nil, err return nil, "", err
} }
builder := gs.Client(). builder := gs.Client().
@ -204,7 +212,7 @@ func FetchEventIDsFromCalendar(
for { for {
resp, err := builder.Get(ctx, options) resp, err := builder.Get(ctx, options)
if err != nil { if err != nil {
return nil, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err))
} }
for _, item := range resp.GetValue() { for _, item := range resp.GetValue() {
@ -229,20 +237,26 @@ func FetchEventIDsFromCalendar(
builder = msevents.NewEventsRequestBuilder(*nextLink, gs.Adapter()) builder = msevents.NewEventsRequestBuilder(*nextLink, gs.Adapter())
} }
return ids, errs.ErrorOrNil() // Events don't have a delta endpoint so just return an empty string.
return ids, "", errs.ErrorOrNil()
} }
// FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts // FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts
// of the targeted directory // of the targeted directory
func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, directoryID string) ([]string, error) { func FetchContactIDsFromDirectory(
ctx context.Context,
gs graph.Service,
user, directoryID string,
) ([]string, string, error) {
var ( var (
errs *multierror.Error errs *multierror.Error
ids []string ids []string
deltaToken string
) )
options, err := optionsForContactFoldersItem([]string{"parentFolderId"}) options, err := optionsForContactFoldersItem([]string{"parentFolderId"})
if err != nil { if err != nil {
return nil, errors.Wrap(err, "getting query options") return nil, deltaToken, errors.Wrap(err, "getting query options")
} }
builder := gs.Client(). builder := gs.Client().
@ -255,7 +269,7 @@ func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, d
// TODO(ashmrtn): Update to pass options once graph SDK dependency is updated. // TODO(ashmrtn): Update to pass options once graph SDK dependency is updated.
resp, err := sendContactsDeltaGet(ctx, builder, options, gs.Adapter()) resp, err := sendContactsDeltaGet(ctx, builder, options, gs.Adapter())
if err != nil { if err != nil {
return nil, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
} }
for _, item := range resp.GetValue() { for _, item := range resp.GetValue() {
@ -274,6 +288,11 @@ func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, d
addtlData := resp.GetAdditionalData() addtlData := resp.GetAdditionalData()
delta := getAdditionalDataString(deltaLinkKey, addtlData)
if len(delta) > 0 {
deltaToken = delta
}
nextLink := getAdditionalDataString(nextLinkKey, addtlData) nextLink := getAdditionalDataString(nextLinkKey, addtlData)
if len(nextLink) == 0 { if len(nextLink) == 0 {
break break
@ -282,7 +301,7 @@ func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, d
builder = cdelta.NewDeltaRequestBuilder(nextLink, gs.Adapter()) builder = cdelta.NewDeltaRequestBuilder(nextLink, gs.Adapter())
} }
return ids, errs.ErrorOrNil() return ids, deltaToken, errs.ErrorOrNil()
} }
// FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail // FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail
@ -291,15 +310,16 @@ func FetchMessageIDsFromDirectory(
ctx context.Context, ctx context.Context,
gs graph.Service, gs graph.Service,
user, directoryID string, user, directoryID string,
) ([]string, error) { ) ([]string, string, error) {
var ( var (
errs *multierror.Error errs *multierror.Error
ids []string ids []string
deltaToken string
) )
options, err := optionsForFolderMessages([]string{"id"}) options, err := optionsForFolderMessages([]string{"id"})
if err != nil { if err != nil {
return nil, errors.Wrap(err, "getting query options") return nil, deltaToken, errors.Wrap(err, "getting query options")
} }
builder := gs.Client(). builder := gs.Client().
@ -312,7 +332,7 @@ func FetchMessageIDsFromDirectory(
// TODO(ashmrtn): Update to pass options once graph SDK dependency is updated. // TODO(ashmrtn): Update to pass options once graph SDK dependency is updated.
resp, err := sendMessagesDeltaGet(ctx, builder, options, gs.Adapter()) resp, err := sendMessagesDeltaGet(ctx, builder, options, gs.Adapter())
if err != nil { if err != nil {
return nil, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
} }
for _, item := range resp.GetValue() { for _, item := range resp.GetValue() {
@ -331,6 +351,11 @@ func FetchMessageIDsFromDirectory(
addtlData := resp.GetAdditionalData() addtlData := resp.GetAdditionalData()
delta := getAdditionalDataString(deltaLinkKey, addtlData)
if len(delta) > 0 {
deltaToken = delta
}
nextLink := getAdditionalDataString(nextLinkKey, addtlData) nextLink := getAdditionalDataString(nextLinkKey, addtlData)
if len(nextLink) == 0 { if len(nextLink) == 0 {
break break
@ -339,5 +364,5 @@ func FetchMessageIDsFromDirectory(
builder = mdelta.NewDeltaRequestBuilder(nextLink, gs.Adapter()) builder = mdelta.NewDeltaRequestBuilder(nextLink, gs.Adapter())
} }
return ids, errs.ErrorOrNil() return ids, deltaToken, errs.ErrorOrNil()
} }