From 1174a99e84494be7f2af71c7ca93362da205e22f Mon Sep 17 00:00:00 2001 From: Keepers Date: Tue, 10 Jan 2023 14:31:20 -0700 Subject: [PATCH] refactors the exchange fetchIDs iters (#1906) ## Description Transitions the fetchIDs service iterators to a set of interfaces to consolidate code across multiple nearly identical variations of "fetch id for directory". This was originally written in another PR (1780), then separated out to isolate changes. ## Type of change - [x] :hamster: Trivial/Minor ## Test Plan - [x] :green_heart: E2E --- src/internal/connector/exchange/api/api.go | 12 -- .../connector/exchange/api/contacts.go | 102 ++++++-------- src/internal/connector/exchange/api/events.go | 79 ++++++----- src/internal/connector/exchange/api/mail.go | 95 ++++++------- src/internal/connector/exchange/api/shared.go | 126 ++++++++++++++++++ 5 files changed, 249 insertions(+), 165 deletions(-) create mode 100644 src/internal/connector/exchange/api/shared.go diff --git a/src/internal/connector/exchange/api/api.go b/src/internal/connector/exchange/api/api.go index acaeb9c5e..3fd15409f 100644 --- a/src/internal/connector/exchange/api/api.go +++ b/src/internal/connector/exchange/api/api.go @@ -87,18 +87,6 @@ func newService(creds account.M365Config) (*graph.Service, error) { return graph.NewService(adapter), nil } -func (c Client) Contacts() Contacts { - return Contacts{c} -} - -func (c Client) Events() Events { - return Events{c} -} - -func (c Client) Mail() Mail { - return Mail{c} -} - // --------------------------------------------------------------------------- // helper funcs // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index 0356d88ef..ab41ff4b3 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -17,6 +17,11 @@ import ( // controller // --------------------------------------------------------------------------- +func (c Client) Contacts() Contacts { + return Contacts{c} +} + +// Contacts is an interface-compliant provider of the client. type Contacts struct { Client } @@ -147,6 +152,30 @@ func (c Contacts) EnumerateContainers( return errs.ErrorOrNil() } +// --------------------------------------------------------------------------- +// item pager +// --------------------------------------------------------------------------- + +var _ itemPager = &contactPager{} + +type contactPager struct { + gs graph.Servicer + builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder + options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration +} + +func (p *contactPager) getPage(ctx context.Context) (pageLinker, error) { + return p.builder.Get(ctx, p.options) +} + +func (p *contactPager) setNext(nextLink string) { + p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *contactPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Contactable](pl) +} + func (c Contacts) GetAddedAndRemovedItemIDs( ctx context.Context, user, directoryID, oldDelta string, @@ -158,9 +187,6 @@ func (c Contacts) GetAddedAndRemovedItemIDs( var ( errs *multierror.Error - ids []string - removedIDs []string - deltaURL string resetDelta bool ) @@ -169,63 +195,17 @@ func (c Contacts) GetAddedAndRemovedItemIDs( return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") } - getIDs := func(builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder) error { - for { - resp, err := builder.Get(ctx, options) - if err != nil { - if err := graph.IsErrDeletedInFlight(err); err != nil { - return err - } - - if err := graph.IsErrInvalidDelta(err); err != nil { - return err - } - - return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) - } - - for _, item := range resp.GetValue() { - if item.GetId() == nil { - errs = multierror.Append( - errs, - errors.Errorf("item with nil ID in folder %s", directoryID), - ) - - // TODO(ashmrtn): Handle fail-fast. - continue - } - - if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { - ids = append(ids, *item.GetId()) - } else { - removedIDs = append(removedIDs, *item.GetId()) - } - } - - delta := resp.GetOdataDeltaLink() - if delta != nil && len(*delta) > 0 { - deltaURL = *delta - } - - nextLink := resp.GetOdataNextLink() - if nextLink == nil || len(*nextLink) == 0 { - break - } - - builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(*nextLink, service.Adapter()) - } - - return nil - } - if len(oldDelta) > 0 { - err := getIDs(users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter())) + builder := users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter()) + pgr := &contactPager{service, builder, options} + + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) // note: happy path, not the error condition if err == nil { - return ids, removedIDs, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. - // otherwise we'll retry the call with the regular builder + // on bad deltas we retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { return nil, nil, DeltaUpdate{}, err } @@ -234,15 +214,13 @@ func (c Contacts) GetAddedAndRemovedItemIDs( errs = nil } - builder := service.Client(). - UsersById(user). - ContactFoldersById(directoryID). - Contacts(). - Delta() + builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta() + pgr := &contactPager{service, builder, options} - if err := getIDs(builder); err != nil { + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + if err != nil { return nil, nil, DeltaUpdate{}, err } - return ids, removedIDs, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index bd6a68893..bd37a361a 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -18,6 +18,11 @@ import ( // controller // --------------------------------------------------------------------------- +func (c Client) Events() Events { + return Events{c} +} + +// Events is an interface-compliant provider of the client. type Events struct { Client } @@ -124,6 +129,39 @@ func (c Events) EnumerateContainers( return errs.ErrorOrNil() } +// --------------------------------------------------------------------------- +// item pager +// --------------------------------------------------------------------------- + +type eventWrapper struct { + models.EventCollectionResponseable +} + +func (ew eventWrapper) GetOdataDeltaLink() *string { + return nil +} + +var _ itemPager = &eventPager{} + +type eventPager struct { + gs graph.Servicer + builder *users.ItemCalendarsItemEventsRequestBuilder + options *users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration +} + +func (p *eventPager) getPage(ctx context.Context) (pageLinker, error) { + resp, err := p.builder.Get(ctx, p.options) + return eventWrapper{resp}, err +} + +func (p *eventPager) setNext(nextLink string) { + p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *eventPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Eventable](pl) +} + func (c Events) GetAddedAndRemovedItemIDs( ctx context.Context, user, calendarID, oldDelta string, @@ -133,10 +171,7 @@ func (c Events) GetAddedAndRemovedItemIDs( return nil, nil, DeltaUpdate{}, err } - var ( - errs *multierror.Error - ids []string - ) + var errs *multierror.Error options, err := optionsForEventsByCalendar([]string{"id"}) if err != nil { @@ -144,41 +179,15 @@ func (c Events) GetAddedAndRemovedItemIDs( } builder := service.Client().UsersById(user).CalendarsById(calendarID).Events() + pgr := &eventPager{service, builder, options} - for { - resp, err := builder.Get(ctx, options) - if err != nil { - if err := graph.IsErrDeletedInFlight(err); err != nil { - return nil, nil, DeltaUpdate{}, err - } - - return nil, nil, DeltaUpdate{}, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) - } - - for _, item := range resp.GetValue() { - if item.GetId() == nil { - errs = multierror.Append( - errs, - errors.Errorf("event with nil ID in calendar %s", calendarID), - ) - - // TODO(ashmrtn): Handle fail-fast. - continue - } - - ids = append(ids, *item.GetId()) - } - - nextLink := resp.GetOdataNextLink() - if nextLink == nil || len(*nextLink) == 0 { - break - } - - builder = users.NewItemCalendarsItemEventsRequestBuilder(*nextLink, service.Adapter()) + added, _, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + if err != nil { + return nil, nil, DeltaUpdate{}, err } // Events don't have a delta endpoint so just return an empty string. - return ids, nil, DeltaUpdate{}, errs.ErrorOrNil() + return added, nil, DeltaUpdate{}, errs.ErrorOrNil() } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index bc10bf53b..bf6739384 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -17,6 +17,11 @@ import ( // controller // --------------------------------------------------------------------------- +func (c Client) Mail() Mail { + return Mail{c} +} + +// Mail is an interface-compliant provider of the client. type Mail struct { Client } @@ -145,6 +150,30 @@ func (c Mail) EnumerateContainers( return errs.ErrorOrNil() } +// --------------------------------------------------------------------------- +// item pager +// --------------------------------------------------------------------------- + +var _ itemPager = &mailPager{} + +type mailPager struct { + gs graph.Servicer + builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder + options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration +} + +func (p *mailPager) getPage(ctx context.Context) (pageLinker, error) { + return p.builder.Get(ctx, p.options) +} + +func (p *mailPager) setNext(nextLink string) { + p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *mailPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Messageable](pl) +} + func (c Mail) GetAddedAndRemovedItemIDs( ctx context.Context, user, directoryID, oldDelta string, @@ -156,8 +185,6 @@ func (c Mail) GetAddedAndRemovedItemIDs( var ( errs *multierror.Error - ids []string - removedIDs []string deltaURL string resetDelta bool ) @@ -167,63 +194,17 @@ func (c Mail) GetAddedAndRemovedItemIDs( return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") } - getIDs := func(builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder) error { - for { - resp, err := builder.Get(ctx, options) - if err != nil { - if err := graph.IsErrDeletedInFlight(err); err != nil { - return err - } - - if err := graph.IsErrInvalidDelta(err); err != nil { - return err - } - - return errors.Wrap(err, support.ConnectorStackErrorTrace(err)) - } - - for _, item := range resp.GetValue() { - if item.GetId() == nil { - errs = multierror.Append( - errs, - errors.Errorf("item with nil ID in folder %s", directoryID), - ) - - // TODO(ashmrtn): Handle fail-fast. - continue - } - - if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { - ids = append(ids, *item.GetId()) - } else { - removedIDs = append(removedIDs, *item.GetId()) - } - } - - delta := resp.GetOdataDeltaLink() - if delta != nil && len(*delta) > 0 { - deltaURL = *delta - } - - nextLink := resp.GetOdataNextLink() - if nextLink == nil || len(*nextLink) == 0 { - break - } - - builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(*nextLink, service.Adapter()) - } - - return nil - } - if len(oldDelta) > 0 { - err := getIDs(users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter())) + builder := users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter()) + pgr := &mailPager{service, builder, options} + + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) // note: happy path, not the error condition if err == nil { - return ids, removedIDs, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. - // otherwise we'll retry the call with the regular builder + // on bad deltas we retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { return nil, nil, DeltaUpdate{}, err } @@ -233,10 +214,12 @@ func (c Mail) GetAddedAndRemovedItemIDs( } builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta() + pgr := &mailPager{service, builder, options} - if err := getIDs(builder); err != nil { + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + if err != nil { return nil, nil, DeltaUpdate{}, err } - return ids, removedIDs, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/exchange/api/shared.go b/src/internal/connector/exchange/api/shared.go new file mode 100644 index 000000000..c77e21fa8 --- /dev/null +++ b/src/internal/connector/exchange/api/shared.go @@ -0,0 +1,126 @@ +package api + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/support" +) + +// --------------------------------------------------------------------------- +// generic handler for paging item ids in a container +// --------------------------------------------------------------------------- + +type itemPager interface { + getPage(context.Context) (pageLinker, error) + setNext(string) + valuesIn(pageLinker) ([]getIDAndAddtler, error) +} + +type pageLinker interface { + GetOdataDeltaLink() *string + GetOdataNextLink() *string +} + +type getIDAndAddtler interface { + GetId() *string + GetAdditionalData() map[string]any +} + +// uses a models interface compliant with { GetValues() []T } +// to transform its results into a slice of getIDer interfaces. +// Generics used here to handle the variation of msoft interfaces +// that all _almost_ comply with GetValue, but all return a different +// interface. +func toValues[T any](a any) ([]getIDAndAddtler, error) { + gv, ok := a.(interface{ GetValue() []T }) + if !ok { + return nil, errors.Errorf("response of type [%T] does not comply with the GetValue() interface", a) + } + + items := gv.GetValue() + r := make([]getIDAndAddtler, 0, len(items)) + + for _, item := range items { + var a any = item + + ri, ok := a.(getIDAndAddtler) + if !ok { + return nil, errors.Errorf("item of type [%T] does not comply with the getIDAndAddtler interface", item) + } + + r = append(r, ri) + } + + return r, nil +} + +// generic controller for retrieving all item ids in a container. +func getItemsAddedAndRemovedFromContainer( + ctx context.Context, + pager itemPager, +) ([]string, []string, string, error) { + var ( + addedIDs = []string{} + removedIDs = []string{} + deltaURL string + ) + + for { + // get the next page of data, check for standard errors + resp, err := pager.getPage(ctx) + if err != nil { + if err := graph.IsErrDeletedInFlight(err); err != nil { + return nil, nil, deltaURL, err + } + + if err := graph.IsErrInvalidDelta(err); err != nil { + return nil, nil, deltaURL, err + } + + return nil, nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + } + + // each category type responds with a different interface, but all + // of them comply with GetValue, which is where we'll get our item data. + items, err := pager.valuesIn(resp) + if err != nil { + return nil, nil, "", err + } + + // iterate through the items in the page + for _, item := range items { + // if the additional data conains a `@removed` key, the value will either + // be 'changed' or 'deleted'. We don't really care about the cause: both + // cases are handled the same way in storage. + if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { + addedIDs = append(addedIDs, *item.GetId()) + } else { + removedIDs = append(removedIDs, *item.GetId()) + } + } + + // the deltaLink is kind of like a cursor for overall data state. + // once we run through pages of nextLinks, the last query will + // produce a deltaLink instead (if supported), which we'll use on + // the next backup to only get the changes since this run. + delta := resp.GetOdataDeltaLink() + if delta != nil && len(*delta) > 0 { + deltaURL = *delta + } + + // the nextLink is our page cursor within this query. + // if we have more data to retrieve, we'll have a + // nextLink instead of a deltaLink. + nextLink := resp.GetOdataNextLink() + if nextLink == nil || len(*nextLink) == 0 { + break + } + + pager.setNext(*nextLink) + } + + return addedIDs, removedIDs, deltaURL, nil +}