diff --git a/.github/workflows/sanity-test.yaml b/.github/workflows/sanity-test.yaml index c2dcc4aaa..d33593923 100644 --- a/.github/workflows/sanity-test.yaml +++ b/.github/workflows/sanity-test.yaml @@ -253,6 +253,99 @@ jobs: set -euo pipefail ./sanityTest + # non-delta backup + - name: Backup exchange incremental without delta + id: exchange-incremental-test-no-delta + run: | + set -euo pipefail + echo -e "\nBackup Exchange incremental test without delta\n" >> ${CORSO_LOG_FILE} + ./corso backup create exchange \ + --no-stats \ + --hide-progress \ + --disable-delta \ + --mailbox "${TEST_USER}" \ + --json \ + 2>&1 | tee $TEST_RESULT/backup_exchange_incremental.txt + + resultjson=$(sed -e '1,/Completed Backups/d' $TEST_RESULT/backup_exchange_incremental.txt ) + + if [[ $( echo $resultjson | jq -r '.[0] | .stats.errorCount') -ne 0 ]]; then + echo "backup was not successful" + exit 1 + fi + + echo result=$( echo $resultjson | jq -r '.[0] | .id' ) >> $GITHUB_OUTPUT + + # restore from non delta + - name: Backup non delta exchange restore + id: exchange-non-delta-restore-test + run: | + set -euo pipefail + echo -e "\nBackup Exchange incremental without delta restore test\n" >> ${CORSO_LOG_FILE} + ./corso restore exchange \ + --no-stats \ + --hide-progress \ + --backup "${{ steps.exchange-incremental-test-no-delta.outputs.result }}" \ + --email-folder Corso_Restore_st_${{ steps.repo-init.outputs.result }} \ + 2>&1 | tee $TEST_RESULT/exchange-incremantal-restore-test.txt + echo result=$(grep -i -e 'Restoring to folder ' $TEST_RESULT/exchange-incremantal-restore-test.txt | sed "s/Restoring to folder//" ) >> $GITHUB_OUTPUT + + - name: Restoration check + env: + SANITY_RESTORE_FOLDER: ${{ steps.exchange-non-delta-restore-test.outputs.result }} + SANITY_RESTORE_SERVICE: "exchange" + TEST_DATA: Corso_Restore_st_${{ steps.repo-init.outputs.result }} + BASE_BACKUP: ${{ steps.exchange-restore-test.outputs.result }} + run: | + set -euo pipefail + ./sanityTest + + # incremental backup after non-delta + - name: Backup exchange incremental after non-delta + id: exchange-incremental-test-after-non-delta + run: | + set -euo pipefail + echo -e "\nBackup Exchange incremental test after non-delta\n" >> ${CORSO_LOG_FILE} + ./corso backup create exchange \ + --no-stats \ + --hide-progress \ + --mailbox "${TEST_USER}" \ + --json \ + 2>&1 | tee $TEST_RESULT/backup_exchange_incremental_after_non_delta.txt + + resultjson=$(sed -e '1,/Completed Backups/d' $TEST_RESULT/backup_exchange_incremental_after_non_delta.txt ) + + if [[ $( echo $resultjson | jq -r '.[0] | .stats.errorCount') -ne 0 ]]; then + echo "backup was not successful" + exit 1 + fi + + echo result=$( echo $resultjson | jq -r '.[0] | .id' ) >> $GITHUB_OUTPUT + + # restore from incremental + - name: Backup incremantal exchange restore after non-delta + id: exchange-incremantal-restore-test-after-non-delta + run: | + set -euo pipefail + echo -e "\nBackup Exchange incremental restore test after non-delta\n" >> ${CORSO_LOG_FILE} + ./corso restore exchange \ + --no-stats \ + --hide-progress \ + --backup "${{ steps.exchange-incremental-test-after-non-delta.outputs.result }}" \ + --email-folder Corso_Restore_st_${{ steps.repo-init.outputs.result }} \ + 2>&1 | tee $TEST_RESULT/exchange-incremantal-restore-test-after-non-delta.txt + echo result=$(grep -i -e 'Restoring to folder ' $TEST_RESULT/exchange-incremantal-restore-test-after-non-delta.txt | sed "s/Restoring to folder//" ) >> $GITHUB_OUTPUT + + - name: Restoration check + env: + SANITY_RESTORE_FOLDER: ${{ steps.exchange-incremantal-restore-test-after-non-delta.outputs.result }} + SANITY_RESTORE_SERVICE: "exchange" + TEST_DATA: Corso_Restore_st_${{ steps.repo-init.outputs.result }} + BASE_BACKUP: ${{ steps.exchange-restore-test.outputs.result }} + run: | + set -euo pipefail + ./sanityTest + ########################################################################################################################################## # Onedrive @@ -383,7 +476,7 @@ jobs: --user "${TEST_USER}" \ --json \ 2>&1 | tee $TEST_RESULT/backup_onedrive_incremental.txt - + resultjson=$(sed -e '1,/Completed Backups/d' $TEST_RESULT/backup_onedrive_incremental.txt ) if [[ $( echo $resultjson | jq -r '.[0] | .stats.errorCount') -ne 0 ]]; then @@ -393,7 +486,7 @@ jobs: data=$( echo $resultjson | jq -r '.[0] | .id' ) echo result=$data >> $GITHUB_OUTPUT - + # restore from incremental - name: Backup onedrive restore id: onedrive-incremental-restore-test diff --git a/CHANGELOG.md b/CHANGELOG.md index 3140e6ea1..eabd0da96 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Released the --mask-sensitive-data flag, which will automatically obscure private data in logs. +- Added `--disable-delta` flag to disable delta based backups for Exchange ### Fixed - Graph requests now automatically retry in case of a Bad Gateway or Gateway Timeout. @@ -21,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - OneDrive and SharePoint file downloads will properly redirect from 3xx responses. - Refined oneDrive rate limiter controls to reduce throttling errors. - Fix handling of duplicate folders at the same hierarchy level in Exchange. Duplicate folders will be merged during restore operations. +- Fix backup for mailboxes that has used up all their storage quota ### Known Issues - Restore operations will merge duplicate Exchange folders at the same hierarchy level into a single folder. @@ -52,6 +54,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The CORSO_LOG_FILE env is appropriately utilized if no --log-file flag is provided. - Fixed Exchange events progress output to show calendar names instead of IDs. - Fixed reporting no items match if restoring or listing details on an older Exchange backup and filtering by folder. +- Fix backup for mailboxes that has used up all their storage quota ### Known Issues - Restoring a OneDrive or SharePoint file with the same name as a file with that name as its M365 ID may restore both items. diff --git a/src/cli/backup/exchange.go b/src/cli/backup/exchange.go index fd0a56bec..ded194a05 100644 --- a/src/cli/backup/exchange.go +++ b/src/cli/backup/exchange.go @@ -87,6 +87,7 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command { options.AddFetchParallelismFlag(c) options.AddFailFastFlag(c) options.AddDisableIncrementalsFlag(c) + options.AddDisableDeltaFlag(c) options.AddEnableImmutableIDFlag(c) options.AddDisableConcurrencyLimiterFlag(c) diff --git a/src/cli/backup/exchange_test.go b/src/cli/backup/exchange_test.go index f4b864cd6..d8d4f9e68 100644 --- a/src/cli/backup/exchange_test.go +++ b/src/cli/backup/exchange_test.go @@ -43,6 +43,7 @@ func (suite *ExchangeUnitSuite) TestAddExchangeCommands() { utils.UserFN, utils.CategoryDataFN, options.DisableIncrementalsFN, + options.DisableDeltaFN, options.FailFastFN, options.FetchParallelismFN, options.SkipReduceFN, diff --git a/src/cli/options/options.go b/src/cli/options/options.go index 8c091b682..ac76b41b8 100644 --- a/src/cli/options/options.go +++ b/src/cli/options/options.go @@ -18,6 +18,7 @@ func Control() control.Options { opt.RestorePermissions = restorePermissionsFV opt.SkipReduce = skipReduceFV opt.ToggleFeatures.DisableIncrementals = disableIncrementalsFV + opt.ToggleFeatures.DisableDelta = disableDeltaFV opt.ToggleFeatures.ExchangeImmutableIDs = enableImmutableID opt.ToggleFeatures.DisableConcurrencyLimiter = disableConcurrencyLimiterFV opt.Parallelism.ItemFetch = fetchParallelismFV @@ -35,6 +36,7 @@ const ( NoStatsFN = "no-stats" RestorePermissionsFN = "restore-permissions" SkipReduceFN = "skip-reduce" + DisableDeltaFN = "disable-delta" DisableIncrementalsFN = "disable-incrementals" EnableImmutableIDFN = "enable-immutable-id" DisableConcurrencyLimiterFN = "disable-concurrency-limiter" @@ -92,7 +94,10 @@ func AddFetchParallelismFlag(cmd *cobra.Command) { // Feature Flags // --------------------------------------------------------------------------- -var disableIncrementalsFV bool +var ( + disableIncrementalsFV bool + disableDeltaFV bool +) // Adds the hidden '--disable-incrementals' cli flag which, when set, disables // incremental backups. @@ -106,6 +111,18 @@ func AddDisableIncrementalsFlag(cmd *cobra.Command) { cobra.CheckErr(fs.MarkHidden(DisableIncrementalsFN)) } +// Adds the hidden '--disable-delta' cli flag which, when set, disables +// delta based backups. +func AddDisableDeltaFlag(cmd *cobra.Command) { + fs := cmd.Flags() + fs.BoolVar( + &disableDeltaFV, + DisableDeltaFN, + false, + "Disable delta based data retrieval in backups.") + cobra.CheckErr(fs.MarkHidden(DisableDeltaFN)) +} + var enableImmutableID bool // Adds the hidden '--enable-immutable-id' cli flag which, when set, enables diff --git a/src/cli/options/options_test.go b/src/cli/options/options_test.go index 78617f3e1..8538e3441 100644 --- a/src/cli/options/options_test.go +++ b/src/cli/options/options_test.go @@ -28,6 +28,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() { Run: func(cmd *cobra.Command, args []string) { assert.True(t, failFastFV, FailFastFN) assert.True(t, disableIncrementalsFV, DisableIncrementalsFN) + assert.True(t, disableDeltaFV, DisableDeltaFN) assert.True(t, noStatsFV, NoStatsFN) assert.True(t, restorePermissionsFV, RestorePermissionsFN) assert.True(t, skipReduceFV, SkipReduceFN) @@ -41,6 +42,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() { AddFailFastFlag(cmd) AddDisableIncrementalsFlag(cmd) + AddDisableDeltaFlag(cmd) AddRestorePermissionsFlag(cmd) AddSkipReduceFlag(cmd) AddFetchParallelismFlag(cmd) @@ -51,6 +53,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() { "test", "--" + FailFastFN, "--" + DisableIncrementalsFN, + "--" + DisableDeltaFN, "--" + NoStatsFN, "--" + RestorePermissionsFN, "--" + SkipReduceFN, diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index e66846fef..c57853596 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -21,6 +21,7 @@ import ( "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/filters" + "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/selectors" ) @@ -60,11 +61,12 @@ func (gc *GraphConnector) ProduceBackupCollections( return nil, nil, clues.Stack(err).WithClues(ctx) } - serviceEnabled, err := checkServiceEnabled( + serviceEnabled, canMakeDeltaQueries, err := checkServiceEnabled( ctx, gc.Discovery.Users(), path.ServiceType(sels.Service), - sels.DiscreteOwner) + sels.DiscreteOwner, + ) if err != nil { return nil, nil, err } @@ -78,6 +80,12 @@ func (gc *GraphConnector) ProduceBackupCollections( ssmb *prefixmatcher.StringSetMatcher ) + if !canMakeDeltaQueries { + logger.Ctx(ctx).Info("delta requests not available") + + ctrlOpts.ToggleFeatures.DisableDelta = true + } + switch sels.Service { case selectors.ServiceExchange: colls, ssmb, err = exchange.DataCollections( @@ -171,22 +179,28 @@ func checkServiceEnabled( gi discovery.GetInfoer, service path.ServiceType, resource string, -) (bool, error) { +) (bool, bool, error) { if service == path.SharePointService { // No "enabled" check required for sharepoint - return true, nil + return true, true, nil } info, err := gi.GetInfo(ctx, resource) if err != nil { - return false, err + return false, false, err } if !info.ServiceEnabled(service) { - return false, clues.Wrap(graph.ErrServiceNotEnabled, "checking service access") + return false, false, clues.Wrap(graph.ErrServiceNotEnabled, "checking service access") } - return true, nil + canMakeDeltaQueries := true + if service == path.ExchangeService { + // we currently can only check quota exceeded for exchange + canMakeDeltaQueries = info.CanMakeDeltaQueries() + } + + return true, canMakeDeltaQueries, nil } // ConsumeRestoreCollections restores data from the specified collections diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index 3025a385c..649f8c59b 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -95,44 +95,57 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() { } for _, test := range tests { - suite.Run(test.name, func() { - t := suite.T() + for _, canMakeDeltaQueries := range []bool{true, false} { + name := test.name - sel := test.getSelector(t) - - collections, excludes, err := exchange.DataCollections( - ctx, - sel, - sel, - nil, - connector.credentials, - connector.UpdateStatus, - control.Defaults(), - fault.New(true)) - require.NoError(t, err, clues.ToCore(err)) - assert.True(t, excludes.Empty()) - - for range collections { - connector.incrementAwaitingMessages() + if canMakeDeltaQueries { + name += "-delta" + } else { + name += "-non-delta" } - // Categories with delta endpoints will produce a collection for metadata - // as well as the actual data pulled, and the "temp" root collection. - assert.GreaterOrEqual(t, len(collections), 1, "expected 1 <= num collections <= 2") - assert.GreaterOrEqual(t, 3, len(collections), "expected 1 <= num collections <= 3") + suite.Run(name, func() { + t := suite.T() - for _, col := range collections { - for object := range col.Items(ctx, fault.New(true)) { - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(object.ToReader()) - assert.NoError(t, err, "received a buf.Read error", clues.ToCore(err)) + sel := test.getSelector(t) + + ctrlOpts := control.Defaults() + ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries + + collections, excludes, err := exchange.DataCollections( + ctx, + sel, + sel, + nil, + connector.credentials, + connector.UpdateStatus, + ctrlOpts, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + assert.True(t, excludes.Empty()) + + for range collections { + connector.incrementAwaitingMessages() } - } - status := connector.Wait() - assert.NotZero(t, status.Successes) - t.Log(status.String()) - }) + // Categories with delta endpoints will produce a collection for metadata + // as well as the actual data pulled, and the "temp" root collection. + assert.GreaterOrEqual(t, len(collections), 1, "expected 1 <= num collections <= 2") + assert.GreaterOrEqual(t, 3, len(collections), "expected 1 <= num collections <= 3") + + for _, col := range collections { + for object := range col.Items(ctx, fault.New(true)) { + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(object.ToReader()) + assert.NoError(t, err, "received a buf.Read error", clues.ToCore(err)) + } + } + + status := connector.Wait() + assert.NotZero(t, status.Successes) + t.Log(status.String()) + }) + } } } diff --git a/src/internal/connector/exchange/api/contacts.go b/src/internal/connector/exchange/api/contacts.go index 78d6d7366..9b08fcd82 100644 --- a/src/internal/connector/exchange/api/contacts.go +++ b/src/internal/connector/exchange/api/contacts.go @@ -191,77 +191,35 @@ var _ itemPager = &contactPager{} type contactPager struct { gs graph.Servicer - builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder - options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration + builder *users.ItemContactFoldersItemContactsRequestBuilder + options *users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration } -func (p *contactPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - resp, err := p.builder.Get(ctx, p.options) - if err != nil { - return nil, graph.Stack(ctx, err) - } - - return resp, nil -} - -func (p *contactPager) setNext(nextLink string) { - p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter()) -} - -func (p *contactPager) valuesIn(pl api.DeltaPageLinker) ([]getIDAndAddtler, error) { - return toValues[models.Contactable](pl) -} - -func (c Contacts) GetAddedAndRemovedItemIDs( +func NewContactPager( ctx context.Context, - user, directoryID, oldDelta string, + gs graph.Servicer, + user, directoryID string, immutableIDs bool, -) ([]string, []string, DeltaUpdate, error) { - service, err := c.service() +) (itemPager, error) { + selecting, err := buildOptions([]string{"parentFolderId"}, fieldsForContacts) if err != nil { - return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) + return nil, err } - var resetDelta bool + requestParameters := &users.ItemContactFoldersItemContactsRequestBuilderGetQueryParameters{ + Select: selecting, + } - ctx = clues.Add( - ctx, - "category", selectors.ExchangeContact, - "container_id", directoryID) + options := &users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration{ + QueryParameters: requestParameters, + Headers: buildPreferHeaders(true, immutableIDs), + } - options, err := optionsForContactFoldersItemDelta( - []string{"parentFolderId"}, - immutableIDs) if err != nil { - return nil, - nil, - DeltaUpdate{}, - graph.Wrap(ctx, err, "setting contact folder options") + return &contactPager{}, err } - if len(oldDelta) > 0 { - var ( - builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter()) - pgr = &contactPager{service, builder, options} - ) - - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) - // note: happy path, not the error condition - if err == nil { - return added, removed, DeltaUpdate{deltaURL, false}, err - } - - // only return on error if it is NOT a delta issue. - // on bad deltas we retry the call with the regular builder - if !graph.IsErrInvalidDelta(err) { - return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) - } - - resetDelta = true - } - - builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta() - pgr := &contactPager{service, builder, options} + builder := gs.Client().UsersById(user).ContactFoldersById(directoryID).Contacts() if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { gri, err := builder.ToGetRequestInformation(ctx, options) @@ -273,12 +231,146 @@ func (c Contacts) GetAddedAndRemovedItemIDs( } } - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + return &contactPager{gs, builder, options}, nil +} + +func (p *contactPager) getPage(ctx context.Context) (api.PageLinker, error) { + resp, err := p.builder.Get(ctx, p.options) if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, graph.Stack(ctx, err) } - return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil + return resp, nil +} + +func (p *contactPager) setNext(nextLink string) { + p.builder = users.NewItemContactFoldersItemContactsRequestBuilder(nextLink, p.gs.Adapter()) +} + +// non delta pagers don't need reset +func (p *contactPager) reset(context.Context) {} + +func (p *contactPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Contactable](pl) +} + +// --------------------------------------------------------------------------- +// delta item pager +// --------------------------------------------------------------------------- + +var _ itemPager = &contactDeltaPager{} + +type contactDeltaPager struct { + gs graph.Servicer + user string + directoryID string + builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder + options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration +} + +func getContactDeltaBuilder( + ctx context.Context, + gs graph.Servicer, + user string, + directoryID string, + options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration, +) *users.ItemContactFoldersItemContactsDeltaRequestBuilder { + builder := gs.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta() + if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { + gri, err := builder.ToGetRequestInformation(ctx, options) + if err != nil { + logger.CtxErr(ctx, err).Error("getting builder info") + } else { + logger.Ctx(ctx). + Infow("builder path-parameters", "path_parameters", gri.PathParameters) + } + } + + return builder +} + +func NewContactDeltaPager( + ctx context.Context, + gs graph.Servicer, + user, directoryID, deltaURL string, + immutableIDs bool, +) (itemPager, error) { + selecting, err := buildOptions([]string{"parentFolderId"}, fieldsForContacts) + if err != nil { + return nil, err + } + + requestParameters := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetQueryParameters{ + Select: selecting, + } + + options := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration{ + QueryParameters: requestParameters, + Headers: buildPreferHeaders(true, immutableIDs), + } + + if err != nil { + return &contactDeltaPager{}, err + } + + var builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder + if deltaURL != "" { + builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(deltaURL, gs.Adapter()) + } else { + builder = getContactDeltaBuilder(ctx, gs, user, directoryID, options) + } + + return &contactDeltaPager{gs, user, directoryID, builder, options}, nil +} + +func (p *contactDeltaPager) getPage(ctx context.Context) (api.PageLinker, error) { + resp, err := p.builder.Get(ctx, p.options) + if err != nil { + return nil, graph.Stack(ctx, err) + } + + return resp, nil +} + +func (p *contactDeltaPager) setNext(nextLink string) { + p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *contactDeltaPager) reset(ctx context.Context) { + p.builder = getContactDeltaBuilder(ctx, p.gs, p.user, p.directoryID, p.options) +} + +func (p *contactDeltaPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Contactable](pl) +} + +func (c Contacts) GetAddedAndRemovedItemIDs( + ctx context.Context, + user, directoryID, oldDelta string, + immutableIDs bool, + canMakeDeltaQueries bool, +) ([]string, []string, DeltaUpdate, error) { + service, err := c.service() + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) + } + + ctx = clues.Add( + ctx, + "category", selectors.ExchangeContact, + "container_id", directoryID) + + pager, err := NewContactPager(ctx, service, user, directoryID, immutableIDs) + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating non-delta pager") + } + + deltaPager, err := NewContactDeltaPager(ctx, service, user, directoryID, oldDelta, immutableIDs) + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager") + } + + return getAddedAndRemovedItemIDs(ctx, service, pager, deltaPager, oldDelta, canMakeDeltaQueries) } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/events.go b/src/internal/connector/exchange/api/events.go index cdf05d778..67f47fc22 100644 --- a/src/internal/connector/exchange/api/events.go +++ b/src/internal/connector/exchange/api/events.go @@ -254,23 +254,47 @@ func (c Events) EnumerateContainers( return el.Failure() } +const ( + eventBetaDeltaURLTemplate = "https://graph.microsoft.com/beta/users/%s/calendars/%s/events/delta" +) + // --------------------------------------------------------------------------- // item pager // --------------------------------------------------------------------------- var _ itemPager = &eventPager{} -const ( - eventBetaDeltaURLTemplate = "https://graph.microsoft.com/beta/users/%s/calendars/%s/events/delta" -) - type eventPager struct { gs graph.Servicer - builder *users.ItemCalendarsItemEventsDeltaRequestBuilder - options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration + builder *users.ItemCalendarsItemEventsRequestBuilder + options *users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration } -func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { +func NewEventPager( + ctx context.Context, + gs graph.Servicer, + user, calendarID string, + immutableIDs bool, +) (itemPager, error) { + options := &users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration{ + Headers: buildPreferHeaders(true, immutableIDs), + } + + builder := gs.Client().UsersById(user).CalendarsById(calendarID).Events() + if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { + gri, err := builder.ToGetRequestInformation(ctx, options) + if err != nil { + logger.CtxErr(ctx, err).Error("getting builder info") + } else { + logger.Ctx(ctx). + Infow("builder path-parameters", "path_parameters", gri.PathParameters) + } + } + + return &eventPager{gs, builder, options}, nil +} + +func (p *eventPager) getPage(ctx context.Context) (api.PageLinker, error) { resp, err := p.builder.Get(ctx, p.options) if err != nil { return nil, graph.Stack(ctx, err) @@ -280,54 +304,58 @@ func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { } func (p *eventPager) setNext(nextLink string) { - p.builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(nextLink, p.gs.Adapter()) + p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter()) } -func (p *eventPager) valuesIn(pl api.DeltaPageLinker) ([]getIDAndAddtler, error) { +// non delta pagers don't need reset +func (p *eventPager) reset(context.Context) {} + +func (p *eventPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { return toValues[models.Eventable](pl) } -func (c Events) GetAddedAndRemovedItemIDs( +// --------------------------------------------------------------------------- +// delta item pager +// --------------------------------------------------------------------------- + +var _ itemPager = &eventDeltaPager{} + +type eventDeltaPager struct { + gs graph.Servicer + user string + calendarID string + builder *users.ItemCalendarsItemEventsDeltaRequestBuilder + options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration +} + +func NewEventDeltaPager( ctx context.Context, - user, calendarID, oldDelta string, + gs graph.Servicer, + user, calendarID, deltaURL string, immutableIDs bool, -) ([]string, []string, DeltaUpdate, error) { - service, err := c.service() - if err != nil { - return nil, nil, DeltaUpdate{}, err +) (itemPager, error) { + options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{ + Headers: buildPreferHeaders(true, immutableIDs), } - var ( - resetDelta bool - opts = &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{ - Headers: buildPreferHeaders(true, immutableIDs), - } - ) + var builder *users.ItemCalendarsItemEventsDeltaRequestBuilder - ctx = clues.Add( - ctx, - "container_id", calendarID) - - if len(oldDelta) > 0 { - var ( - builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(oldDelta, service.Adapter()) - pgr = &eventPager{service, builder, opts} - ) - - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) - // note: happy path, not the error condition - if err == nil { - return added, removed, DeltaUpdate{deltaURL, false}, nil - } - // only return on error if it is NOT a delta issue. - // on bad deltas we retry the call with the regular builder - if !graph.IsErrInvalidDelta(err) { - return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) - } - - resetDelta = true + if deltaURL == "" { + builder = getEventDeltaBuilder(ctx, gs, user, calendarID, options) + } else { + builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(deltaURL, gs.Adapter()) } + return &eventDeltaPager{gs, user, calendarID, builder, options}, nil +} + +func getEventDeltaBuilder( + ctx context.Context, + gs graph.Servicer, + user string, + calendarID string, + options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration, +) *users.ItemCalendarsItemEventsDeltaRequestBuilder { // Graph SDK only supports delta queries against events on the beta version, so we're // manufacturing use of the beta version url to make the call instead. // See: https://learn.microsoft.com/ko-kr/graph/api/event-delta?view=graph-rest-beta&tabs=http @@ -337,11 +365,10 @@ func (c Events) GetAddedAndRemovedItemIDs( // Likewise, the NextLink and DeltaLink odata tags carry our hack forward, so the rest of the code // works as intended (until, at least, we want to _not_ call the beta anymore). rawURL := fmt.Sprintf(eventBetaDeltaURLTemplate, user, calendarID) - builder := users.NewItemCalendarsItemEventsDeltaRequestBuilder(rawURL, service.Adapter()) - pgr := &eventPager{service, builder, opts} + builder := users.NewItemCalendarsItemEventsDeltaRequestBuilder(rawURL, gs.Adapter()) if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { - gri, err := builder.ToGetRequestInformation(ctx, nil) + gri, err := builder.ToGetRequestInformation(ctx, options) if err != nil { logger.CtxErr(ctx, err).Error("getting builder info") } else { @@ -350,13 +377,56 @@ func (c Events) GetAddedAndRemovedItemIDs( } } - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + return builder +} + +func (p *eventDeltaPager) getPage(ctx context.Context) (api.PageLinker, error) { + resp, err := p.builder.Get(ctx, p.options) + if err != nil { + return nil, graph.Stack(ctx, err) + } + + return resp, nil +} + +func (p *eventDeltaPager) setNext(nextLink string) { + p.builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *eventDeltaPager) reset(ctx context.Context) { + p.builder = getEventDeltaBuilder(ctx, p.gs, p.user, p.calendarID, p.options) +} + +func (p *eventDeltaPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Eventable](pl) +} + +func (c Events) GetAddedAndRemovedItemIDs( + ctx context.Context, + user, calendarID, oldDelta string, + immutableIDs bool, + canMakeDeltaQueries bool, +) ([]string, []string, DeltaUpdate, error) { + service, err := c.service() if err != nil { return nil, nil, DeltaUpdate{}, err } - // Events don't have a delta endpoint so just return an empty string. - return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil + ctx = clues.Add( + ctx, + "container_id", calendarID) + + pager, err := NewEventPager(ctx, service, user, calendarID, immutableIDs) + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating non-delta pager") + } + + deltaPager, err := NewEventDeltaPager(ctx, service, user, calendarID, oldDelta, immutableIDs) + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager") + } + + return getAddedAndRemovedItemIDs(ctx, service, pager, deltaPager, oldDelta, canMakeDeltaQueries) } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/mail.go b/src/internal/connector/exchange/api/mail.go index 57a561b8a..232a49601 100644 --- a/src/internal/connector/exchange/api/mail.go +++ b/src/internal/connector/exchange/api/mail.go @@ -20,6 +20,10 @@ import ( "github.com/alcionai/corso/src/pkg/selectors" ) +const ( + mailFoldersBetaURLTemplate = "https://graph.microsoft.com/beta/users/%s/mailFolders" +) + // --------------------------------------------------------------------------- // controller // --------------------------------------------------------------------------- @@ -241,6 +245,43 @@ func (c Mail) GetItem( return mail, MailInfo(mail, size), nil } +type mailFolderPager struct { + service graph.Servicer + builder *users.ItemMailFoldersRequestBuilder +} + +func NewMailFolderPager(service graph.Servicer, user string) mailFolderPager { + // v1.0 non delta /mailFolders endpoint does not return any of the nested folders + rawURL := fmt.Sprintf(mailFoldersBetaURLTemplate, user) + builder := users.NewItemMailFoldersRequestBuilder(rawURL, service.Adapter()) + + return mailFolderPager{service, builder} +} + +func (p *mailFolderPager) getPage(ctx context.Context) (api.PageLinker, error) { + page, err := p.builder.Get(ctx, nil) + if err != nil { + return nil, graph.Stack(ctx, err) + } + + return page, nil +} + +func (p *mailFolderPager) setNext(nextLink string) { + p.builder = users.NewItemMailFoldersRequestBuilder(nextLink, p.service.Adapter()) +} + +func (p *mailFolderPager) valuesIn(pl api.PageLinker) ([]models.MailFolderable, error) { + // Ideally this should be `users.ItemMailFoldersResponseable`, but + // that is not a thing as stable returns different result + page, ok := pl.(models.MailFolderCollectionResponseable) + if !ok { + return nil, clues.New("converting to ItemMailFoldersResponseable") + } + + return page.GetValue(), nil +} + // EnumerateContainers iterates through all of the users current // mail folders, converting each to a graph.CacheFolder, and calling // fn(cf) on each one. @@ -258,22 +299,25 @@ func (c Mail) EnumerateContainers( } el := errs.Local() - builder := service.Client(). - UsersById(userID). - MailFolders(). - Delta() + + pgr := NewMailFolderPager(service, userID) for { if el.Failure() != nil { break } - resp, err := builder.Get(ctx, nil) + page, err := pgr.getPage(ctx) if err != nil { return graph.Stack(ctx, err) } - for _, v := range resp.GetValue() { + resp, err := pgr.valuesIn(page) + if err != nil { + return graph.Stack(ctx, err) + } + + for _, v := range resp { if el.Failure() != nil { break } @@ -290,12 +334,12 @@ func (c Mail) EnumerateContainers( } } - link, ok := ptr.ValOK(resp.GetOdataNextLink()) + link, ok := ptr.ValOK(page.GetOdataNextLink()) if !ok { break } - builder = users.NewItemMailFoldersDeltaRequestBuilder(link, service.Adapter()) + pgr.setNext(link) } return el.Failure() @@ -309,77 +353,35 @@ var _ itemPager = &mailPager{} type mailPager struct { gs graph.Servicer - builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder - options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration + builder *users.ItemMailFoldersItemMessagesRequestBuilder + options *users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration } -func (p *mailPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { - page, err := p.builder.Get(ctx, p.options) - if err != nil { - return nil, graph.Stack(ctx, err) - } - - return page, nil -} - -func (p *mailPager) setNext(nextLink string) { - p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter()) -} - -func (p *mailPager) valuesIn(pl api.DeltaPageLinker) ([]getIDAndAddtler, error) { - return toValues[models.Messageable](pl) -} - -func (c Mail) GetAddedAndRemovedItemIDs( +func NewMailPager( ctx context.Context, - user, directoryID, oldDelta string, + gs graph.Servicer, + user, directoryID string, immutableIDs bool, -) ([]string, []string, DeltaUpdate, error) { - service, err := c.service() +) (itemPager, error) { + selecting, err := buildOptions([]string{"isRead"}, fieldsForMessages) if err != nil { - return nil, nil, DeltaUpdate{}, err + return nil, err } - var ( - deltaURL string - resetDelta bool - ) + requestParameters := &users.ItemMailFoldersItemMessagesRequestBuilderGetQueryParameters{ + Select: selecting, + } - ctx = clues.Add( - ctx, - "category", selectors.ExchangeMail, - "container_id", directoryID) + options := &users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration{ + QueryParameters: requestParameters, + Headers: buildPreferHeaders(true, immutableIDs), + } - options, err := optionsForFolderMessagesDelta([]string{"isRead"}, immutableIDs) if err != nil { - return nil, - nil, - DeltaUpdate{}, - graph.Wrap(ctx, err, "setting contact folder options") + return &mailPager{}, err } - if len(oldDelta) > 0 { - var ( - builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter()) - pgr = &mailPager{service, builder, options} - ) - - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) - // note: happy path, not the error condition - if err == nil { - return added, removed, DeltaUpdate{deltaURL, false}, err - } - // only return on error if it is NOT a delta issue. - // on bad deltas we retry the call with the regular builder - if !graph.IsErrInvalidDelta(err) { - return nil, nil, DeltaUpdate{}, err - } - - resetDelta = true - } - - builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta() - pgr := &mailPager{service, builder, options} + builder := gs.Client().UsersById(user).MailFoldersById(directoryID).Messages() if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { gri, err := builder.ToGetRequestInformation(ctx, options) @@ -391,12 +393,158 @@ func (c Mail) GetAddedAndRemovedItemIDs( } } - added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + return &mailPager{gs, builder, options}, nil +} + +func (p *mailPager) getPage(ctx context.Context) (api.PageLinker, error) { + page, err := p.builder.Get(ctx, p.options) + if err != nil { + return nil, graph.Stack(ctx, err) + } + + return page, nil +} + +func (p *mailPager) setNext(nextLink string) { + p.builder = users.NewItemMailFoldersItemMessagesRequestBuilder(nextLink, p.gs.Adapter()) +} + +// non delta pagers don't have reset +func (p *mailPager) reset(context.Context) {} + +func (p *mailPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Messageable](pl) +} + +// --------------------------------------------------------------------------- +// delta item pager +// --------------------------------------------------------------------------- + +var _ itemPager = &mailDeltaPager{} + +type mailDeltaPager struct { + gs graph.Servicer + user string + directoryID string + builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder + options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration +} + +func getMailDeltaBuilder( + ctx context.Context, + gs graph.Servicer, + user string, + directoryID string, + options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration, +) *users.ItemMailFoldersItemMessagesDeltaRequestBuilder { + builder := gs.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta() + + if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { + gri, err := builder.ToGetRequestInformation(ctx, options) + if err != nil { + logger.CtxErr(ctx, err).Error("getting builder info") + } else { + logger.Ctx(ctx). + Infow("builder path-parameters", "path_parameters", gri.PathParameters) + } + } + + return builder +} + +func NewMailDeltaPager( + ctx context.Context, + gs graph.Servicer, + user, directoryID, oldDelta string, + immutableIDs bool, +) (itemPager, error) { + selecting, err := buildOptions([]string{"isRead"}, fieldsForMessages) + if err != nil { + return nil, err + } + + requestParameters := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{ + Select: selecting, + } + + options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{ + QueryParameters: requestParameters, + Headers: buildPreferHeaders(true, immutableIDs), + } + + if err != nil { + return &mailDeltaPager{}, err + } + + var builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder + + if len(oldDelta) > 0 { + builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter()) + } else { + builder = getMailDeltaBuilder(ctx, gs, user, directoryID, options) + } + + return &mailDeltaPager{gs, user, directoryID, builder, options}, nil +} + +func (p *mailDeltaPager) getPage(ctx context.Context) (api.PageLinker, error) { + page, err := p.builder.Get(ctx, p.options) + if err != nil { + return nil, graph.Stack(ctx, err) + } + + return page, nil +} + +func (p *mailDeltaPager) setNext(nextLink string) { + p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter()) +} + +func (p *mailDeltaPager) reset(ctx context.Context) { + p.builder = p.gs.Client().UsersById(p.user).MailFoldersById(p.directoryID).Messages().Delta() + + if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { + gri, err := p.builder.ToGetRequestInformation(ctx, p.options) + if err != nil { + logger.CtxErr(ctx, err).Error("getting builder info") + } else { + logger.Ctx(ctx). + Infow("builder path-parameters", "path_parameters", gri.PathParameters) + } + } +} + +func (p *mailDeltaPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { + return toValues[models.Messageable](pl) +} + +func (c Mail) GetAddedAndRemovedItemIDs( + ctx context.Context, + user, directoryID, oldDelta string, + immutableIDs bool, + canMakeDeltaQueries bool, +) ([]string, []string, DeltaUpdate, error) { + service, err := c.service() if err != nil { return nil, nil, DeltaUpdate{}, err } - return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil + ctx = clues.Add( + ctx, + "category", selectors.ExchangeMail, + "container_id", directoryID) + + pager, err := NewMailPager(ctx, service, user, directoryID, immutableIDs) + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager") + } + + deltaPager, err := NewMailDeltaPager(ctx, service, user, directoryID, oldDelta, immutableIDs) + if err != nil { + return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager") + } + + return getAddedAndRemovedItemIDs(ctx, service, pager, deltaPager, oldDelta, canMakeDeltaQueries) } // --------------------------------------------------------------------------- diff --git a/src/internal/connector/exchange/api/options.go b/src/internal/connector/exchange/api/options.go index 54f6bb1e7..ff506e7d5 100644 --- a/src/internal/connector/exchange/api/options.go +++ b/src/internal/connector/exchange/api/options.go @@ -75,27 +75,6 @@ const ( // which reduces the overall latency of complex calls // ----------------------------------------------------------------------- -func optionsForFolderMessagesDelta( - moreOps []string, - immutableIDs bool, -) (*users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration, error) { - selecting, err := buildOptions(moreOps, fieldsForMessages) - if err != nil { - return nil, err - } - - requestParameters := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{ - Select: selecting, - } - - options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{ - QueryParameters: requestParameters, - Headers: buildPreferHeaders(true, immutableIDs), - } - - return options, nil -} - // optionsForCalendars places allowed options for exchange.Calendar object // @param moreOps should reflect elements from fieldsForCalendars // @return is first call in Calendars().GetWithRequestConfigurationAndResponseHandler @@ -180,27 +159,6 @@ func optionsForMailFoldersItem( return options, nil } -func optionsForContactFoldersItemDelta( - moreOps []string, - immutableIDs bool, -) (*users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration, error) { - selecting, err := buildOptions(moreOps, fieldsForContacts) - if err != nil { - return nil, err - } - - requestParameters := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetQueryParameters{ - Select: selecting, - } - - options := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration{ - QueryParameters: requestParameters, - Headers: buildPreferHeaders(true, immutableIDs), - } - - return options, nil -} - // optionsForContactChildFolders builds a contacts child folders request. func optionsForContactChildFolders( moreOps []string, diff --git a/src/internal/connector/exchange/api/shared.go b/src/internal/connector/exchange/api/shared.go index 0fbfa33f3..6a9b45cdb 100644 --- a/src/internal/connector/exchange/api/shared.go +++ b/src/internal/connector/exchange/api/shared.go @@ -18,9 +18,16 @@ import ( // --------------------------------------------------------------------------- type itemPager interface { - getPage(context.Context) (api.DeltaPageLinker, error) + // getPage get a page with the specified options from graph + getPage(context.Context) (api.PageLinker, error) + // setNext is used to pass in the next url got from graph setNext(string) - valuesIn(api.DeltaPageLinker) ([]getIDAndAddtler, error) + // reset is used to clear delta url in delta pagers. When + // reset is called, we reset the state(delta url) that we + // currently have and start a new delta query without the token. + reset(context.Context) + // valuesIn gets us the values in a page + valuesIn(api.PageLinker) ([]getIDAndAddtler, error) } type getIDAndAddtler interface { @@ -56,6 +63,54 @@ func toValues[T any](a any) ([]getIDAndAddtler, error) { return r, nil } +func getAddedAndRemovedItemIDs( + ctx context.Context, + service graph.Servicer, + pager itemPager, + deltaPager itemPager, + oldDelta string, + canMakeDeltaQueries bool, +) ([]string, []string, DeltaUpdate, error) { + var ( + pgr itemPager + resetDelta bool + ) + + if canMakeDeltaQueries { + pgr = deltaPager + resetDelta = len(oldDelta) == 0 + } else { + pgr = pager + resetDelta = true + } + + added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) + // note: happy path, not the error condition + if err == nil { + return added, removed, DeltaUpdate{deltaURL, resetDelta}, err + } + + // If we already tried with a non-delta url, we can return + if !canMakeDeltaQueries { + return nil, nil, DeltaUpdate{}, err + } + + // return error if invalid not delta error or oldDelta was empty + if !graph.IsErrInvalidDelta(err) || len(oldDelta) == 0 { + return nil, nil, DeltaUpdate{}, err + } + + // reset deltaPager + pgr.reset(ctx) + + added, removed, deltaURL, err = getItemsAddedAndRemovedFromContainer(ctx, pgr) + if err != nil { + return nil, nil, DeltaUpdate{}, err + } + + return added, removed, DeltaUpdate{deltaURL, true}, nil +} + // generic controller for retrieving all item ids in a container. func getItemsAddedAndRemovedFromContainer( ctx context.Context, @@ -65,6 +120,8 @@ func getItemsAddedAndRemovedFromContainer( addedIDs = []string{} removedIDs = []string{} deltaURL string + nextLink string + deltaLink string ) itemCount := 0 @@ -104,10 +161,20 @@ func getItemsAddedAndRemovedFromContainer( } } - nextLink, delta := api.NextAndDeltaLink(resp) + dresp, ok := resp.(api.DeltaPageLinker) + if ok { + nextLink, deltaLink = api.NextAndDeltaLink(dresp) + } else { + nextLink = api.NextLink(resp) + deltaLink = "" // to make sure we don't use an old value + } + if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { - if !api.IsNextLinkValid(nextLink) || api.IsNextLinkValid(delta) { - logger.Ctx(ctx).Infof("Received invalid link from M365:\nNext Link: %s\nDelta Link: %s\n", nextLink, delta) + if !api.IsNextLinkValid(nextLink) || !api.IsNextLinkValid(deltaLink) { + logger.Ctx(ctx). + With("next_link", graph.LoggableURL(nextLink)). + With("delta_link", graph.LoggableURL(deltaLink)). + Info("invalid link from M365") } } @@ -115,8 +182,8 @@ func getItemsAddedAndRemovedFromContainer( // once we run through pages of nextLinks, the last query will // produce a deltaLink instead (if supported), which we'll use on // the next backup to only get the changes since this run. - if len(delta) > 0 { - deltaURL = delta + if len(deltaLink) > 0 { + deltaURL = deltaLink } // the nextLink is our page cursor within this query. diff --git a/src/internal/connector/exchange/api/shared_test.go b/src/internal/connector/exchange/api/shared_test.go new file mode 100644 index 000000000..447d1b2c8 --- /dev/null +++ b/src/internal/connector/exchange/api/shared_test.go @@ -0,0 +1,256 @@ +package api + +import ( + "context" + "testing" + + "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/connector/graph" + "github.com/alcionai/corso/src/internal/connector/graph/api" + "github.com/alcionai/corso/src/internal/tester" +) + +type testPagerValue struct { + id string + removed bool +} + +func (v testPagerValue) GetId() *string { return &v.id } //revive:disable-line:var-naming +func (v testPagerValue) GetAdditionalData() map[string]any { + if v.removed { + return map[string]any{graph.AddtlDataRemoved: true} + } + + return map[string]any{} +} + +type testPage struct{} + +func (p testPage) GetOdataNextLink() *string { + next := "" // no next, just one page + return &next +} + +var _ itemPager = &testPager{} + +type testPager struct { + t *testing.T + added []string + removed []string + errorCode string + needsReset bool +} + +func (p *testPager) getPage(ctx context.Context) (api.PageLinker, error) { + if p.errorCode != "" { + ierr := odataerrors.NewMainError() + ierr.SetCode(&p.errorCode) + + err := odataerrors.NewODataError() + err.SetError(ierr) + + return nil, err + } + + return testPage{}, nil +} +func (p *testPager) setNext(string) {} +func (p *testPager) reset(context.Context) { + if !p.needsReset { + require.Fail(p.t, "reset should not be called") + } + + p.needsReset = false + p.errorCode = "" +} + +func (p *testPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) { + items := []getIDAndAddtler{} + + for _, id := range p.added { + items = append(items, testPagerValue{id: id}) + } + + for _, id := range p.removed { + items = append(items, testPagerValue{id: id, removed: true}) + } + + return items, nil +} + +type SharedAPIUnitSuite struct { + tester.Suite +} + +func TestSharedAPIUnitSuite(t *testing.T) { + suite.Run(t, &SharedAPIUnitSuite{Suite: tester.NewUnitSuite(t)}) +} + +func (suite *SharedAPIUnitSuite) TestGetAddedAndRemovedItemIDs() { + tests := []struct { + name string + pagerGetter func(context.Context, graph.Servicer, string, string, bool) (itemPager, error) + deltaPagerGetter func(context.Context, graph.Servicer, string, string, string, bool) (itemPager, error) + added []string + removed []string + deltaUpdate DeltaUpdate + delta string + canMakeDeltaQueries bool + }{ + { + name: "no prev delta", + pagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + immutableIDs bool, + ) (itemPager, error) { + // this should not be called + return nil, assert.AnError + }, + deltaPagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + delta string, + immutableIDs bool, + ) (itemPager, error) { + return &testPager{ + t: suite.T(), + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + }, nil + }, + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + deltaUpdate: DeltaUpdate{Reset: true}, + canMakeDeltaQueries: true, + }, + { + name: "with prev delta", + pagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + immutableIDs bool, + ) (itemPager, error) { + // this should not be called + return nil, assert.AnError + }, + deltaPagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + delta string, + immutableIDs bool, + ) (itemPager, error) { + return &testPager{ + t: suite.T(), + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + }, nil + }, + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + delta: "delta", + deltaUpdate: DeltaUpdate{Reset: false}, + canMakeDeltaQueries: true, + }, + { + name: "delta expired", + pagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + immutableIDs bool, + ) (itemPager, error) { + // this should not be called + return nil, assert.AnError + }, + deltaPagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + delta string, + immutableIDs bool, + ) (itemPager, error) { + return &testPager{ + t: suite.T(), + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + errorCode: "SyncStateNotFound", + needsReset: true, + }, nil + }, + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + delta: "delta", + deltaUpdate: DeltaUpdate{Reset: true}, + canMakeDeltaQueries: true, + }, + { + name: "quota exceeded", + pagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + immutableIDs bool, + ) (itemPager, error) { + return &testPager{ + t: suite.T(), + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + }, nil + }, + deltaPagerGetter: func( + ctx context.Context, + gs graph.Servicer, + user string, + directory string, + delta string, + immutableIDs bool, + ) (itemPager, error) { + return &testPager{errorCode: "ErrorQuotaExceeded"}, nil + }, + added: []string{"uno", "dos"}, + removed: []string{"tres", "quatro"}, + deltaUpdate: DeltaUpdate{Reset: true}, + canMakeDeltaQueries: false, + }, + } + + for _, tt := range tests { + suite.Run(tt.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + pager, _ := tt.pagerGetter(ctx, graph.Service{}, "user", "directory", false) + deltaPager, _ := tt.deltaPagerGetter(ctx, graph.Service{}, "user", "directory", tt.delta, false) + + added, removed, deltaUpdate, err := getAddedAndRemovedItemIDs( + ctx, + graph.Service{}, + pager, + deltaPager, + tt.delta, + tt.canMakeDeltaQueries, + ) + + require.NoError(suite.T(), err, "getting added and removed item IDs") + require.EqualValues(suite.T(), tt.added, added, "added item IDs") + require.EqualValues(suite.T(), tt.removed, removed, "removed item IDs") + require.Equal(suite.T(), tt.deltaUpdate, deltaUpdate, "delta update") + }) + } +} diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index 66c02cc9f..11b2cb0be 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -41,7 +41,7 @@ func (dps DeltaPaths) AddDelta(k, d string) { dp = DeltaPath{} } - dp.delta = d + dp.Delta = d dps[k] = dp } @@ -51,13 +51,13 @@ func (dps DeltaPaths) AddPath(k, p string) { dp = DeltaPath{} } - dp.path = p + dp.Path = p dps[k] = dp } type DeltaPath struct { - delta string - path string + Delta string + Path string } // ParseMetadataCollections produces a map of structs holding delta @@ -148,7 +148,7 @@ func parseMetadataCollections( // complete backup on the next run. for _, dps := range cdp { for k, dp := range dps { - if len(dp.delta) == 0 || len(dp.path) == 0 { + if len(dp.Path) == 0 { delete(dps, k) } } diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 2c23747df..f453227af 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -68,7 +68,12 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { data: []fileValues{ {graph.PreviousPathFileName, "prev-path"}, }, - expect: map[string]DeltaPath{}, + expect: map[string]DeltaPath{ + "key": { + Delta: "delta-link", + Path: "prev-path", + }, + }, expectError: assert.NoError, }, { @@ -87,8 +92,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { }, expect: map[string]DeltaPath{ "key": { - delta: "delta-link", - path: "prev-path", + Delta: "delta-link", + Path: "prev-path", }, }, expectError: assert.NoError, @@ -108,7 +113,12 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { {graph.DeltaURLsFileName, ""}, {graph.PreviousPathFileName, "prev-path"}, }, - expect: map[string]DeltaPath{}, + expect: map[string]DeltaPath{ + "key": { + Delta: "delta-link", + Path: "prev-path", + }, + }, expectError: assert.NoError, }, { @@ -119,8 +129,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { }, expect: map[string]DeltaPath{ "key": { - delta: "`!@#$%^&*()_[]{}/\"\\", - path: "prev-path", + Delta: "`!@#$%^&*()_[]{}/\"\\", + Path: "prev-path", }, }, expectError: assert.NoError, @@ -133,8 +143,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { }, expect: map[string]DeltaPath{ "key": { - delta: "\\n\\r\\t\\b\\f\\v\\0\\\\", - path: "prev-path", + Delta: "\\n\\r\\t\\b\\f\\v\\0\\\\", + Path: "prev-path", }, }, expectError: assert.NoError, @@ -150,8 +160,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { }, expect: map[string]DeltaPath{ "key": { - delta: "\\n", - path: "prev-path", + Delta: "\\n", + Path: "prev-path", }, }, expectError: assert.NoError, @@ -191,8 +201,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { assert.Len(t, emails, len(test.expect)) for k, v := range emails { - assert.Equal(t, v.delta, emails[k].delta, "delta") - assert.Equal(t, v.path, emails[k].path, "path") + assert.Equal(t, v.Delta, emails[k].Delta, "delta") + assert.Equal(t, v.Path, emails[k].Path, "path") } }) } @@ -245,9 +255,10 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() { require.NoError(suite.T(), err, clues.ToCore(err)) tests := []struct { - name string - scope selectors.ExchangeScope - folderNames map[string]struct{} + name string + scope selectors.ExchangeScope + folderNames map[string]struct{} + canMakeDeltaQueries bool }{ { name: "Folder Iterative Check Mail", @@ -258,6 +269,18 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() { folderNames: map[string]struct{}{ DefaultMailFolder: {}, }, + canMakeDeltaQueries: true, + }, + { + name: "Folder Iterative Check Mail Non-Delta", + scope: selectors.NewExchangeBackup(users).MailFolders( + []string{DefaultMailFolder}, + selectors.PrefixMatch(), + )[0], + folderNames: map[string]struct{}{ + DefaultMailFolder: {}, + }, + canMakeDeltaQueries: false, }, } @@ -265,13 +288,16 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() { suite.Run(test.name, func() { t := suite.T() + ctrlOpts := control.Defaults() + ctrlOpts.ToggleFeatures.DisableDelta = !test.canMakeDeltaQueries + collections, err := createCollections( ctx, acct, inMock.NewProvider(userID, userID), test.scope, DeltaPaths{}, - control.Defaults(), + ctrlOpts, func(status *support.ConnectorOperationStatus) {}, fault.New(true)) require.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/connector/exchange/service_iterators.go b/src/internal/connector/exchange/service_iterators.go index 9f707df21..0aa6680fb 100644 --- a/src/internal/connector/exchange/service_iterators.go +++ b/src/internal/connector/exchange/service_iterators.go @@ -23,6 +23,7 @@ type addedAndRemovedItemIDsGetter interface { ctx context.Context, user, containerID, oldDeltaToken string, immutableIDs bool, + canMakeDeltaQueries bool, ) ([]string, []string, api.DeltaUpdate, error) } @@ -85,8 +86,8 @@ func filterContainersAndFillCollections( var ( dp = dps[cID] - prevDelta = dp.delta - prevPathStr = dp.path // do not log: pii; log prevPath instead + prevDelta = dp.Delta + prevPathStr = dp.Path // do not log: pii; log prevPath instead prevPath path.Path ictx = clues.Add( ctx, @@ -119,7 +120,8 @@ func filterContainersAndFillCollections( qp.ResourceOwner.ID(), cID, prevDelta, - ctrlOpts.ToggleFeatures.ExchangeImmutableIDs) + ctrlOpts.ToggleFeatures.ExchangeImmutableIDs, + !ctrlOpts.ToggleFeatures.DisableDelta) if err != nil { if !graph.IsErrDeletedInFlight(err) { el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation)) @@ -243,7 +245,7 @@ func makeTombstones(dps DeltaPaths) map[string]string { r := make(map[string]string, len(dps)) for id, v := range dps { - r[id] = v.path + r[id] = v.Path } return r diff --git a/src/internal/connector/exchange/service_iterators_test.go b/src/internal/connector/exchange/service_iterators_test.go index 5b4d11940..7cc784374 100644 --- a/src/internal/connector/exchange/service_iterators_test.go +++ b/src/internal/connector/exchange/service_iterators_test.go @@ -30,7 +30,10 @@ import ( var _ addedAndRemovedItemIDsGetter = &mockGetter{} type ( - mockGetter map[string]mockGetterResults + mockGetter struct { + noReturnDelta bool + results map[string]mockGetterResults + } mockGetterResults struct { added []string removed []string @@ -43,18 +46,24 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs( ctx context.Context, userID, cID, prevDelta string, _ bool, + _ bool, ) ( []string, []string, api.DeltaUpdate, error, ) { - results, ok := mg[cID] + results, ok := mg.results[cID] if !ok { return nil, nil, api.DeltaUpdate{}, clues.New("mock not found for " + cID) } - return results.added, results.removed, results.newDelta, results.err + delta := results.newDelta + if mg.noReturnDelta { + delta.URL = "" + } + + return results.added, results.removed, delta, results.err } var _ graph.ContainerResolver = &mockResolver{} @@ -171,8 +180,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }{ { name: "happy path, one container", - getter: map[string]mockGetterResults{ - "1": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResult, + }, }, resolver: newMockResolver(container1), scope: allScope, @@ -182,9 +193,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "happy path, many containers", - getter: map[string]mockGetterResults{ - "1": commonResult, - "2": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResult, + "2": commonResult, + }, }, resolver: newMockResolver(container1, container2), scope: allScope, @@ -194,9 +207,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "no containers pass scope", - getter: map[string]mockGetterResults{ - "1": commonResult, - "2": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResult, + "2": commonResult, + }, }, resolver: newMockResolver(container1, container2), scope: selectors.NewExchangeBackup(nil).MailFolders(selectors.None())[0], @@ -206,8 +221,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "err: deleted in flight", - getter: map[string]mockGetterResults{ - "1": deletedInFlightResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": deletedInFlightResult, + }, }, resolver: newMockResolver(container1), scope: allScope, @@ -218,8 +235,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "err: other error", - getter: map[string]mockGetterResults{ - "1": errorResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": errorResult, + }, }, resolver: newMockResolver(container1), scope: allScope, @@ -229,9 +248,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "half collections error: deleted in flight", - getter: map[string]mockGetterResults{ - "1": deletedInFlightResult, - "2": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": deletedInFlightResult, + "2": commonResult, + }, }, resolver: newMockResolver(container1, container2), scope: allScope, @@ -242,9 +263,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "half collections error: other error", - getter: map[string]mockGetterResults{ - "1": errorResult, - "2": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": errorResult, + "2": commonResult, + }, }, resolver: newMockResolver(container1, container2), scope: allScope, @@ -254,9 +277,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "half collections error: deleted in flight, fail fast", - getter: map[string]mockGetterResults{ - "1": deletedInFlightResult, - "2": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": deletedInFlightResult, + "2": commonResult, + }, }, resolver: newMockResolver(container1, container2), scope: allScope, @@ -268,9 +293,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, { name: "half collections error: other error, fail fast", - getter: map[string]mockGetterResults{ - "1": errorResult, - "2": commonResult, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": errorResult, + "2": commonResult, + }, }, resolver: newMockResolver(container1, container2), scope: allScope, @@ -281,77 +308,90 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() { }, } for _, test := range table { - suite.Run(test.name, func() { - t := suite.T() + for _, canMakeDeltaQueries := range []bool{true, false} { + name := test.name - ctx, flush := tester.NewContext() - defer flush() - - collections, err := filterContainersAndFillCollections( - ctx, - qp, - test.getter, - statusUpdater, - test.resolver, - test.scope, - dps, - control.Options{FailureHandling: test.failFast}, - fault.New(test.failFast == control.FailFast)) - test.expectErr(t, err, clues.ToCore(err)) - - // collection assertions - - deleteds, news, metadatas, doNotMerges := 0, 0, 0, 0 - for _, c := range collections { - if c.FullPath().Service() == path.ExchangeMetadataService { - metadatas++ - continue - } - - if c.State() == data.DeletedState { - deleteds++ - } - - if c.State() == data.NewState { - news++ - } - - if c.DoNotMergeItems() { - doNotMerges++ - } + if canMakeDeltaQueries { + name += "-delta" + } else { + name += "-non-delta" } - assert.Zero(t, deleteds, "deleted collections") - assert.Equal(t, test.expectNewColls, news, "new collections") - assert.Equal(t, test.expectMetadataColls, metadatas, "metadata collections") - assert.Equal(t, test.expectDoNotMergeColls, doNotMerges, "doNotMerge collections") + suite.Run(name, func() { + t := suite.T() - // items in collections assertions - for k, expect := range test.getter { - coll := collections[k] + ctx, flush := tester.NewContext() + defer flush() - if coll == nil { - continue - } + ctrlOpts := control.Options{FailureHandling: test.failFast} + ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries - exColl, ok := coll.(*Collection) - require.True(t, ok, "collection is an *exchange.Collection") + collections, err := filterContainersAndFillCollections( + ctx, + qp, + test.getter, + statusUpdater, + test.resolver, + test.scope, + dps, + ctrlOpts, + fault.New(test.failFast == control.FailFast)) + test.expectErr(t, err, clues.ToCore(err)) - ids := [][]string{ - make([]string, 0, len(exColl.added)), - make([]string, 0, len(exColl.removed)), - } + // collection assertions - for i, cIDs := range []map[string]struct{}{exColl.added, exColl.removed} { - for id := range cIDs { - ids[i] = append(ids[i], id) + deleteds, news, metadatas, doNotMerges := 0, 0, 0, 0 + for _, c := range collections { + if c.FullPath().Service() == path.ExchangeMetadataService { + metadatas++ + continue + } + + if c.State() == data.DeletedState { + deleteds++ + } + + if c.State() == data.NewState { + news++ + } + + if c.DoNotMergeItems() { + doNotMerges++ } } - assert.ElementsMatch(t, expect.added, ids[0], "added items") - assert.ElementsMatch(t, expect.removed, ids[1], "removed items") - } - }) + assert.Zero(t, deleteds, "deleted collections") + assert.Equal(t, test.expectNewColls, news, "new collections") + assert.Equal(t, test.expectMetadataColls, metadatas, "metadata collections") + assert.Equal(t, test.expectDoNotMergeColls, doNotMerges, "doNotMerge collections") + + // items in collections assertions + for k, expect := range test.getter.results { + coll := collections[k] + + if coll == nil { + continue + } + + exColl, ok := coll.(*Collection) + require.True(t, ok, "collection is an *exchange.Collection") + + ids := [][]string{ + make([]string, 0, len(exColl.added)), + make([]string, 0, len(exColl.removed)), + } + + for i, cIDs := range []map[string]struct{}{exColl.added, exColl.removed} { + for id := range cIDs { + ids[i] = append(ids[i], id) + } + } + + assert.ElementsMatch(t, expect.added, ids[0], "added items") + assert.ElementsMatch(t, expect.removed, ids[1], "removed items") + } + }) + } } } @@ -488,73 +528,79 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli }{ { name: "1 moved to duplicate", - getter: map[string]mockGetterResults{ - "1": result1, - "2": result2, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": result1, + "2": result2, + }, }, resolver: newMockResolver(container1, container2), inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "1": DeltaPath{ - delta: "old_delta", - path: oldPath1(t, cat).String(), + Delta: "old_delta", + Path: oldPath1(t, cat).String(), }, "2": DeltaPath{ - delta: "old_delta", - path: idPath2(t, cat).String(), + Delta: "old_delta", + Path: idPath2(t, cat).String(), }, } }, expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "1": DeltaPath{ - delta: "delta_url", - path: idPath1(t, cat).String(), + Delta: "delta_url", + Path: idPath1(t, cat).String(), }, "2": DeltaPath{ - delta: "delta_url2", - path: idPath2(t, cat).String(), + Delta: "delta_url2", + Path: idPath2(t, cat).String(), }, } }, }, { name: "both move to duplicate", - getter: map[string]mockGetterResults{ - "1": result1, - "2": result2, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": result1, + "2": result2, + }, }, resolver: newMockResolver(container1, container2), inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "1": DeltaPath{ - delta: "old_delta", - path: oldPath1(t, cat).String(), + Delta: "old_delta", + Path: oldPath1(t, cat).String(), }, "2": DeltaPath{ - delta: "old_delta", - path: oldPath2(t, cat).String(), + Delta: "old_delta", + Path: oldPath2(t, cat).String(), }, } }, expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "1": DeltaPath{ - delta: "delta_url", - path: idPath1(t, cat).String(), + Delta: "delta_url", + Path: idPath1(t, cat).String(), }, "2": DeltaPath{ - delta: "delta_url2", - path: idPath2(t, cat).String(), + Delta: "delta_url2", + Path: idPath2(t, cat).String(), }, } }, }, { name: "both new", - getter: map[string]mockGetterResults{ - "1": result1, - "2": result2, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": result1, + "2": result2, + }, }, resolver: newMockResolver(container1, container2), inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { @@ -564,27 +610,29 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "1": DeltaPath{ - delta: "delta_url", - path: idPath1(t, cat).String(), + Delta: "delta_url", + Path: idPath1(t, cat).String(), }, "2": DeltaPath{ - delta: "delta_url2", - path: idPath2(t, cat).String(), + Delta: "delta_url2", + Path: idPath2(t, cat).String(), }, } }, }, { name: "add 1 remove 2", - getter: map[string]mockGetterResults{ - "1": result1, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": result1, + }, }, resolver: newMockResolver(container1), inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "2": DeltaPath{ - delta: "old_delta", - path: idPath2(t, cat).String(), + Delta: "old_delta", + Path: idPath2(t, cat).String(), }, } }, @@ -593,8 +641,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { return DeltaPaths{ "1": DeltaPath{ - delta: "delta_url", - path: idPath1(t, cat).String(), + Delta: "delta_url", + Path: idPath1(t, cat).String(), }, } }, @@ -649,7 +697,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli assert.Equal(t, 1, metadatas, "metadata collections") // items in collections assertions - for k, expect := range test.getter { + for k, expect := range test.getter.results { coll := collections[k] if coll == nil { @@ -690,10 +738,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea }{ { name: "repeated adds", - getter: map[string]mockGetterResults{ - "1": { - added: []string{"a1", "a2", "a3", "a1"}, - newDelta: newDelta, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": { + added: []string{"a1", "a2", "a3", "a1"}, + newDelta: newDelta, + }, }, }, expectAdded: map[string]struct{}{ @@ -705,10 +755,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea }, { name: "repeated removes", - getter: map[string]mockGetterResults{ - "1": { - removed: []string{"r1", "r2", "r3", "r1"}, - newDelta: newDelta, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": { + removed: []string{"r1", "r2", "r3", "r1"}, + newDelta: newDelta, + }, }, }, expectAdded: map[string]struct{}{}, @@ -720,11 +772,13 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea }, { name: "remove for same item wins", - getter: map[string]mockGetterResults{ - "1": { - added: []string{"i1", "a2", "a3"}, - removed: []string{"i1", "r2", "r3"}, - newDelta: newDelta, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": { + added: []string{"i1", "a2", "a3"}, + removed: []string{"i1", "r2", "r3"}, + newDelta: newDelta, + }, }, }, expectAdded: map[string]struct{}{ @@ -806,7 +860,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea assert.Zero(t, doNotMerges, "doNotMerge collections") // items in collections assertions - for k := range test.getter { + for k := range test.getter.results { coll := collections[k] if !assert.NotNilf(t, coll, "missing collection for path %s", k) { continue @@ -822,7 +876,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea } } -func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incrementals() { +func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incrementals_nondelta() { var ( userID = "user_id" tenantID = suite.creds.AzureTenantID @@ -860,16 +914,19 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre } table := []struct { - name string - getter mockGetter - resolver graph.ContainerResolver - dps DeltaPaths - expect map[string]endState + name string + getter mockGetter + resolver graph.ContainerResolver + dps DeltaPaths + expect map[string]endState + skipWhenForcedNoDelta bool }{ { name: "new container", - getter: map[string]mockGetterResults{ - "1": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("1"), @@ -884,8 +941,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "not moved container", - getter: map[string]mockGetterResults{ - "1": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("1"), @@ -895,8 +954,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "not_moved").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "not_moved").String(), }, }, expect: map[string]endState{ @@ -905,8 +964,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "moved container", - getter: map[string]mockGetterResults{ - "1": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("1"), @@ -916,8 +977,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "prev").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "prev").String(), }, }, expect: map[string]endState{ @@ -925,13 +986,15 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, }, { - name: "deleted container", - getter: map[string]mockGetterResults{}, + name: "deleted container", + getter: mockGetter{ + results: map[string]mockGetterResults{}, + }, resolver: newMockResolver(), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "deleted").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "deleted").String(), }, }, expect: map[string]endState{ @@ -940,8 +1003,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "one deleted, one new", - getter: map[string]mockGetterResults{ - "2": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "2": commonResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("2"), @@ -951,8 +1016,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "deleted").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "deleted").String(), }, }, expect: map[string]endState{ @@ -962,8 +1027,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "one deleted, one new, same path", - getter: map[string]mockGetterResults{ - "2": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "2": commonResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("2"), @@ -973,8 +1040,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "same").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "same").String(), }, }, expect: map[string]endState{ @@ -984,9 +1051,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "one moved, one new, same path", - getter: map[string]mockGetterResults{ - "1": commonResults, - "2": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResults, + "2": commonResults, + }, }, resolver: newMockResolver( mockContainer{ @@ -1004,8 +1073,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre ), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "prev").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "prev").String(), }, }, expect: map[string]endState{ @@ -1015,8 +1084,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "bad previous path strings", - getter: map[string]mockGetterResults{ - "1": commonResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("1"), @@ -1026,12 +1097,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: "1/fnords/mc/smarfs", + Delta: "old_delta_url", + Path: "1/fnords/mc/smarfs", }, "2": DeltaPath{ - delta: "old_delta_url", - path: "2/fnords/mc/smarfs", + Delta: "old_delta_url", + Path: "2/fnords/mc/smarfs", }, }, expect: map[string]endState{ @@ -1040,8 +1111,10 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }, { name: "delta expiration", - getter: map[string]mockGetterResults{ - "1": expiredResults, + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": expiredResults, + }, }, resolver: newMockResolver(mockContainer{ id: strPtr("1"), @@ -1051,22 +1124,25 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre }), dps: DeltaPaths{ "1": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "1", "same").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "1", "same").String(), }, }, expect: map[string]endState{ "1": {data.NotMovedState, true}, }, + skipWhenForcedNoDelta: true, // this is not a valid test for non-delta }, { name: "a little bit of everything", - getter: map[string]mockGetterResults{ - "1": commonResults, // new - "2": commonResults, // notMoved - "3": commonResults, // moved - "4": expiredResults, // moved - // "5" gets deleted + getter: mockGetter{ + results: map[string]mockGetterResults{ + "1": commonResults, // new + "2": commonResults, // notMoved + "3": commonResults, // moved + "4": expiredResults, // moved + // "5" gets deleted + }, }, resolver: newMockResolver( mockContainer{ @@ -1096,20 +1172,20 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre ), dps: DeltaPaths{ "2": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "2", "not_moved").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "2", "not_moved").String(), }, "3": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "3", "prev").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "3", "prev").String(), }, "4": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "4", "prev").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "4", "prev").String(), }, "5": DeltaPath{ - delta: "old_delta_url", - path: prevPath(suite.T(), "5", "deleted").String(), + Delta: "old_delta_url", + Path: prevPath(suite.T(), "5", "deleted").String(), }, }, expect: map[string]endState{ @@ -1119,51 +1195,83 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre "4": {data.MovedState, true}, "5": {data.DeletedState, false}, }, + skipWhenForcedNoDelta: true, }, } for _, test := range table { - suite.Run(test.name, func() { - t := suite.T() + for _, deltaBefore := range []bool{true, false} { + for _, deltaAfter := range []bool{true, false} { + name := test.name - ctx, flush := tester.NewContext() - defer flush() - - collections, err := filterContainersAndFillCollections( - ctx, - qp, - test.getter, - statusUpdater, - test.resolver, - allScope, - test.dps, - control.Defaults(), - fault.New(true)) - assert.NoError(t, err, clues.ToCore(err)) - - metadatas := 0 - for _, c := range collections { - p := c.FullPath() - if p == nil { - p = c.PreviousPath() + if deltaAfter { + name += "-delta" + } else { + if test.skipWhenForcedNoDelta { + suite.T().Skip("intentionally skipped non-delta case") + } + name += "-non-delta" } - require.NotNil(t, p) + suite.Run(name, func() { + t := suite.T() - if p.Service() == path.ExchangeMetadataService { - metadatas++ - continue - } + ctx, flush := tester.NewContext() + defer flush() - p0 := p.Folders()[0] + ctrlOpts := control.Defaults() + ctrlOpts.ToggleFeatures.DisableDelta = !deltaAfter - expect, ok := test.expect[p0] - assert.True(t, ok, "collection is expected in result") + getter := test.getter + if !deltaAfter { + getter.noReturnDelta = false + } - assert.Equalf(t, expect.state, c.State(), "collection %s state", p0) - assert.Equalf(t, expect.doNotMerge, c.DoNotMergeItems(), "collection %s DoNotMergeItems", p0) + dps := test.dps + if !deltaBefore { + for k, dp := range dps { + dp.Delta = "" + dps[k] = dp + } + } + + collections, err := filterContainersAndFillCollections( + ctx, + qp, + test.getter, + statusUpdater, + test.resolver, + allScope, + test.dps, + ctrlOpts, + fault.New(true)) + assert.NoError(t, err, clues.ToCore(err)) + + metadatas := 0 + for _, c := range collections { + p := c.FullPath() + if p == nil { + p = c.PreviousPath() + } + + require.NotNil(t, p) + + if p.Service() == path.ExchangeMetadataService { + metadatas++ + continue + } + + p0 := p.Folders()[0] + + expect, ok := test.expect[p0] + assert.True(t, ok, "collection is expected in result") + + assert.Equalf(t, expect.state, c.State(), "collection %s state", p0) + assert.Equalf(t, expect.doNotMerge, c.DoNotMergeItems(), "collection %s DoNotMergeItems", p0) + } + + assert.Equal(t, 1, metadatas, "metadata collections") + }) } - - assert.Equal(t, 1, metadatas, "metadata collections") - }) + } } } diff --git a/src/internal/connector/graph/errors.go b/src/internal/connector/graph/errors.go index 70f2dd416..81886965e 100644 --- a/src/internal/connector/graph/errors.go +++ b/src/internal/connector/graph/errors.go @@ -36,6 +36,7 @@ const ( mailboxNotEnabledForRESTAPI errorCode = "MailboxNotEnabledForRESTAPI" malwareDetected errorCode = "malwareDetected" requestResourceNotFound errorCode = "Request_ResourceNotFound" + quotaExceeded errorCode = "ErrorQuotaExceeded" resourceNotFound errorCode = "ResourceNotFound" resyncRequired errorCode = "ResyncRequired" // alt: resyncRequired syncFolderNotFound errorCode = "ErrorSyncFolderNotFound" @@ -111,6 +112,10 @@ func IsErrInvalidDelta(err error) bool { errors.Is(err, ErrInvalidDelta) } +func IsErrQuotaExceeded(err error) bool { + return hasErrorCode(err, quotaExceeded) +} + func IsErrExchangeMailFolderNotFound(err error) bool { return hasErrorCode(err, resourceNotFound, mailboxNotEnabledForRESTAPI) } diff --git a/src/internal/connector/graph/errors_test.go b/src/internal/connector/graph/errors_test.go index 8706834e7..e04023446 100644 --- a/src/internal/connector/graph/errors_test.go +++ b/src/internal/connector/graph/errors_test.go @@ -161,6 +161,45 @@ func (suite *GraphErrorsUnitSuite) TestIsErrInvalidDelta() { } } +func (suite *GraphErrorsUnitSuite) TestIsErrQuotaExceeded() { + table := []struct { + name string + err error + expect assert.BoolAssertionFunc + }{ + { + name: "nil", + err: nil, + expect: assert.False, + }, + { + name: "non-matching", + err: assert.AnError, + expect: assert.False, + }, + { + name: "as", + err: ErrInvalidDelta, + expect: assert.False, + }, + { + name: "non-matching oDataErr", + err: odErr("fnords"), + expect: assert.False, + }, + { + name: "quota-exceeded oDataErr", + err: odErr("ErrorQuotaExceeded"), + expect: assert.True, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + test.expect(suite.T(), IsErrQuotaExceeded(test.err)) + }) + } +} + func (suite *GraphErrorsUnitSuite) TestIsErrUserNotFound() { table := []struct { name string diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index ddc59e6ce..c7e48f001 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -708,9 +708,15 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { } } -// TestBackup_Run ensures that Integration Testing works -// for the following scopes: Contacts, Events, and Mail func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { + testExchangeContinuousBackups(suite, control.Toggles{}) +} + +func (suite *BackupOpIntegrationSuite) TestBackup_Run_nonIncrementalExchange() { + testExchangeContinuousBackups(suite, control.Toggles{DisableDelta: true}) +} + +func testExchangeContinuousBackups(suite *BackupOpIntegrationSuite, toggles control.Toggles) { ctx, flush := tester.NewContext() defer flush() @@ -719,7 +725,6 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { var ( t = suite.T() acct = tester.NewM365Account(t) - ffs = control.Toggles{} mb = evmock.NewBus() now = dttm.Now() service = path.ExchangeService @@ -860,7 +865,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { } } - bo, acct, kw, ms, ss, gc, sels, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs, version.Backup) + bo, acct, kw, ms, ss, gc, sels, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, toggles, version.Backup) defer closer() // run the initial backup @@ -946,15 +951,19 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { table := []struct { name string // performs the incremental update required for the test. - updateUserData func(t *testing.T) - itemsRead int - itemsWritten int + updateUserData func(t *testing.T) + deltaItemsRead int + deltaItemsWritten int + nonDeltaItemsRead int + nonDeltaItemsWritten int }{ { - name: "clean incremental, no changes", - updateUserData: func(t *testing.T) {}, - itemsRead: 0, - itemsWritten: 0, + name: "clean, no changes", + updateUserData: func(t *testing.T) {}, + deltaItemsRead: 0, + deltaItemsWritten: 0, + nonDeltaItemsRead: 8, + nonDeltaItemsWritten: 0, // unchanged items are not counted towards write }, { name: "move an email folder to a subfolder", @@ -979,8 +988,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { newLoc := expectDeets.MoveLocation(cat.String(), from.locRef, to.locRef) from.locRef = newLoc }, - itemsRead: 0, // zero because we don't count container reads - itemsWritten: 2, + deltaItemsRead: 0, // zero because we don't count container reads + deltaItemsWritten: 2, + nonDeltaItemsRead: 8, + nonDeltaItemsWritten: 2, }, { name: "delete a folder", @@ -1003,8 +1014,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { expectDeets.RemoveLocation(category.String(), d.dests[container2].locRef) } }, - itemsRead: 0, - itemsWritten: 0, // deletions are not counted as "writes" + deltaItemsRead: 0, + deltaItemsWritten: 0, // deletions are not counted as "writes" + nonDeltaItemsRead: 4, + nonDeltaItemsWritten: 0, }, { name: "add a new folder", @@ -1053,8 +1066,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { } } }, - itemsRead: 4, - itemsWritten: 4, + deltaItemsRead: 4, + deltaItemsWritten: 4, + nonDeltaItemsRead: 8, + nonDeltaItemsWritten: 4, }, { name: "rename a folder", @@ -1111,10 +1126,12 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { } } }, - itemsRead: 0, // containers are not counted as reads + deltaItemsRead: 0, // containers are not counted as reads // Renaming a folder doesn't cause kopia changes as the folder ID doesn't // change. - itemsWritten: 0, + deltaItemsWritten: 0, // two items per category + nonDeltaItemsRead: 8, + nonDeltaItemsWritten: 0, }, { name: "add a new item", @@ -1165,8 +1182,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { } } }, - itemsRead: 2, - itemsWritten: 2, + deltaItemsRead: 2, + deltaItemsWritten: 2, + nonDeltaItemsRead: 10, + nonDeltaItemsWritten: 2, }, { name: "delete an existing item", @@ -1177,7 +1196,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { switch category { case path.EmailCategory: - ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false) + ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false, true) require.NoError(t, err, "getting message ids", clues.ToCore(err)) require.NotEmpty(t, ids, "message ids in folder") @@ -1190,7 +1209,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { ids[0]) case path.ContactsCategory: - ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false) + ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false, true) require.NoError(t, err, "getting contact ids", clues.ToCore(err)) require.NotEmpty(t, ids, "contact ids in folder") @@ -1203,7 +1222,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { ids[0]) case path.EventsCategory: - ids, _, _, err := ac.Events().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false) + ids, _, _, err := ac.Events().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false, true) require.NoError(t, err, "getting event ids", clues.ToCore(err)) require.NotEmpty(t, ids, "event ids in folder") @@ -1217,16 +1236,19 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { } } }, - itemsRead: 2, - itemsWritten: 0, // deletes are not counted as "writes" + deltaItemsRead: 2, + deltaItemsWritten: 0, // deletes are not counted as "writes" + nonDeltaItemsRead: 8, + nonDeltaItemsWritten: 0, }, } + for _, test := range table { suite.Run(test.name, func() { var ( t = suite.T() incMB = evmock.NewBus() - incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sels, incMB, ffs, closer) + incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sels, incMB, toggles, closer) atid = m365.AzureTenantID ) @@ -1243,8 +1265,14 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() { // do some additional checks to ensure the incremental dealt with fewer items. // +4 on read/writes to account for metadata: 1 delta and 1 path for each type. - assert.Equal(t, test.itemsWritten+4, incBO.Results.ItemsWritten, "incremental items written") - assert.Equal(t, test.itemsRead+4, incBO.Results.ItemsRead, "incremental items read") + if !toggles.DisableDelta { + assert.Equal(t, test.deltaItemsRead+4, incBO.Results.ItemsRead, "incremental items read") + assert.Equal(t, test.deltaItemsWritten+4, incBO.Results.ItemsWritten, "incremental items written") + } else { + assert.Equal(t, test.nonDeltaItemsRead+4, incBO.Results.ItemsRead, "non delta items read") + assert.Equal(t, test.nonDeltaItemsWritten+4, incBO.Results.ItemsWritten, "non delta items written") + } + assert.NoError(t, incBO.Errors.Failure(), "incremental non-recoverable error", clues.ToCore(incBO.Errors.Failure())) assert.Empty(t, incBO.Errors.Recovered(), "incremental recoverable/iteration errors") assert.Equal(t, 1, incMB.TimesCalled[events.BackupStart], "incremental backup-start events") diff --git a/src/pkg/backup/details/details.go b/src/pkg/backup/details/details.go index a96045f9b..273191ebd 100644 --- a/src/pkg/backup/details/details.go +++ b/src/pkg/backup/details/details.go @@ -626,11 +626,9 @@ const ( func UpdateItem(item *ItemInfo, newLocPath *path.Builder) { // Only OneDrive and SharePoint have information about parent folders // contained in them. - var updatePath func(newLocPath *path.Builder) - // Can't switch based on infoType because that's been unstable. if item.Exchange != nil { - updatePath = item.Exchange.UpdateParentPath + item.Exchange.UpdateParentPath(newLocPath) } else if item.SharePoint != nil { // SharePoint used to store library items with the OneDriveItem ItemType. // Start switching them over as we see them since there's no point in @@ -639,14 +637,10 @@ func UpdateItem(item *ItemInfo, newLocPath *path.Builder) { item.SharePoint.ItemType = SharePointLibrary } - updatePath = item.SharePoint.UpdateParentPath + item.SharePoint.UpdateParentPath(newLocPath) } else if item.OneDrive != nil { - updatePath = item.OneDrive.UpdateParentPath - } else { - return + item.OneDrive.UpdateParentPath(newLocPath) } - - updatePath(newLocPath) } // ItemInfo is a oneOf that contains service specific diff --git a/src/pkg/control/options.go b/src/pkg/control/options.go index df36ceca7..3bda48854 100644 --- a/src/pkg/control/options.go +++ b/src/pkg/control/options.go @@ -101,6 +101,14 @@ type Toggles struct { // DisableIncrementals prevents backups from using incremental lookups, // forcing a new, complete backup of all data regardless of prior state. DisableIncrementals bool `json:"exchangeIncrementals,omitempty"` + // DisableDelta prevents backups from using delta based lookups, + // forcing a backup by enumerating all items. This is different + // from DisableIncrementals in that this does not even makes use of + // delta endpoints with or without a delta token. This is necessary + // when the user has filled up the mailbox storage available to the + // user as Microsoft prevents the API from being able to make calls + // to delta endpoints. + DisableDelta bool `json:"exchangeDelta,omitempty"` // ExchangeImmutableIDs denotes whether Corso should store items with // immutable Exchange IDs. This is only safe to set if the previous backup for // incremental backups used immutable IDs or if a full backup is being done. diff --git a/src/pkg/services/m365/api/users.go b/src/pkg/services/m365/api/users.go index d6fc71aed..73cddce28 100644 --- a/src/pkg/services/m365/api/users.go +++ b/src/pkg/services/m365/api/users.go @@ -58,6 +58,7 @@ type MailboxInfo struct { Language Language WorkingHours WorkingHours ErrGetMailBoxSetting []error + QuotaExceeded bool } type AutomaticRepliesSettings struct { @@ -109,6 +110,12 @@ func (ui *UserInfo) ServiceEnabled(service path.ServiceType) bool { return ok } +// Returns if we can run delta queries on a mailbox. We cannot run +// them if the mailbox is full which is indicated by QuotaExceeded. +func (ui *UserInfo) CanMakeDeltaQueries() bool { + return !ui.Mailbox.QuotaExceeded +} + // --------------------------------------------------------------------------- // methods // --------------------------------------------------------------------------- @@ -260,7 +267,8 @@ func (c Users) GetInfo(ctx context.Context, userID string) (*UserInfo, error) { QueryParameters: &requestParameters, } - if _, err := c.GetMailFolders(ctx, userID, options); err != nil { + mfs, err := c.GetMailFolders(ctx, userID, options) + if err != nil { if graph.IsErrUserNotFound(err) { logger.CtxErr(ctx, err).Error("user not found") return nil, graph.Stack(ctx, clues.Stack(graph.ErrResourceOwnerNotFound, err)) @@ -295,6 +303,32 @@ func (c Users) GetInfo(ctx context.Context, userID string) (*UserInfo, error) { userInfo.Mailbox = mbxInfo + // TODO: This tries to determine if the user has hit their mailbox + // limit by trying to fetch an item and seeing if we get the quota + // exceeded error. Ideally(if available) we should convert this to + // pull the user's usage via an api and compare if they have used + // up their quota. + if mfs != nil { + mf := mfs.GetValue()[0] // we will always have one + options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{ + QueryParameters: &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{ + Top: ptr.To[int32](1), // just one item is enough + }, + } + _, err = c.stable.Client(). + UsersById(userID). + MailFoldersById(ptr.Val(mf.GetId())). + Messages(). + Delta(). + Get(ctx, options) + + if err != nil && !graph.IsErrQuotaExceeded(err) { + return nil, err + } + + userInfo.Mailbox.QuotaExceeded = graph.IsErrQuotaExceeded(err) + } + return userInfo, nil }