Switch to "delete wins" merge strategy for Exchange IDs (#2142)
## Description #2116 implemented a "last change wins" strategy for deduping IDs. However, it appears Graph API doesn't really enforce an ordering for returned items so that strategy may not work. Instead, implement a "delete wins" strategy. Items that are added, deleted, and then restored in M365 will be restored with a different ID This PR does the following: * reverts changes to return a unified list of IDs and added/removed status * switches to a "delete wins" merge strategy for duplicate IDs * updates tests ## Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No ## Type of change - [ ] 🌻 Feature - [x] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup ## Issue(s) * closes #1954 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
5980c4dca0
commit
8d798d6024
@ -24,15 +24,6 @@ type DeltaUpdate struct {
|
|||||||
Reset bool
|
Reset bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeltaResult contains the ID and whether the item referenced by the ID was
|
|
||||||
// deleted. This allows functions that fetch items for a folder to return a
|
|
||||||
// single consolidated stream which is easier to dedupe as the order between
|
|
||||||
// add/update and delete operations is known.
|
|
||||||
type DeltaResult struct {
|
|
||||||
ID string
|
|
||||||
Deleted bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// GraphQuery represents functions which perform exchange-specific queries
|
// GraphQuery represents functions which perform exchange-specific queries
|
||||||
// into M365 backstore. Responses -> returned items will only contain the information
|
// into M365 backstore. Responses -> returned items will only contain the information
|
||||||
// that is included in the options
|
// that is included in the options
|
||||||
|
|||||||
@ -179,10 +179,10 @@ func (p *contactPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) {
|
|||||||
func (c Contacts) GetAddedAndRemovedItemIDs(
|
func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
user, directoryID, oldDelta string,
|
user, directoryID, oldDelta string,
|
||||||
) ([]DeltaResult, DeltaUpdate, error) {
|
) ([]string, []string, DeltaUpdate, error) {
|
||||||
service, err := c.service()
|
service, err := c.service()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -192,22 +192,22 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
|||||||
|
|
||||||
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
|
options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, errors.Wrap(err, "getting query options")
|
return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(oldDelta) > 0 {
|
if len(oldDelta) > 0 {
|
||||||
builder := users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter())
|
builder := users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter())
|
||||||
pgr := &contactPager{service, builder, options}
|
pgr := &contactPager{service, builder, options}
|
||||||
|
|
||||||
items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||||
// note: happy path, not the error condition
|
// note: happy path, not the error condition
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return items, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
// only return on error if it is NOT a delta issue.
|
// only return on error if it is NOT a delta issue.
|
||||||
// on bad deltas we retry the call with the regular builder
|
// on bad deltas we retry the call with the regular builder
|
||||||
if graph.IsErrInvalidDelta(err) == nil {
|
if graph.IsErrInvalidDelta(err) == nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resetDelta = true
|
resetDelta = true
|
||||||
@ -217,10 +217,10 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
|||||||
builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta()
|
builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta()
|
||||||
pgr := &contactPager{service, builder, options}
|
pgr := &contactPager{service, builder, options}
|
||||||
|
|
||||||
items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return items, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -165,29 +165,29 @@ func (p *eventPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) {
|
|||||||
func (c Events) GetAddedAndRemovedItemIDs(
|
func (c Events) GetAddedAndRemovedItemIDs(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
user, calendarID, oldDelta string,
|
user, calendarID, oldDelta string,
|
||||||
) ([]DeltaResult, DeltaUpdate, error) {
|
) ([]string, []string, DeltaUpdate, error) {
|
||||||
service, err := c.service()
|
service, err := c.service()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var errs *multierror.Error
|
var errs *multierror.Error
|
||||||
|
|
||||||
options, err := optionsForEventsByCalendar([]string{"id"})
|
options, err := optionsForEventsByCalendar([]string{"id"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
builder := service.Client().UsersById(user).CalendarsById(calendarID).Events()
|
builder := service.Client().UsersById(user).CalendarsById(calendarID).Events()
|
||||||
pgr := &eventPager{service, builder, options}
|
pgr := &eventPager{service, builder, options}
|
||||||
|
|
||||||
items, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
added, _, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Events don't have a delta endpoint so just return an empty string.
|
// Events don't have a delta endpoint so just return an empty string.
|
||||||
return items, DeltaUpdate{}, errs.ErrorOrNil()
|
return added, nil, DeltaUpdate{}, errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|||||||
@ -177,10 +177,10 @@ func (p *mailPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) {
|
|||||||
func (c Mail) GetAddedAndRemovedItemIDs(
|
func (c Mail) GetAddedAndRemovedItemIDs(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
user, directoryID, oldDelta string,
|
user, directoryID, oldDelta string,
|
||||||
) ([]DeltaResult, DeltaUpdate, error) {
|
) ([]string, []string, DeltaUpdate, error) {
|
||||||
service, err := c.service()
|
service, err := c.service()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -191,22 +191,22 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
|||||||
|
|
||||||
options, err := optionsForFolderMessagesDelta([]string{"isRead"})
|
options, err := optionsForFolderMessagesDelta([]string{"isRead"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, errors.Wrap(err, "getting query options")
|
return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(oldDelta) > 0 {
|
if len(oldDelta) > 0 {
|
||||||
builder := users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter())
|
builder := users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter())
|
||||||
pgr := &mailPager{service, builder, options}
|
pgr := &mailPager{service, builder, options}
|
||||||
|
|
||||||
items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||||
// note: happy path, not the error condition
|
// note: happy path, not the error condition
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return items, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
// only return on error if it is NOT a delta issue.
|
// only return on error if it is NOT a delta issue.
|
||||||
// on bad deltas we retry the call with the regular builder
|
// on bad deltas we retry the call with the regular builder
|
||||||
if graph.IsErrInvalidDelta(err) == nil {
|
if graph.IsErrInvalidDelta(err) == nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resetDelta = true
|
resetDelta = true
|
||||||
@ -216,10 +216,10 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
|||||||
builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta()
|
builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta()
|
||||||
pgr := &mailPager{service, builder, options}
|
pgr := &mailPager{service, builder, options}
|
||||||
|
|
||||||
items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DeltaUpdate{}, err
|
return nil, nil, DeltaUpdate{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return items, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -61,9 +61,10 @@ func toValues[T any](a any) ([]getIDAndAddtler, error) {
|
|||||||
func getItemsAddedAndRemovedFromContainer(
|
func getItemsAddedAndRemovedFromContainer(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
pager itemPager,
|
pager itemPager,
|
||||||
) ([]DeltaResult, string, error) {
|
) ([]string, []string, string, error) {
|
||||||
var (
|
var (
|
||||||
foundItems = []DeltaResult{}
|
addedIDs = []string{}
|
||||||
|
removedIDs = []string{}
|
||||||
deltaURL string
|
deltaURL string
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -72,37 +73,33 @@ func getItemsAddedAndRemovedFromContainer(
|
|||||||
resp, err := pager.getPage(ctx)
|
resp, err := pager.getPage(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
if err := graph.IsErrDeletedInFlight(err); err != nil {
|
||||||
return nil, deltaURL, err
|
return nil, nil, deltaURL, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := graph.IsErrInvalidDelta(err); err != nil {
|
if err := graph.IsErrInvalidDelta(err); err != nil {
|
||||||
return nil, deltaURL, err
|
return nil, nil, deltaURL, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
return nil, nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// each category type responds with a different interface, but all
|
// each category type responds with a different interface, but all
|
||||||
// of them comply with GetValue, which is where we'll get our item data.
|
// of them comply with GetValue, which is where we'll get our item data.
|
||||||
items, err := pager.valuesIn(resp)
|
items, err := pager.valuesIn(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterate through the items in the page
|
// iterate through the items in the page
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
newItem := DeltaResult{
|
|
||||||
ID: *item.GetId(),
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the additional data conains a `@removed` key, the value will either
|
// if the additional data conains a `@removed` key, the value will either
|
||||||
// be 'changed' or 'deleted'. We don't really care about the cause: both
|
// be 'changed' or 'deleted'. We don't really care about the cause: both
|
||||||
// cases are handled the same way in storage.
|
// cases are handled the same way in storage.
|
||||||
if item.GetAdditionalData()[graph.AddtlDataRemoved] != nil {
|
if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
|
||||||
newItem.Deleted = true
|
addedIDs = append(addedIDs, *item.GetId())
|
||||||
|
} else {
|
||||||
|
removedIDs = append(removedIDs, *item.GetId())
|
||||||
}
|
}
|
||||||
|
|
||||||
foundItems = append(foundItems, newItem)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// the deltaLink is kind of like a cursor for overall data state.
|
// the deltaLink is kind of like a cursor for overall data state.
|
||||||
@ -125,5 +122,5 @@ func getItemsAddedAndRemovedFromContainer(
|
|||||||
pager.setNext(*nextLink)
|
pager.setNext(*nextLink)
|
||||||
}
|
}
|
||||||
|
|
||||||
return foundItems, deltaURL, nil
|
return addedIDs, removedIDs, deltaURL, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -19,7 +19,7 @@ type addedAndRemovedItemIDsGetter interface {
|
|||||||
GetAddedAndRemovedItemIDs(
|
GetAddedAndRemovedItemIDs(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
user, containerID, oldDeltaToken string,
|
user, containerID, oldDeltaToken string,
|
||||||
) ([]api.DeltaResult, api.DeltaUpdate, error)
|
) ([]string, []string, api.DeltaUpdate, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterContainersAndFillCollections is a utility function
|
// filterContainersAndFillCollections is a utility function
|
||||||
@ -93,7 +93,7 @@ func filterContainersAndFillCollections(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
items, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta)
|
added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// note == nil check; only catches non-inFlight error cases.
|
// note == nil check; only catches non-inFlight error cases.
|
||||||
if graph.IsErrDeletedInFlight(err) == nil {
|
if graph.IsErrDeletedInFlight(err) == nil {
|
||||||
@ -126,20 +126,16 @@ func filterContainersAndFillCollections(
|
|||||||
|
|
||||||
collections[cID] = &edc
|
collections[cID] = &edc
|
||||||
|
|
||||||
// This results in "last one wins" if there's duplicate entries for an ID
|
for _, add := range added {
|
||||||
// and some are deleted while some are added.
|
edc.added[add] = struct{}{}
|
||||||
for _, i := range items {
|
}
|
||||||
m := edc.added
|
|
||||||
del := edc.removed
|
|
||||||
|
|
||||||
if i.Deleted {
|
// Remove any deleted IDs from the set of added IDs because items that are
|
||||||
m = edc.removed
|
// deleted and then restored will have a different ID than they did
|
||||||
del = edc.added
|
// originally.
|
||||||
}
|
for _, remove := range removed {
|
||||||
|
delete(edc.added, remove)
|
||||||
m[i.ID] = struct{}{}
|
edc.removed[remove] = struct{}{}
|
||||||
|
|
||||||
delete(del, i.ID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// add the current path for the container ID to be used in the next backup
|
// add the current path for the container ID to be used in the next backup
|
||||||
|
|||||||
@ -30,7 +30,8 @@ var _ addedAndRemovedItemIDsGetter = &mockGetter{}
|
|||||||
type (
|
type (
|
||||||
mockGetter map[string]mockGetterResults
|
mockGetter map[string]mockGetterResults
|
||||||
mockGetterResults struct {
|
mockGetterResults struct {
|
||||||
items []api.DeltaResult
|
added []string
|
||||||
|
removed []string
|
||||||
newDelta api.DeltaUpdate
|
newDelta api.DeltaUpdate
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
@ -40,16 +41,17 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
userID, cID, prevDelta string,
|
userID, cID, prevDelta string,
|
||||||
) (
|
) (
|
||||||
[]api.DeltaResult,
|
[]string,
|
||||||
|
[]string,
|
||||||
api.DeltaUpdate,
|
api.DeltaUpdate,
|
||||||
error,
|
error,
|
||||||
) {
|
) {
|
||||||
results, ok := mg[cID]
|
results, ok := mg[cID]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, api.DeltaUpdate{}, errors.New("mock not found for " + cID)
|
return nil, nil, api.DeltaUpdate{}, errors.New("mock not found for " + cID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return results.items, results.newDelta, results.err
|
return results.added, results.removed, results.newDelta, results.err
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ graph.ContainerResolver = &mockResolver{}
|
var _ graph.ContainerResolver = &mockResolver{}
|
||||||
@ -110,25 +112,20 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
|
|||||||
statusUpdater = func(*support.ConnectorOperationStatus) {}
|
statusUpdater = func(*support.ConnectorOperationStatus) {}
|
||||||
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
|
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
|
||||||
dps = DeltaPaths{} // incrementals are tested separately
|
dps = DeltaPaths{} // incrementals are tested separately
|
||||||
getterItems = []api.DeltaResult{
|
commonResult = mockGetterResults{
|
||||||
{ID: "a1"},
|
added: []string{"a1", "a2", "a3"},
|
||||||
{ID: "a2"},
|
removed: []string{"r1", "r2", "r3"},
|
||||||
{ID: "a3"},
|
|
||||||
{ID: "r1", Deleted: true},
|
|
||||||
{ID: "r2", Deleted: true},
|
|
||||||
{ID: "r3", Deleted: true},
|
|
||||||
}
|
|
||||||
commonResult = mockGetterResults{
|
|
||||||
items: getterItems,
|
|
||||||
newDelta: api.DeltaUpdate{URL: "delta_url"},
|
newDelta: api.DeltaUpdate{URL: "delta_url"},
|
||||||
}
|
}
|
||||||
errorResult = mockGetterResults{
|
errorResult = mockGetterResults{
|
||||||
items: getterItems,
|
added: []string{"a1", "a2", "a3"},
|
||||||
|
removed: []string{"r1", "r2", "r3"},
|
||||||
newDelta: api.DeltaUpdate{URL: "delta_url"},
|
newDelta: api.DeltaUpdate{URL: "delta_url"},
|
||||||
err: assert.AnError,
|
err: assert.AnError,
|
||||||
}
|
}
|
||||||
deletedInFlightResult = mockGetterResults{
|
deletedInFlightResult = mockGetterResults{
|
||||||
items: getterItems,
|
added: []string{"a1", "a2", "a3"},
|
||||||
|
removed: []string{"r1", "r2", "r3"},
|
||||||
newDelta: api.DeltaUpdate{URL: "delta_url"},
|
newDelta: api.DeltaUpdate{URL: "delta_url"},
|
||||||
err: graph.ErrDeletedInFlight{Err: *common.EncapsulateError(assert.AnError)},
|
err: graph.ErrDeletedInFlight{Err: *common.EncapsulateError(assert.AnError)},
|
||||||
}
|
}
|
||||||
@ -336,136 +333,81 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
|
|||||||
exColl, ok := coll.(*Collection)
|
exColl, ok := coll.(*Collection)
|
||||||
require.True(t, ok, "collection is an *exchange.Collection")
|
require.True(t, ok, "collection is an *exchange.Collection")
|
||||||
|
|
||||||
expectAdded := map[string]struct{}{}
|
ids := [][]string{
|
||||||
expectRemoved := map[string]struct{}{}
|
make([]string, 0, len(exColl.added)),
|
||||||
|
make([]string, 0, len(exColl.removed)),
|
||||||
|
}
|
||||||
|
|
||||||
for _, i := range expect.items {
|
for i, cIDs := range []map[string]struct{}{exColl.added, exColl.removed} {
|
||||||
if i.Deleted {
|
for id := range cIDs {
|
||||||
expectRemoved[i.ID] = struct{}{}
|
ids[i] = append(ids[i], id)
|
||||||
} else {
|
|
||||||
expectAdded[i.ID] = struct{}{}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(t, expectAdded, exColl.added, "added items")
|
assert.ElementsMatch(t, expect.added, ids[0], "added items")
|
||||||
assert.Equal(t, expectRemoved, exColl.removed, "removed items")
|
assert.ElementsMatch(t, expect.removed, ids[1], "removed items")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repeatedItems() {
|
func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repeatedItems() {
|
||||||
var (
|
newDelta := api.DeltaUpdate{URL: "delta_url"}
|
||||||
userID = "user_id"
|
|
||||||
qp = graph.QueryParams{
|
|
||||||
Category: path.EmailCategory, // doesn't matter which one we use.
|
|
||||||
ResourceOwner: userID,
|
|
||||||
Credentials: suite.creds,
|
|
||||||
}
|
|
||||||
statusUpdater = func(*support.ConnectorOperationStatus) {}
|
|
||||||
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
|
|
||||||
dps = DeltaPaths{} // incrementals are tested separately
|
|
||||||
delta = api.DeltaUpdate{URL: "delta_url"}
|
|
||||||
container1 = mockContainer{
|
|
||||||
id: strPtr("1"),
|
|
||||||
displayName: strPtr("display_name_1"),
|
|
||||||
p: path.Builder{}.Append("display_name_1"),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
getter mockGetter
|
getter mockGetter
|
||||||
resolver graph.ContainerResolver
|
expectAdded map[string]struct{}
|
||||||
scope selectors.ExchangeScope
|
expectRemoved map[string]struct{}
|
||||||
failFast bool
|
|
||||||
expectErr assert.ErrorAssertionFunc
|
|
||||||
expectNewColls int
|
|
||||||
expectMetadataColls int
|
|
||||||
expectAdded map[string]struct{}
|
|
||||||
expectRemoved map[string]struct{}
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "repeated add",
|
name: "repeated adds",
|
||||||
getter: map[string]mockGetterResults{
|
getter: map[string]mockGetterResults{
|
||||||
"1": {
|
"1": {
|
||||||
items: []api.DeltaResult{
|
added: []string{"a1", "a2", "a3", "a1"},
|
||||||
{ID: "a1"},
|
newDelta: newDelta,
|
||||||
{ID: "a1"},
|
|
||||||
},
|
|
||||||
newDelta: delta,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
resolver: newMockResolver(container1),
|
expectAdded: map[string]struct{}{
|
||||||
scope: allScope,
|
"a1": {},
|
||||||
expectErr: assert.NoError,
|
"a2": {},
|
||||||
expectNewColls: 1,
|
"a3": {},
|
||||||
expectMetadataColls: 1,
|
},
|
||||||
expectAdded: map[string]struct{}{"a1": {}},
|
|
||||||
// Avoid failures for nil map.
|
|
||||||
expectRemoved: map[string]struct{}{},
|
expectRemoved: map[string]struct{}{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "repeated remove",
|
name: "repeated removes",
|
||||||
getter: map[string]mockGetterResults{
|
getter: map[string]mockGetterResults{
|
||||||
"1": {
|
"1": {
|
||||||
items: []api.DeltaResult{
|
removed: []string{"r1", "r2", "r3", "r1"},
|
||||||
{ID: "a1", Deleted: true},
|
newDelta: newDelta,
|
||||||
{ID: "a1", Deleted: true},
|
|
||||||
},
|
|
||||||
newDelta: delta,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
resolver: newMockResolver(container1),
|
expectAdded: map[string]struct{}{},
|
||||||
scope: allScope,
|
expectRemoved: map[string]struct{}{
|
||||||
expectErr: assert.NoError,
|
"r1": {},
|
||||||
expectNewColls: 1,
|
"r2": {},
|
||||||
expectMetadataColls: 1,
|
"r3": {},
|
||||||
expectAdded: map[string]struct{}{},
|
},
|
||||||
expectRemoved: map[string]struct{}{"a1": {}},
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "interleaved, final remove",
|
name: "remove for same item wins",
|
||||||
getter: map[string]mockGetterResults{
|
getter: map[string]mockGetterResults{
|
||||||
"1": {
|
"1": {
|
||||||
items: []api.DeltaResult{
|
added: []string{"i1", "a2", "a3"},
|
||||||
{ID: "a1"},
|
removed: []string{"i1", "r2", "r3"},
|
||||||
{ID: "a1", Deleted: true},
|
newDelta: newDelta,
|
||||||
{ID: "a1"},
|
|
||||||
{ID: "a1", Deleted: true},
|
|
||||||
},
|
|
||||||
newDelta: delta,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
resolver: newMockResolver(container1),
|
expectAdded: map[string]struct{}{
|
||||||
scope: allScope,
|
"a2": {},
|
||||||
expectErr: assert.NoError,
|
"a3": {},
|
||||||
expectNewColls: 1,
|
},
|
||||||
expectMetadataColls: 1,
|
expectRemoved: map[string]struct{}{
|
||||||
expectAdded: map[string]struct{}{},
|
"i1": {},
|
||||||
expectRemoved: map[string]struct{}{"a1": {}},
|
"r2": {},
|
||||||
},
|
"r3": {},
|
||||||
{
|
|
||||||
name: "interleaved, final add",
|
|
||||||
getter: map[string]mockGetterResults{
|
|
||||||
"1": {
|
|
||||||
items: []api.DeltaResult{
|
|
||||||
{ID: "a1"},
|
|
||||||
{ID: "a1", Deleted: true},
|
|
||||||
{ID: "a1"},
|
|
||||||
{ID: "a1", Deleted: true},
|
|
||||||
{ID: "a1"},
|
|
||||||
},
|
|
||||||
newDelta: delta,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
resolver: newMockResolver(container1),
|
|
||||||
scope: allScope,
|
|
||||||
expectErr: assert.NoError,
|
|
||||||
expectNewColls: 1,
|
|
||||||
expectMetadataColls: 1,
|
|
||||||
expectAdded: map[string]struct{}{"a1": {}},
|
|
||||||
expectRemoved: map[string]struct{}{},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
@ -473,6 +415,24 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
|
|||||||
ctx, flush := tester.NewContext()
|
ctx, flush := tester.NewContext()
|
||||||
defer flush()
|
defer flush()
|
||||||
|
|
||||||
|
var (
|
||||||
|
userID = "user_id"
|
||||||
|
qp = graph.QueryParams{
|
||||||
|
Category: path.EmailCategory, // doesn't matter which one we use.
|
||||||
|
ResourceOwner: userID,
|
||||||
|
Credentials: suite.creds,
|
||||||
|
}
|
||||||
|
statusUpdater = func(*support.ConnectorOperationStatus) {}
|
||||||
|
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
|
||||||
|
dps = DeltaPaths{} // incrementals are tested separately
|
||||||
|
container1 = mockContainer{
|
||||||
|
id: strPtr("1"),
|
||||||
|
displayName: strPtr("display_name_1"),
|
||||||
|
p: path.Builder{}.Append("display_name_1"),
|
||||||
|
}
|
||||||
|
resolver = newMockResolver(container1)
|
||||||
|
)
|
||||||
|
|
||||||
collections := map[string]data.Collection{}
|
collections := map[string]data.Collection{}
|
||||||
|
|
||||||
err := filterContainersAndFillCollections(
|
err := filterContainersAndFillCollections(
|
||||||
@ -481,12 +441,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
|
|||||||
test.getter,
|
test.getter,
|
||||||
collections,
|
collections,
|
||||||
statusUpdater,
|
statusUpdater,
|
||||||
test.resolver,
|
resolver,
|
||||||
test.scope,
|
allScope,
|
||||||
dps,
|
dps,
|
||||||
control.Options{FailFast: test.failFast},
|
control.Options{FailFast: true},
|
||||||
)
|
)
|
||||||
test.expectErr(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// collection assertions
|
// collection assertions
|
||||||
|
|
||||||
@ -508,17 +468,26 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
|
|||||||
if c.DoNotMergeItems() {
|
if c.DoNotMergeItems() {
|
||||||
doNotMerges++
|
doNotMerges++
|
||||||
}
|
}
|
||||||
|
|
||||||
exColl, ok := c.(*Collection)
|
|
||||||
require.True(t, ok, "collection is an *exchange.Collection")
|
|
||||||
|
|
||||||
assert.Equal(t, test.expectAdded, exColl.added)
|
|
||||||
assert.Equal(t, test.expectRemoved, exColl.removed)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Zero(t, deleteds, "deleted collections")
|
assert.Zero(t, deleteds, "deleted collections")
|
||||||
assert.Equal(t, test.expectMetadataColls, metadatas, "metadata collections")
|
assert.Equal(t, 1, news, "new collections")
|
||||||
assert.Equal(t, test.expectNewColls, news, "new collections")
|
assert.Equal(t, 1, metadatas, "metadata collections")
|
||||||
|
assert.Zero(t, doNotMerges, "doNotMerge collections")
|
||||||
|
|
||||||
|
// items in collections assertions
|
||||||
|
for k := range test.getter {
|
||||||
|
coll := collections[k]
|
||||||
|
if !assert.NotNilf(t, coll, "missing collection for path %s", k) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
exColl, ok := coll.(*Collection)
|
||||||
|
require.True(t, ok, "collection is an *exchange.Collection")
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectAdded, exColl.added, "added items")
|
||||||
|
assert.Equal(t, test.expectRemoved, exColl.removed, "removed items")
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -536,11 +505,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
|
|||||||
statusUpdater = func(*support.ConnectorOperationStatus) {}
|
statusUpdater = func(*support.ConnectorOperationStatus) {}
|
||||||
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
|
allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0]
|
||||||
commonResults = mockGetterResults{
|
commonResults = mockGetterResults{
|
||||||
items: []api.DeltaResult{{ID: "added"}},
|
added: []string{"added"},
|
||||||
newDelta: api.DeltaUpdate{URL: "new_delta_url"},
|
newDelta: api.DeltaUpdate{URL: "new_delta_url"},
|
||||||
}
|
}
|
||||||
expiredResults = mockGetterResults{
|
expiredResults = mockGetterResults{
|
||||||
items: []api.DeltaResult{{ID: "added"}},
|
added: []string{"added"},
|
||||||
newDelta: api.DeltaUpdate{
|
newDelta: api.DeltaUpdate{
|
||||||
URL: "new_delta_url",
|
URL: "new_delta_url",
|
||||||
Reset: true,
|
Reset: true,
|
||||||
|
|||||||
@ -941,35 +941,19 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() {
|
|||||||
|
|
||||||
switch category {
|
switch category {
|
||||||
case path.EmailCategory:
|
case path.EmailCategory:
|
||||||
ids, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "")
|
ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "")
|
||||||
require.NoError(t, err, "getting message ids")
|
require.NoError(t, err, "getting message ids")
|
||||||
require.NotEmpty(t, ids, "message ids in folder")
|
require.NotEmpty(t, ids, "message ids in folder")
|
||||||
|
|
||||||
var idx int
|
err = cli.MessagesById(ids[0]).Delete(ctx, nil)
|
||||||
|
|
||||||
for _, item := range ids {
|
|
||||||
if item.Deleted {
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = cli.MessagesById(ids[idx].ID).Delete(ctx, nil)
|
|
||||||
require.NoError(t, err, "deleting email item: %s", support.ConnectorStackErrorTrace(err))
|
require.NoError(t, err, "deleting email item: %s", support.ConnectorStackErrorTrace(err))
|
||||||
|
|
||||||
case path.ContactsCategory:
|
case path.ContactsCategory:
|
||||||
ids, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "")
|
ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "")
|
||||||
require.NoError(t, err, "getting contact ids")
|
require.NoError(t, err, "getting contact ids")
|
||||||
require.NotEmpty(t, ids, "contact ids in folder")
|
require.NotEmpty(t, ids, "contact ids in folder")
|
||||||
|
|
||||||
var idx int
|
err = cli.ContactsById(ids[0]).Delete(ctx, nil)
|
||||||
|
|
||||||
for _, item := range ids {
|
|
||||||
if item.Deleted {
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = cli.ContactsById(ids[idx].ID).Delete(ctx, nil)
|
|
||||||
require.NoError(t, err, "deleting contact item: %s", support.ConnectorStackErrorTrace(err))
|
require.NoError(t, err, "deleting contact item: %s", support.ConnectorStackErrorTrace(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user