refactors the exchange fetchIDs iters (#1906)
## Description
Transitions the fetchIDs service iterators to a
set of interfaces to consolidate code across
multiple nearly identical variations of "fetch id
for directory".
This was originally written in another PR (1780), then
separated out to isolate changes.
## Type of change
- [x] 🐹 Trivial/Minor
## Test Plan
- [x] 💚 E2E
This commit is contained in:
parent
e88553d073
commit
1174a99e84
@ -87,18 +87,6 @@ func newService(creds account.M365Config) (*graph.Service, error) {
|
||||
return graph.NewService(adapter), nil
|
||||
}
|
||||
|
||||
func (c Client) Contacts() Contacts {
|
||||
return Contacts{c}
|
||||
}
|
||||
|
||||
func (c Client) Events() Events {
|
||||
return Events{c}
|
||||
}
|
||||
|
||||
func (c Client) Mail() Mail {
|
||||
return Mail{c}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// helper funcs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -17,6 +17,11 @@ import (
|
||||
// controller
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func (c Client) Contacts() Contacts {
|
||||
return Contacts{c}
|
||||
}
|
||||
|
||||
// Contacts is an interface-compliant provider of the client.
|
||||
type Contacts struct {
|
||||
Client
|
||||
}
|
||||
@ -147,6 +152,30 @@ func (c Contacts) EnumerateContainers(
|
||||
return errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// item pager
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
var _ itemPager = &contactPager{}
|
||||
|
||||
type contactPager struct {
|
||||
gs graph.Servicer
|
||||
builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder
|
||||
options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration
|
||||
}
|
||||
|
||||
func (p *contactPager) getPage(ctx context.Context) (pageLinker, error) {
|
||||
return p.builder.Get(ctx, p.options)
|
||||
}
|
||||
|
||||
func (p *contactPager) setNext(nextLink string) {
|
||||
p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *contactPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) {
|
||||
return toValues[models.Contactable](pl)
|
||||
}
|
||||
|
||||
func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
ctx context.Context,
|
||||
user, directoryID, oldDelta string,
|
||||
@ -158,9 +187,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
|
||||
var (
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
removedIDs []string
|
||||
deltaURL string
|
||||
resetDelta bool
|
||||
)
|
||||
|
||||
@ -169,63 +195,17 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||
}
|
||||
|
||||
getIDs := func(builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder) error {
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
|
||||
ids = append(ids, *item.GetId())
|
||||
} else {
|
||||
removedIDs = append(removedIDs, *item.GetId())
|
||||
}
|
||||
}
|
||||
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, service.Adapter())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(oldDelta) > 0 {
|
||||
err := getIDs(users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter()))
|
||||
builder := users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter())
|
||||
pgr := &contactPager{service, builder, options}
|
||||
|
||||
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||
// note: happy path, not the error condition
|
||||
if err == nil {
|
||||
return ids, removedIDs, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||
return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||
}
|
||||
// only return on error if it is NOT a delta issue.
|
||||
// otherwise we'll retry the call with the regular builder
|
||||
// on bad deltas we retry the call with the regular builder
|
||||
if graph.IsErrInvalidDelta(err) == nil {
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
@ -234,15 +214,13 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
errs = nil
|
||||
}
|
||||
|
||||
builder := service.Client().
|
||||
UsersById(user).
|
||||
ContactFoldersById(directoryID).
|
||||
Contacts().
|
||||
Delta()
|
||||
builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta()
|
||||
pgr := &contactPager{service, builder, options}
|
||||
|
||||
if err := getIDs(builder); err != nil {
|
||||
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||
if err != nil {
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
|
||||
return ids, removedIDs, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||
return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
@ -18,6 +18,11 @@ import (
|
||||
// controller
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func (c Client) Events() Events {
|
||||
return Events{c}
|
||||
}
|
||||
|
||||
// Events is an interface-compliant provider of the client.
|
||||
type Events struct {
|
||||
Client
|
||||
}
|
||||
@ -124,6 +129,39 @@ func (c Events) EnumerateContainers(
|
||||
return errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// item pager
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type eventWrapper struct {
|
||||
models.EventCollectionResponseable
|
||||
}
|
||||
|
||||
func (ew eventWrapper) GetOdataDeltaLink() *string {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ itemPager = &eventPager{}
|
||||
|
||||
type eventPager struct {
|
||||
gs graph.Servicer
|
||||
builder *users.ItemCalendarsItemEventsRequestBuilder
|
||||
options *users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration
|
||||
}
|
||||
|
||||
func (p *eventPager) getPage(ctx context.Context) (pageLinker, error) {
|
||||
resp, err := p.builder.Get(ctx, p.options)
|
||||
return eventWrapper{resp}, err
|
||||
}
|
||||
|
||||
func (p *eventPager) setNext(nextLink string) {
|
||||
p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *eventPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) {
|
||||
return toValues[models.Eventable](pl)
|
||||
}
|
||||
|
||||
func (c Events) GetAddedAndRemovedItemIDs(
|
||||
ctx context.Context,
|
||||
user, calendarID, oldDelta string,
|
||||
@ -133,10 +171,7 @@ func (c Events) GetAddedAndRemovedItemIDs(
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
|
||||
var (
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
)
|
||||
var errs *multierror.Error
|
||||
|
||||
options, err := optionsForEventsByCalendar([]string{"id"})
|
||||
if err != nil {
|
||||
@ -144,41 +179,15 @@ func (c Events) GetAddedAndRemovedItemIDs(
|
||||
}
|
||||
|
||||
builder := service.Client().UsersById(user).CalendarsById(calendarID).Events()
|
||||
pgr := &eventPager{service, builder, options}
|
||||
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
|
||||
return nil, nil, DeltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("event with nil ID in calendar %s", calendarID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
ids = append(ids, *item.GetId())
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = users.NewItemCalendarsItemEventsRequestBuilder(*nextLink, service.Adapter())
|
||||
added, _, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||
if err != nil {
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
|
||||
// Events don't have a delta endpoint so just return an empty string.
|
||||
return ids, nil, DeltaUpdate{}, errs.ErrorOrNil()
|
||||
return added, nil, DeltaUpdate{}, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -17,6 +17,11 @@ import (
|
||||
// controller
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func (c Client) Mail() Mail {
|
||||
return Mail{c}
|
||||
}
|
||||
|
||||
// Mail is an interface-compliant provider of the client.
|
||||
type Mail struct {
|
||||
Client
|
||||
}
|
||||
@ -145,6 +150,30 @@ func (c Mail) EnumerateContainers(
|
||||
return errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// item pager
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
var _ itemPager = &mailPager{}
|
||||
|
||||
type mailPager struct {
|
||||
gs graph.Servicer
|
||||
builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder
|
||||
options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration
|
||||
}
|
||||
|
||||
func (p *mailPager) getPage(ctx context.Context) (pageLinker, error) {
|
||||
return p.builder.Get(ctx, p.options)
|
||||
}
|
||||
|
||||
func (p *mailPager) setNext(nextLink string) {
|
||||
p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *mailPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) {
|
||||
return toValues[models.Messageable](pl)
|
||||
}
|
||||
|
||||
func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
ctx context.Context,
|
||||
user, directoryID, oldDelta string,
|
||||
@ -156,8 +185,6 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
|
||||
var (
|
||||
errs *multierror.Error
|
||||
ids []string
|
||||
removedIDs []string
|
||||
deltaURL string
|
||||
resetDelta bool
|
||||
)
|
||||
@ -167,63 +194,17 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||
}
|
||||
|
||||
getIDs := func(builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder) error {
|
||||
for {
|
||||
resp, err := builder.Get(ctx, options)
|
||||
if err != nil {
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
for _, item := range resp.GetValue() {
|
||||
if item.GetId() == nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Errorf("item with nil ID in folder %s", directoryID),
|
||||
)
|
||||
|
||||
// TODO(ashmrtn): Handle fail-fast.
|
||||
continue
|
||||
}
|
||||
|
||||
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
|
||||
ids = append(ids, *item.GetId())
|
||||
} else {
|
||||
removedIDs = append(removedIDs, *item.GetId())
|
||||
}
|
||||
}
|
||||
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, service.Adapter())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(oldDelta) > 0 {
|
||||
err := getIDs(users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter()))
|
||||
builder := users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter())
|
||||
pgr := &mailPager{service, builder, options}
|
||||
|
||||
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||
// note: happy path, not the error condition
|
||||
if err == nil {
|
||||
return ids, removedIDs, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||
return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||
}
|
||||
// only return on error if it is NOT a delta issue.
|
||||
// otherwise we'll retry the call with the regular builder
|
||||
// on bad deltas we retry the call with the regular builder
|
||||
if graph.IsErrInvalidDelta(err) == nil {
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
@ -233,10 +214,12 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
}
|
||||
|
||||
builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta()
|
||||
pgr := &mailPager{service, builder, options}
|
||||
|
||||
if err := getIDs(builder); err != nil {
|
||||
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||
if err != nil {
|
||||
return nil, nil, DeltaUpdate{}, err
|
||||
}
|
||||
|
||||
return ids, removedIDs, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||
return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||
}
|
||||
|
||||
126
src/internal/connector/exchange/api/shared.go
Normal file
126
src/internal/connector/exchange/api/shared.go
Normal file
@ -0,0 +1,126 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// generic handler for paging item ids in a container
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type itemPager interface {
|
||||
getPage(context.Context) (pageLinker, error)
|
||||
setNext(string)
|
||||
valuesIn(pageLinker) ([]getIDAndAddtler, error)
|
||||
}
|
||||
|
||||
type pageLinker interface {
|
||||
GetOdataDeltaLink() *string
|
||||
GetOdataNextLink() *string
|
||||
}
|
||||
|
||||
type getIDAndAddtler interface {
|
||||
GetId() *string
|
||||
GetAdditionalData() map[string]any
|
||||
}
|
||||
|
||||
// uses a models interface compliant with { GetValues() []T }
|
||||
// to transform its results into a slice of getIDer interfaces.
|
||||
// Generics used here to handle the variation of msoft interfaces
|
||||
// that all _almost_ comply with GetValue, but all return a different
|
||||
// interface.
|
||||
func toValues[T any](a any) ([]getIDAndAddtler, error) {
|
||||
gv, ok := a.(interface{ GetValue() []T })
|
||||
if !ok {
|
||||
return nil, errors.Errorf("response of type [%T] does not comply with the GetValue() interface", a)
|
||||
}
|
||||
|
||||
items := gv.GetValue()
|
||||
r := make([]getIDAndAddtler, 0, len(items))
|
||||
|
||||
for _, item := range items {
|
||||
var a any = item
|
||||
|
||||
ri, ok := a.(getIDAndAddtler)
|
||||
if !ok {
|
||||
return nil, errors.Errorf("item of type [%T] does not comply with the getIDAndAddtler interface", item)
|
||||
}
|
||||
|
||||
r = append(r, ri)
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// generic controller for retrieving all item ids in a container.
|
||||
func getItemsAddedAndRemovedFromContainer(
|
||||
ctx context.Context,
|
||||
pager itemPager,
|
||||
) ([]string, []string, string, error) {
|
||||
var (
|
||||
addedIDs = []string{}
|
||||
removedIDs = []string{}
|
||||
deltaURL string
|
||||
)
|
||||
|
||||
for {
|
||||
// get the next page of data, check for standard errors
|
||||
resp, err := pager.getPage(ctx)
|
||||
if err != nil {
|
||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||
return nil, nil, deltaURL, err
|
||||
}
|
||||
|
||||
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||
return nil, nil, deltaURL, err
|
||||
}
|
||||
|
||||
return nil, nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||
}
|
||||
|
||||
// each category type responds with a different interface, but all
|
||||
// of them comply with GetValue, which is where we'll get our item data.
|
||||
items, err := pager.valuesIn(resp)
|
||||
if err != nil {
|
||||
return nil, nil, "", err
|
||||
}
|
||||
|
||||
// iterate through the items in the page
|
||||
for _, item := range items {
|
||||
// if the additional data conains a `@removed` key, the value will either
|
||||
// be 'changed' or 'deleted'. We don't really care about the cause: both
|
||||
// cases are handled the same way in storage.
|
||||
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
|
||||
addedIDs = append(addedIDs, *item.GetId())
|
||||
} else {
|
||||
removedIDs = append(removedIDs, *item.GetId())
|
||||
}
|
||||
}
|
||||
|
||||
// the deltaLink is kind of like a cursor for overall data state.
|
||||
// once we run through pages of nextLinks, the last query will
|
||||
// produce a deltaLink instead (if supported), which we'll use on
|
||||
// the next backup to only get the changes since this run.
|
||||
delta := resp.GetOdataDeltaLink()
|
||||
if delta != nil && len(*delta) > 0 {
|
||||
deltaURL = *delta
|
||||
}
|
||||
|
||||
// the nextLink is our page cursor within this query.
|
||||
// if we have more data to retrieve, we'll have a
|
||||
// nextLink instead of a deltaLink.
|
||||
nextLink := resp.GetOdataNextLink()
|
||||
if nextLink == nil || len(*nextLink) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
pager.setNext(*nextLink)
|
||||
}
|
||||
|
||||
return addedIDs, removedIDs, deltaURL, nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user