diff --git a/src/internal/connector/exchange/api/api.go b/src/internal/connector/exchange/api/api.go index 999eb6c98..3fd15409f 100644 --- a/src/internal/connector/exchange/api/api.go +++ b/src/internal/connector/exchange/api/api.go @@ -24,15 +24,6 @@ type DeltaUpdate struct { Reset bool } -// DeltaResult contains the ID and whether the item referenced by the ID was -// deleted. This allows functions that fetch items for a folder to return a -// single consolidated stream which is easier to dedupe as the order between -// add/update and delete operations is known. -type DeltaResult struct { - ID string - Deleted bool -} - // GraphQuery represents functions which perform exchange-specific queries // into M365 backstore. Responses -> returned items will only contain the information // that is included in the options diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index 9a2b3c3dd..ab41ff4b3 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -179,10 +179,10 @@ func (p *contactPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { func (c Contacts) GetAddedAndRemovedItemIDs( ctx context.Context, user, directoryID, oldDelta string, -) ([]DeltaResult, DeltaUpdate, error) { +) ([]string, []string, DeltaUpdate, error) { service, err := c.service() if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } var ( @@ -192,22 +192,22 @@ func (c Contacts) GetAddedAndRemovedItemIDs( options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"}) if err != nil { - return nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") + return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") } if len(oldDelta) > 0 { builder := users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter()) pgr := &contactPager{service, builder, options} - items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) // note: happy path, not the error condition if err == nil { - return items, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. // on bad deltas we retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } resetDelta = true @@ -217,10 +217,10 @@ func (c Contacts) GetAddedAndRemovedItemIDs( builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta() pgr := &contactPager{service, builder, options} - items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } - return items, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index a9065a686..bd37a361a 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -165,29 +165,29 @@ func (p *eventPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { func (c Events) GetAddedAndRemovedItemIDs( ctx context.Context, user, calendarID, oldDelta string, -) ([]DeltaResult, DeltaUpdate, error) { +) ([]string, []string, DeltaUpdate, error) { service, err := c.service() if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } var errs *multierror.Error options, err := optionsForEventsByCalendar([]string{"id"}) if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } builder := service.Client().UsersById(user).CalendarsById(calendarID).Events() pgr := &eventPager{service, builder, options} - items, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + added, _, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } // Events don't have a delta endpoint so just return an empty string. - return items, DeltaUpdate{}, errs.ErrorOrNil() + return added, nil, DeltaUpdate{}, errs.ErrorOrNil() } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index 9c6b34155..bf6739384 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -177,10 +177,10 @@ func (p *mailPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { func (c Mail) GetAddedAndRemovedItemIDs( ctx context.Context, user, directoryID, oldDelta string, -) ([]DeltaResult, DeltaUpdate, error) { +) ([]string, []string, DeltaUpdate, error) { service, err := c.service() if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } var ( @@ -191,22 +191,22 @@ func (c Mail) GetAddedAndRemovedItemIDs( options, err := optionsForFolderMessagesDelta([]string{"isRead"}) if err != nil { - return nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") + return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") } if len(oldDelta) > 0 { builder := users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter()) pgr := &mailPager{service, builder, options} - items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) // note: happy path, not the error condition if err == nil { - return items, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. // on bad deltas we retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } resetDelta = true @@ -216,10 +216,10 @@ func (c Mail) GetAddedAndRemovedItemIDs( builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta() pgr := &mailPager{service, builder, options} - items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) if err != nil { - return nil, DeltaUpdate{}, err + return nil, nil, DeltaUpdate{}, err } - return items, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/exchange/api/shared.go b/src/internal/connector/exchange/api/shared.go index dbb1d13fc..c77e21fa8 100644 --- a/src/internal/connector/exchange/api/shared.go +++ b/src/internal/connector/exchange/api/shared.go @@ -61,9 +61,10 @@ func toValues[T any](a any) ([]getIDAndAddtler, error) { func getItemsAddedAndRemovedFromContainer( ctx context.Context, pager itemPager, -) ([]DeltaResult, string, error) { +) ([]string, []string, string, error) { var ( - foundItems = []DeltaResult{} + addedIDs = []string{} + removedIDs = []string{} deltaURL string ) @@ -72,37 +73,33 @@ func getItemsAddedAndRemovedFromContainer( resp, err := pager.getPage(ctx) if err != nil { if err := graph.IsErrDeletedInFlight(err); err != nil { - return nil, deltaURL, err + return nil, nil, deltaURL, err } if err := graph.IsErrInvalidDelta(err); err != nil { - return nil, deltaURL, err + return nil, nil, deltaURL, err } - return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } // each category type responds with a different interface, but all // of them comply with GetValue, which is where we'll get our item data. items, err := pager.valuesIn(resp) if err != nil { - return nil, "", err + return nil, nil, "", err } // iterate through the items in the page for _, item := range items { - newItem := DeltaResult{ - ID: *item.GetId(), - } - // if the additional data conains a `@removed` key, the value will either // be 'changed' or 'deleted'. We don't really care about the cause: both // cases are handled the same way in storage. - if item.GetAdditionalData()[graph.AddtlDataRemoved] != nil { - newItem.Deleted = true + if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { + addedIDs = append(addedIDs, *item.GetId()) + } else { + removedIDs = append(removedIDs, *item.GetId()) } - - foundItems = append(foundItems, newItem) } // the deltaLink is kind of like a cursor for overall data state. @@ -125,5 +122,5 @@ func getItemsAddedAndRemovedFromContainer( pager.setNext(*nextLink) } - return foundItems, deltaURL, nil + return addedIDs, removedIDs, deltaURL, nil } diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index b893092b9..70f2190c5 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -19,7 +19,7 @@ type addedAndRemovedItemIDsGetter interface { GetAddedAndRemovedItemIDs( ctx context.Context, user, containerID, oldDeltaToken string, - ) ([]api.DeltaResult, api.DeltaUpdate, error) + ) ([]string, []string, api.DeltaUpdate, error) } // filterContainersAndFillCollections is a utility function @@ -93,7 +93,7 @@ func filterContainersAndFillCollections( } } - items, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta) + added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta) if err != nil { // note == nil check; only catches non-inFlight error cases. if graph.IsErrDeletedInFlight(err) == nil { @@ -126,20 +126,16 @@ func filterContainersAndFillCollections( collections[cID] = &edc - // This results in "last one wins" if there's duplicate entries for an ID - // and some are deleted while some are added. - for _, i := range items { - m := edc.added - del := edc.removed + for _, add := range added { + edc.added[add] = struct{}{} + } - if i.Deleted { - m = edc.removed - del = edc.added - } - - m[i.ID] = struct{}{} - - delete(del, i.ID) + // Remove any deleted IDs from the set of added IDs because items that are + // deleted and then restored will have a different ID than they did + // originally. + for _, remove := range removed { + delete(edc.added, remove) + edc.removed[remove] = struct{}{} } // add the current path for the container ID to be used in the next backup diff --git a/src/internal/connector/exchange/service_iterators_test.go b/src/internal/connector/exchange/service_iterators_test.go index 2c1329c2f..e1872b55c 100644 --- a/src/internal/connector/exchange/service_iterators_test.go +++ b/src/internal/connector/exchange/service_iterators_test.go @@ -30,7 +30,8 @@ var _ addedAndRemovedItemIDsGetter = &mockGetter{} type ( mockGetter map[string]mockGetterResults mockGetterResults struct { - items []api.DeltaResult + added []string + removed []string newDelta api.DeltaUpdate err error } @@ -40,16 +41,17 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs( ctx context.Context, userID, cID, prevDelta string, ) ( - []api.DeltaResult, + []string, + []string, api.DeltaUpdate, error, ) { results, ok := mg[cID] if !ok { - return nil, api.DeltaUpdate{}, errors.New("mock not found for " + cID) + return nil, nil, api.DeltaUpdate{}, errors.New("mock not found for " + cID) } - return results.items, results.newDelta, results.err + return results.added, results.removed, results.newDelta, results.err } var _ graph.ContainerResolver = &mockResolver{} @@ -110,25 +112,20 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { statusUpdater = func(*support.ConnectorOperationStatus) {} allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] dps = DeltaPaths{} // incrementals are tested separately - getterItems = []api.DeltaResult{ - {ID: "a1"}, - {ID: "a2"}, - {ID: "a3"}, - {ID: "r1", Deleted: true}, - {ID: "r2", Deleted: true}, - {ID: "r3", Deleted: true}, - } - commonResult = mockGetterResults{ - items: getterItems, + commonResult = mockGetterResults{ + added: []string{"a1", "a2", "a3"}, + removed: []string{"r1", "r2", "r3"}, newDelta: api.DeltaUpdate{URL: "delta_url"}, } errorResult = mockGetterResults{ - items: getterItems, + added: []string{"a1", "a2", "a3"}, + removed: []string{"r1", "r2", "r3"}, newDelta: api.DeltaUpdate{URL: "delta_url"}, err: assert.AnError, } deletedInFlightResult = mockGetterResults{ - items: getterItems, + added: []string{"a1", "a2", "a3"}, + removed: []string{"r1", "r2", "r3"}, newDelta: api.DeltaUpdate{URL: "delta_url"}, err: graph.ErrDeletedInFlight{Err: *common.EncapsulateError(assert.AnError)}, } @@ -336,136 +333,81 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { exColl, ok := coll.(*Collection) require.True(t, ok, "collection is an *exchange.Collection") - expectAdded := map[string]struct{}{} - expectRemoved := map[string]struct{}{} + ids := [][]string{ + make([]string, 0, len(exColl.added)), + make([]string, 0, len(exColl.removed)), + } - for _, i := range expect.items { - if i.Deleted { - expectRemoved[i.ID] = struct{}{} - } else { - expectAdded[i.ID] = struct{}{} + for i, cIDs := range []map[string]struct{}{exColl.added, exColl.removed} { + for id := range cIDs { + ids[i] = append(ids[i], id) } } - assert.Equal(t, expectAdded, exColl.added, "added items") - assert.Equal(t, expectRemoved, exColl.removed, "removed items") + assert.ElementsMatch(t, expect.added, ids[0], "added items") + assert.ElementsMatch(t, expect.removed, ids[1], "removed items") } }) } } func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repeatedItems() { - var ( - userID = "user_id" - qp = graph.QueryParams{ - Category: path.EmailCategory, // doesn't matter which one we use. - ResourceOwner: userID, - Credentials: suite.creds, - } - statusUpdater = func(*support.ConnectorOperationStatus) {} - allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] - dps = DeltaPaths{} // incrementals are tested separately - delta = api.DeltaUpdate{URL: "delta_url"} - container1 = mockContainer{ - id: strPtr("1"), - displayName: strPtr("display_name_1"), - p: path.Builder{}.Append("display_name_1"), - } - ) + newDelta := api.DeltaUpdate{URL: "delta_url"} table := []struct { - name string - getter mockGetter - resolver graph.ContainerResolver - scope selectors.ExchangeScope - failFast bool - expectErr assert.ErrorAssertionFunc - expectNewColls int - expectMetadataColls int - expectAdded map[string]struct{} - expectRemoved map[string]struct{} + name string + getter mockGetter + expectAdded map[string]struct{} + expectRemoved map[string]struct{} }{ { - name: "repeated add", + name: "repeated adds", getter: map[string]mockGetterResults{ "1": { - items: []api.DeltaResult{ - {ID: "a1"}, - {ID: "a1"}, - }, - newDelta: delta, + added: []string{"a1", "a2", "a3", "a1"}, + newDelta: newDelta, }, }, - resolver: newMockResolver(container1), - scope: allScope, - expectErr: assert.NoError, - expectNewColls: 1, - expectMetadataColls: 1, - expectAdded: map[string]struct{}{"a1": {}}, - // Avoid failures for nil map. + expectAdded: map[string]struct{}{ + "a1": {}, + "a2": {}, + "a3": {}, + }, expectRemoved: map[string]struct{}{}, }, { - name: "repeated remove", + name: "repeated removes", getter: map[string]mockGetterResults{ "1": { - items: []api.DeltaResult{ - {ID: "a1", Deleted: true}, - {ID: "a1", Deleted: true}, - }, - newDelta: delta, + removed: []string{"r1", "r2", "r3", "r1"}, + newDelta: newDelta, }, }, - resolver: newMockResolver(container1), - scope: allScope, - expectErr: assert.NoError, - expectNewColls: 1, - expectMetadataColls: 1, - expectAdded: map[string]struct{}{}, - expectRemoved: map[string]struct{}{"a1": {}}, + expectAdded: map[string]struct{}{}, + expectRemoved: map[string]struct{}{ + "r1": {}, + "r2": {}, + "r3": {}, + }, }, { - name: "interleaved, final remove", + name: "remove for same item wins", getter: map[string]mockGetterResults{ "1": { - items: []api.DeltaResult{ - {ID: "a1"}, - {ID: "a1", Deleted: true}, - {ID: "a1"}, - {ID: "a1", Deleted: true}, - }, - newDelta: delta, + added: []string{"i1", "a2", "a3"}, + removed: []string{"i1", "r2", "r3"}, + newDelta: newDelta, }, }, - resolver: newMockResolver(container1), - scope: allScope, - expectErr: assert.NoError, - expectNewColls: 1, - expectMetadataColls: 1, - expectAdded: map[string]struct{}{}, - expectRemoved: map[string]struct{}{"a1": {}}, - }, - { - name: "interleaved, final add", - getter: map[string]mockGetterResults{ - "1": { - items: []api.DeltaResult{ - {ID: "a1"}, - {ID: "a1", Deleted: true}, - {ID: "a1"}, - {ID: "a1", Deleted: true}, - {ID: "a1"}, - }, - newDelta: delta, - }, + expectAdded: map[string]struct{}{ + "a2": {}, + "a3": {}, + }, + expectRemoved: map[string]struct{}{ + "i1": {}, + "r2": {}, + "r3": {}, }, - resolver: newMockResolver(container1), - scope: allScope, - expectErr: assert.NoError, - expectNewColls: 1, - expectMetadataColls: 1, - expectAdded: map[string]struct{}{"a1": {}}, - expectRemoved: map[string]struct{}{}, }, } for _, test := range table { @@ -473,6 +415,24 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea ctx, flush := tester.NewContext() defer flush() + var ( + userID = "user_id" + qp = graph.QueryParams{ + Category: path.EmailCategory, // doesn't matter which one we use. + ResourceOwner: userID, + Credentials: suite.creds, + } + statusUpdater = func(*support.ConnectorOperationStatus) {} + allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] + dps = DeltaPaths{} // incrementals are tested separately + container1 = mockContainer{ + id: strPtr("1"), + displayName: strPtr("display_name_1"), + p: path.Builder{}.Append("display_name_1"), + } + resolver = newMockResolver(container1) + ) + collections := map[string]data.Collection{} err := filterContainersAndFillCollections( @@ -481,12 +441,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea test.getter, collections, statusUpdater, - test.resolver, - test.scope, + resolver, + allScope, dps, - control.Options{FailFast: test.failFast}, + control.Options{FailFast: true}, ) - test.expectErr(t, err) + require.NoError(t, err) // collection assertions @@ -508,17 +468,26 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea if c.DoNotMergeItems() { doNotMerges++ } - - exColl, ok := c.(*Collection) - require.True(t, ok, "collection is an *exchange.Collection") - - assert.Equal(t, test.expectAdded, exColl.added) - assert.Equal(t, test.expectRemoved, exColl.removed) } assert.Zero(t, deleteds, "deleted collections") - assert.Equal(t, test.expectMetadataColls, metadatas, "metadata collections") - assert.Equal(t, test.expectNewColls, news, "new collections") + assert.Equal(t, 1, news, "new collections") + assert.Equal(t, 1, metadatas, "metadata collections") + assert.Zero(t, doNotMerges, "doNotMerge collections") + + // items in collections assertions + for k := range test.getter { + coll := collections[k] + if !assert.NotNilf(t, coll, "missing collection for path %s", k) { + continue + } + + exColl, ok := coll.(*Collection) + require.True(t, ok, "collection is an *exchange.Collection") + + assert.Equal(t, test.expectAdded, exColl.added, "added items") + assert.Equal(t, test.expectRemoved, exColl.removed, "removed items") + } }) } } @@ -536,11 +505,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre statusUpdater = func(*support.ConnectorOperationStatus) {} allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] commonResults = mockGetterResults{ - items: []api.DeltaResult{{ID: "added"}}, + added: []string{"added"}, newDelta: api.DeltaUpdate{URL: "new_delta_url"}, } expiredResults = mockGetterResults{ - items: []api.DeltaResult{{ID: "added"}}, + added: []string{"added"}, newDelta: api.DeltaUpdate{ URL: "new_delta_url", Reset: true, diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 6a0a62fd2..5876cd331 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -941,35 +941,19 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() { switch category { case path.EmailCategory: - ids, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") + ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") require.NoError(t, err, "getting message ids") require.NotEmpty(t, ids, "message ids in folder") - var idx int - - for _, item := range ids { - if item.Deleted { - idx++ - } - } - - err = cli.MessagesById(ids[idx].ID).Delete(ctx, nil) + err = cli.MessagesById(ids[0]).Delete(ctx, nil) require.NoError(t, err, "deleting email item: %s", support.ConnectorStackErrorTrace(err)) case path.ContactsCategory: - ids, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") + ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") require.NoError(t, err, "getting contact ids") require.NotEmpty(t, ids, "contact ids in folder") - var idx int - - for _, item := range ids { - if item.Deleted { - idx++ - } - } - - err = cli.ContactsById(ids[idx].ID).Delete(ctx, nil) + err = cli.ContactsById(ids[0]).Delete(ctx, nil) require.NoError(t, err, "deleting contact item: %s", support.ConnectorStackErrorTrace(err)) } }