diff --git a/src/internal/connector/exchange/api/api.go b/src/internal/connector/exchange/api/api.go index 3fd15409f..999eb6c98 100644 --- a/src/internal/connector/exchange/api/api.go +++ b/src/internal/connector/exchange/api/api.go @@ -24,6 +24,15 @@ type DeltaUpdate struct { Reset bool } +// DeltaResult contains the ID and whether the item referenced by the ID was +// deleted. This allows functions that fetch items for a folder to return a +// single consolidated stream which is easier to dedupe as the order between +// add/update and delete operations is known. +type DeltaResult struct { + ID string + Deleted bool +} + // GraphQuery represents functions which perform exchange-specific queries // into M365 backstore. Responses -> returned items will only contain the information // that is included in the options diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index ab41ff4b3..9a2b3c3dd 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -179,10 +179,10 @@ func (p *contactPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { func (c Contacts) GetAddedAndRemovedItemIDs( ctx context.Context, user, directoryID, oldDelta string, -) ([]string, []string, DeltaUpdate, error) { +) ([]DeltaResult, DeltaUpdate, error) { service, err := c.service() if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } var ( @@ -192,22 +192,22 @@ func (c Contacts) GetAddedAndRemovedItemIDs( options, err := optionsForContactFoldersItemDelta([]string{"parentFolderId"}) if err != nil { - return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") + return nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") } if len(oldDelta) > 0 { builder := users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter()) pgr := &contactPager{service, builder, options} - added, removed, deltaURL, 
err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) // note: happy path, not the error condition if err == nil { - return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return items, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. // on bad deltas we retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } resetDelta = true @@ -217,10 +217,10 @@ func (c Contacts) GetAddedAndRemovedItemIDs( builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta() pgr := &contactPager{service, builder, options} - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } - return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return items, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index bd37a361a..a9065a686 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -165,29 +165,29 @@ func (p *eventPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { func (c Events) GetAddedAndRemovedItemIDs( ctx context.Context, user, calendarID, oldDelta string, -) ([]string, []string, DeltaUpdate, error) { +) ([]DeltaResult, DeltaUpdate, error) { service, err := c.service() if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } var errs *multierror.Error options, err := optionsForEventsByCalendar([]string{"id"}) if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } 
builder := service.Client().UsersById(user).CalendarsById(calendarID).Events() pgr := &eventPager{service, builder, options} - added, _, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + items, _, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } // Events don't have a delta endpoint so just return an empty string. - return added, nil, DeltaUpdate{}, errs.ErrorOrNil() + return items, DeltaUpdate{}, errs.ErrorOrNil() } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index bf6739384..9c6b34155 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -177,10 +177,10 @@ func (p *mailPager) valuesIn(pl pageLinker) ([]getIDAndAddtler, error) { func (c Mail) GetAddedAndRemovedItemIDs( ctx context.Context, user, directoryID, oldDelta string, -) ([]string, []string, DeltaUpdate, error) { +) ([]DeltaResult, DeltaUpdate, error) { service, err := c.service() if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } var ( @@ -191,22 +191,22 @@ func (c Mail) GetAddedAndRemovedItemIDs( options, err := optionsForFolderMessagesDelta([]string{"isRead"}) if err != nil { - return nil, nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") + return nil, DeltaUpdate{}, errors.Wrap(err, "getting query options") } if len(oldDelta) > 0 { builder := users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter()) pgr := &mailPager{service, builder, options} - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) // note: happy path, not the error condition if err == nil { - return added, removed, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() + return 
items, DeltaUpdate{deltaURL, false}, errs.ErrorOrNil() } // only return on error if it is NOT a delta issue. // on bad deltas we retry the call with the regular builder if graph.IsErrInvalidDelta(err) == nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } resetDelta = true @@ -216,10 +216,10 @@ func (c Mail) GetAddedAndRemovedItemIDs( builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta() pgr := &mailPager{service, builder, options} - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + items, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, DeltaUpdate{}, err } - return added, removed, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() + return items, DeltaUpdate{deltaURL, resetDelta}, errs.ErrorOrNil() } diff --git a/src/internal/connector/exchange/api/shared.go b/src/internal/connector/exchange/api/shared.go index c77e21fa8..dbb1d13fc 100644 --- a/src/internal/connector/exchange/api/shared.go +++ b/src/internal/connector/exchange/api/shared.go @@ -61,10 +61,9 @@ func toValues[T any](a any) ([]getIDAndAddtler, error) { func getItemsAddedAndRemovedFromContainer( ctx context.Context, pager itemPager, -) ([]string, []string, string, error) { +) ([]DeltaResult, string, error) { var ( - addedIDs = []string{} - removedIDs = []string{} + foundItems = []DeltaResult{} deltaURL string ) @@ -73,33 +72,37 @@ func getItemsAddedAndRemovedFromContainer( resp, err := pager.getPage(ctx) if err != nil { if err := graph.IsErrDeletedInFlight(err); err != nil { - return nil, nil, deltaURL, err + return nil, deltaURL, err } if err := graph.IsErrInvalidDelta(err); err != nil { - return nil, nil, deltaURL, err + return nil, deltaURL, err } - return nil, nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) + return nil, deltaURL, errors.Wrap(err, support.ConnectorStackErrorTrace(err)) } // each 
category type responds with a different interface, but all // of them comply with GetValue, which is where we'll get our item data. items, err := pager.valuesIn(resp) if err != nil { - return nil, nil, "", err + return nil, "", err } // iterate through the items in the page for _, item := range items { + newItem := DeltaResult{ + ID: *item.GetId(), + } + // if the additional data conains a `@removed` key, the value will either // be 'changed' or 'deleted'. We don't really care about the cause: both // cases are handled the same way in storage. - if item.GetAdditionalData()[graph.AddtlDataRemoved] == nil { - addedIDs = append(addedIDs, *item.GetId()) - } else { - removedIDs = append(removedIDs, *item.GetId()) + if item.GetAdditionalData()[graph.AddtlDataRemoved] != nil { + newItem.Deleted = true } + + foundItems = append(foundItems, newItem) } // the deltaLink is kind of like a cursor for overall data state. @@ -122,5 +125,5 @@ func getItemsAddedAndRemovedFromContainer( pager.setNext(*nextLink) } - return addedIDs, removedIDs, deltaURL, nil + return foundItems, deltaURL, nil } diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index 8668e86b0..c98854f4a 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -53,9 +53,9 @@ type Collection struct { data chan data.Stream // added is a list of existing item IDs that were added to a container - added []string + added map[string]struct{} // removed is a list of item IDs that were deleted from, or moved out, of a container - removed []string + removed map[string]struct{} // service - client/adapter pair used to access M365 back store service graph.Servicer @@ -102,8 +102,8 @@ func NewCollection( data: make(chan data.Stream, collectionChannelBufferSize), doNotMergeItems: doNotMergeItems, fullPath: curr, - added: make([]string, 0), - removed: make([]string, 
0), + added: make(map[string]struct{}, 0), + removed: make(map[string]struct{}, 0), prevPath: prev, service: service, state: stateOf(prev, curr), @@ -222,7 +222,7 @@ func (col *Collection) streamItems(ctx context.Context) { } // delete all removed items - for _, id := range col.removed { + for id := range col.removed { semaphoreCh <- struct{}{} wg.Add(1) @@ -247,7 +247,7 @@ func (col *Collection) streamItems(ctx context.Context) { } // add any new items - for _, id := range col.added { + for id := range col.added { if col.ctrl.FailFast && errs != nil { break } diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 9a3ef3be7..b893092b9 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -19,7 +19,7 @@ type addedAndRemovedItemIDsGetter interface { GetAddedAndRemovedItemIDs( ctx context.Context, user, containerID, oldDeltaToken string, - ) ([]string, []string, api.DeltaUpdate, error) + ) ([]api.DeltaResult, api.DeltaUpdate, error) } // filterContainersAndFillCollections is a utility function @@ -93,7 +93,7 @@ func filterContainersAndFillCollections( } } - added, removed, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta) + items, newDelta, err := getter.GetAddedAndRemovedItemIDs(ctx, qp.ResourceOwner, cID, prevDelta) if err != nil { // note == nil check; only catches non-inFlight error cases. if graph.IsErrDeletedInFlight(err) == nil { @@ -125,8 +125,22 @@ func filterContainersAndFillCollections( newDelta.Reset) collections[cID] = &edc - edc.added = append(edc.added, added...) - edc.removed = append(edc.removed, removed...) + + // This results in "last one wins" if there's duplicate entries for an ID + // and some are deleted while some are added. 
+ for _, i := range items { + m := edc.added + del := edc.removed + + if i.Deleted { + m = edc.removed + del = edc.added + } + + m[i.ID] = struct{}{} + + delete(del, i.ID) + } // add the current path for the container ID to be used in the next backup // as the "previous path", for reference in case of a rename or relocation. diff --git a/src/internal/connector/exchange/service_iterators_test.go b/src/internal/connector/exchange/service_iterators_test.go index d7cbee9e0..2c1329c2f 100644 --- a/src/internal/connector/exchange/service_iterators_test.go +++ b/src/internal/connector/exchange/service_iterators_test.go @@ -30,8 +30,7 @@ var _ addedAndRemovedItemIDsGetter = &mockGetter{} type ( mockGetter map[string]mockGetterResults mockGetterResults struct { - added []string - removed []string + items []api.DeltaResult newDelta api.DeltaUpdate err error } @@ -41,17 +40,16 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs( ctx context.Context, userID, cID, prevDelta string, ) ( - []string, - []string, + []api.DeltaResult, api.DeltaUpdate, error, ) { results, ok := mg[cID] if !ok { - return nil, nil, api.DeltaUpdate{}, errors.New("mock not found for " + cID) + return nil, api.DeltaUpdate{}, errors.New("mock not found for " + cID) } - return results.added, results.removed, results.newDelta, results.err + return results.items, results.newDelta, results.err } var _ graph.ContainerResolver = &mockResolver{} @@ -112,20 +110,25 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { statusUpdater = func(*support.ConnectorOperationStatus) {} allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] dps = DeltaPaths{} // incrementals are tested separately - commonResult = mockGetterResults{ - added: []string{"a1", "a2", "a3"}, - removed: []string{"r1", "r2", "r3"}, + getterItems = []api.DeltaResult{ + {ID: "a1"}, + {ID: "a2"}, + {ID: "a3"}, + {ID: "r1", Deleted: true}, + {ID: "r2", Deleted: true}, + {ID: "r3", Deleted: true}, + } + 
commonResult = mockGetterResults{ + items: getterItems, newDelta: api.DeltaUpdate{URL: "delta_url"}, } errorResult = mockGetterResults{ - added: []string{"a1", "a2", "a3"}, - removed: []string{"r1", "r2", "r3"}, + items: getterItems, newDelta: api.DeltaUpdate{URL: "delta_url"}, err: assert.AnError, } deletedInFlightResult = mockGetterResults{ - added: []string{"a1", "a2", "a3"}, - removed: []string{"r1", "r2", "r3"}, + items: getterItems, newDelta: api.DeltaUpdate{URL: "delta_url"}, err: graph.ErrDeletedInFlight{Err: *common.EncapsulateError(assert.AnError)}, } @@ -333,13 +336,193 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { exColl, ok := coll.(*Collection) require.True(t, ok, "collection is an *exchange.Collection") - assert.ElementsMatch(t, expect.added, exColl.added, "added items") - assert.ElementsMatch(t, expect.removed, exColl.removed, "removed items") + expectAdded := map[string]struct{}{} + expectRemoved := map[string]struct{}{} + + for _, i := range expect.items { + if i.Deleted { + expectRemoved[i.ID] = struct{}{} + } else { + expectAdded[i.ID] = struct{}{} + } + } + + assert.Equal(t, expectAdded, exColl.added, "added items") + assert.Equal(t, expectRemoved, exColl.removed, "removed items") } }) } } +func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repeatedItems() { + var ( + userID = "user_id" + qp = graph.QueryParams{ + Category: path.EmailCategory, // doesn't matter which one we use. 
+ ResourceOwner: userID, + Credentials: suite.creds, + } + statusUpdater = func(*support.ConnectorOperationStatus) {} + allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] + dps = DeltaPaths{} // incrementals are tested separately + delta = api.DeltaUpdate{URL: "delta_url"} + container1 = mockContainer{ + id: strPtr("1"), + displayName: strPtr("display_name_1"), + p: path.Builder{}.Append("display_name_1"), + } + ) + + table := []struct { + name string + getter mockGetter + resolver graph.ContainerResolver + scope selectors.ExchangeScope + failFast bool + expectErr assert.ErrorAssertionFunc + expectNewColls int + expectMetadataColls int + expectAdded map[string]struct{} + expectRemoved map[string]struct{} + }{ + { + name: "repeated add", + getter: map[string]mockGetterResults{ + "1": { + items: []api.DeltaResult{ + {ID: "a1"}, + {ID: "a1"}, + }, + newDelta: delta, + }, + }, + resolver: newMockResolver(container1), + scope: allScope, + expectErr: assert.NoError, + expectNewColls: 1, + expectMetadataColls: 1, + expectAdded: map[string]struct{}{"a1": {}}, + // Avoid failures for nil map. 
+ expectRemoved: map[string]struct{}{}, + }, + { + name: "repeated remove", + getter: map[string]mockGetterResults{ + "1": { + items: []api.DeltaResult{ + {ID: "a1", Deleted: true}, + {ID: "a1", Deleted: true}, + }, + newDelta: delta, + }, + }, + resolver: newMockResolver(container1), + scope: allScope, + expectErr: assert.NoError, + expectNewColls: 1, + expectMetadataColls: 1, + expectAdded: map[string]struct{}{}, + expectRemoved: map[string]struct{}{"a1": {}}, + }, + { + name: "interleaved, final remove", + getter: map[string]mockGetterResults{ + "1": { + items: []api.DeltaResult{ + {ID: "a1"}, + {ID: "a1", Deleted: true}, + {ID: "a1"}, + {ID: "a1", Deleted: true}, + }, + newDelta: delta, + }, + }, + resolver: newMockResolver(container1), + scope: allScope, + expectErr: assert.NoError, + expectNewColls: 1, + expectMetadataColls: 1, + expectAdded: map[string]struct{}{}, + expectRemoved: map[string]struct{}{"a1": {}}, + }, + { + name: "interleaved, final add", + getter: map[string]mockGetterResults{ + "1": { + items: []api.DeltaResult{ + {ID: "a1"}, + {ID: "a1", Deleted: true}, + {ID: "a1"}, + {ID: "a1", Deleted: true}, + {ID: "a1"}, + }, + newDelta: delta, + }, + }, + resolver: newMockResolver(container1), + scope: allScope, + expectErr: assert.NoError, + expectNewColls: 1, + expectMetadataColls: 1, + expectAdded: map[string]struct{}{"a1": {}}, + expectRemoved: map[string]struct{}{}, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + + collections := map[string]data.Collection{} + + err := filterContainersAndFillCollections( + ctx, + qp, + test.getter, + collections, + statusUpdater, + test.resolver, + test.scope, + dps, + control.Options{FailFast: test.failFast}, + ) + test.expectErr(t, err) + + // collection assertions + + deleteds, news, metadatas, doNotMerges := 0, 0, 0, 0 + for _, c := range collections { + if c.FullPath().Service() == path.ExchangeMetadataService { + 
metadatas++ + continue + } + + if c.State() == data.DeletedState { + deleteds++ + } + + if c.State() == data.NewState { + news++ + } + + if c.DoNotMergeItems() { + doNotMerges++ + } + + exColl, ok := c.(*Collection) + require.True(t, ok, "collection is an *exchange.Collection") + + assert.Equal(t, test.expectAdded, exColl.added) + assert.Equal(t, test.expectRemoved, exColl.removed) + } + + assert.Zero(t, deleteds, "deleted collections") + assert.Equal(t, test.expectMetadataColls, metadatas, "metadata collections") + assert.Equal(t, test.expectNewColls, news, "new collections") + }) + } +} + func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incrementals() { var ( userID = "user_id" @@ -353,11 +536,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre statusUpdater = func(*support.ConnectorOperationStatus) {} allScope = selectors.NewExchangeBackup(nil).MailFolders(selectors.Any())[0] commonResults = mockGetterResults{ - added: []string{"added"}, + items: []api.DeltaResult{{ID: "added"}}, newDelta: api.DeltaUpdate{URL: "new_delta_url"}, } expiredResults = mockGetterResults{ - added: []string{"added"}, + items: []api.DeltaResult{{ID: "added"}}, newDelta: api.DeltaUpdate{ URL: "new_delta_url", Reset: true, diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 5876cd331..6a0a62fd2 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -941,19 +941,35 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() { switch category { case path.EmailCategory: - ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") + ids, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") require.NoError(t, err, "getting message ids") require.NotEmpty(t, ids, "message ids in folder") - err = 
cli.MessagesById(ids[0]).Delete(ctx, nil) + var idx int + + // Skip leading tombstones to reach the first live item; the + // len(ids)-1 bound keeps idx in range if every entry is deleted. + for idx < len(ids)-1 && ids[idx].Deleted { + idx++ + } + + err = cli.MessagesById(ids[idx].ID).Delete(ctx, nil) require.NoError(t, err, "deleting email item: %s", support.ConnectorStackErrorTrace(err)) case path.ContactsCategory: - ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") + ids, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, suite.user, containerID, "") require.NoError(t, err, "getting contact ids") require.NotEmpty(t, ids, "contact ids in folder") - err = cli.ContactsById(ids[0]).Delete(ctx, nil) + var idx int + + // Skip leading tombstones to reach the first live item; the + // len(ids)-1 bound keeps idx in range if every entry is deleted. + for idx < len(ids)-1 && ids[idx].Deleted { + idx++ + } + + err = cli.ContactsById(ids[idx].ID).Delete(ctx, nil) require.NoError(t, err, "deleting contact item: %s", support.ConnectorStackErrorTrace(err)) } }