From db020305f6849a3a992680f8c737af404dc4774f Mon Sep 17 00:00:00 2001 From: ashmrtn Date: Tue, 6 Dec 2022 14:12:08 -0800 Subject: [PATCH] Return delta tokens from fetch functions (#1717) ## Description When fetching Exchange items, return the resulting delta token from the query so it can be reused later. ## Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :hamster: Trivial/Minor ## Issue(s) * #1685 ## Test Plan - [x] :muscle: Manual - [ ] :zap: Unit test - [ ] :green_heart: E2E --- .../connector/exchange/service_iterators.go | 67 +++++++++++++------ 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index c21efcd52..5eb22fae4 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -19,7 +19,10 @@ import ( "github.com/alcionai/corso/src/pkg/selectors" ) -const nextLinkKey = "@odata.nextLink" +const ( + nextLinkKey = "@odata.nextLink" + deltaLinkKey = "@odata.deltaLink" +) // getAdditionalDataString gets a string value from the AdditionalData map. If // the value is not in the map returns an empty string. @@ -100,7 +103,7 @@ func FilterContainersAndFillCollections( continue } - jobs, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId()) + jobs, _, err := fetchFunc(ctx, edc.service, qp.ResourceOwner, *c.GetId()) if err != nil { errs = support.WrapAndAppend( qp.ResourceOwner, @@ -163,9 +166,14 @@ func IterativeCollectCalendarContainers( } } -// FetchIDFunc collection of helper functions which return a list of strings -// from a response. -type FetchIDFunc func(ctx context.Context, gs graph.Service, user, containerID string) ([]string, error) +// FetchIDFunc collection of helper functions which return a list of all item +// IDs in the given container and a delta token for future requests if the +// container supports fetching delta records. +type FetchIDFunc func( + ctx context.Context, + gs graph.Service, + user, containerID string, +) ([]string, string, error) func getFetchIDFunc(category path.CategoryType) (FetchIDFunc, error) { switch category { @@ -185,7 +193,7 @@ func FetchEventIDsFromCalendar( ctx context.Context, gs graph.Service, user, calendarID string, -) ([]string, error) { +) ([]string, string, error) { var ( errs *multierror.Error ids []string @@ -193,7 +201,7 @@ func FetchEventIDsFromCalendar( options, err := optionsForCalendarEvents([]string{"id"}) if err != nil { - return nil, err + return nil, "", err } builder := gs.Client(). @@ -204,7 +212,7 @@ func FetchEventIDsFromCalendar( for { resp, err := builder.Get(ctx, options) if err != nil { - return nil, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, "", errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } for _, item := range resp.GetValue() { @@ -229,20 +237,26 @@ func FetchEventIDsFromCalendar( builder = msevents.NewEventsRequestBuilder(*nextLink, gs.Adapter()) } - return ids, errs.ErrorOrNil() + // Events don't have a delta endpoint so just return an empty string. + return ids, "", errs.ErrorOrNil() } // FetchContactIDsFromDirectory function that returns a list of all the m365IDs of the contacts // of the targeted directory -func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, directoryID string) ([]string, error) { +func FetchContactIDsFromDirectory( + ctx context.Context, + gs graph.Service, + user, directoryID string, +) ([]string, string, error) { var ( - errs *multierror.Error - ids []string + errs *multierror.Error + ids []string + deltaToken string ) options, err := optionsForContactFoldersItem([]string{"parentFolderId"}) if err != nil { - return nil, errors.Wrap(err, "getting query options") + return nil, deltaToken, errors.Wrap(err, "getting query options") } builder := gs.Client(). @@ -255,7 +269,7 @@ func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, d // TODO(ashmrtn): Update to pass options once graph SDK dependency is updated. resp, err := sendContactsDeltaGet(ctx, builder, options, gs.Adapter()) if err != nil { - return nil, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } for _, item := range resp.GetValue() { @@ -274,6 +288,11 @@ func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, d addtlData := resp.GetAdditionalData() + delta := getAdditionalDataString(deltaLinkKey, addtlData) + if len(delta) > 0 { + deltaToken = delta + } + nextLink := getAdditionalDataString(nextLinkKey, addtlData) if len(nextLink) == 0 { break @@ -282,7 +301,7 @@ func FetchContactIDsFromDirectory(ctx context.Context, gs graph.Service, user, d builder = cdelta.NewDeltaRequestBuilder(nextLink, gs.Adapter()) } - return ids, errs.ErrorOrNil() + return ids, deltaToken, errs.ErrorOrNil() } // FetchMessageIDsFromDirectory function that returns a list of all the m365IDs of the exchange.Mail @@ -291,15 +310,16 @@ func FetchMessageIDsFromDirectory( ctx context.Context, gs graph.Service, user, directoryID string, -) ([]string, error) { +) ([]string, string, error) { var ( - errs *multierror.Error - ids []string + errs *multierror.Error + ids []string + deltaToken string ) options, err := optionsForFolderMessages([]string{"id"}) if err != nil { - return nil, errors.Wrap(err, "getting query options") + return nil, deltaToken, errors.Wrap(err, "getting query options") } builder := gs.Client(). @@ -312,7 +332,7 @@ func FetchMessageIDsFromDirectory( // TODO(ashmrtn): Update to pass options once graph SDK dependency is updated. resp, err := sendMessagesDeltaGet(ctx, builder, options, gs.Adapter()) if err != nil { - return nil, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, deltaToken, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } for _, item := range resp.GetValue() { @@ -331,6 +351,11 @@ func FetchMessageIDsFromDirectory( addtlData := resp.GetAdditionalData() + delta := getAdditionalDataString(deltaLinkKey, addtlData) + if len(delta) > 0 { + deltaToken = delta + } + nextLink := getAdditionalDataString(nextLinkKey, addtlData) if len(nextLink) == 0 { break @@ -339,5 +364,5 @@ func FetchMessageIDsFromDirectory( builder = mdelta.NewDeltaRequestBuilder(nextLink, gs.Adapter()) } - return ids, errs.ErrorOrNil() + return ids, deltaToken, errs.ErrorOrNil() }