Add non-delta pagers to Exchange (#3212)

When a user's mailbox is full, we cannot make use of the delta APIs. This adds the initial changes needed to create separate delta and non-delta pagers for all of Exchange.
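
In essence, each Exchange category now builds two pagers per container and hands them to a shared helper that decides which one to use and how to recover from an expired delta token. The sketch below is condensed (and slightly simplified; the `graph.Servicer` argument is omitted) from the `getAddedAndRemovedItemIDs` helper added in this PR; `itemPager`, `DeltaUpdate`, and `getItemsAddedAndRemovedFromContainer` are defined in the same package (see the diff below):

```go
// Condensed from the new shared helper in the exchange api package: prefer
// the delta pager when delta queries are allowed, otherwise use the plain
// pager, and retry with a fresh delta query if the stored token has expired.
func getAddedAndRemovedItemIDs(
	ctx context.Context,
	pager, deltaPager itemPager, // non-delta and delta pagers for one container
	oldDelta string,
	canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
	pgr, resetDelta := deltaPager, len(oldDelta) == 0
	if !canMakeDeltaQueries {
		pgr, resetDelta = pager, true
	}

	added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
	if err == nil {
		return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil
	}

	// Only an expired delta token is recoverable; anything else bubbles up.
	if !canMakeDeltaQueries || !graph.IsErrInvalidDelta(err) || len(oldDelta) == 0 {
		return nil, nil, DeltaUpdate{}, err
	}

	// Drop the stale token and enumerate the container from scratch.
	pgr.reset(ctx)

	added, removed, deltaURL, err = getItemsAddedAndRemovedFromContainer(ctx, pgr)
	if err != nil {
		return nil, nil, DeltaUpdate{}, err
	}

	return added, removed, DeltaUpdate{deltaURL, true}, nil
}
```

Mail, contacts, and events each get a `New<Category>Pager` / `New<Category>DeltaPager` pair behind the same `itemPager` interface, and the new `--disable-delta` flag (or a quota-exhausted mailbox detected at backup time) forces the non-delta path.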

*I would suggest reviewing this PR commit by commit.*

---

#### Does this PR need a docs update or release note?

- [x]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E

---

Abin Simon authored this change on 2023-05-12 21:26:49 +05:30; committed via GitHub.
parent 4274de2b73
commit f8aa37b822
24 changed files with 1544 additions and 564 deletions


@ -253,6 +253,99 @@ jobs:
set -euo pipefail
./sanityTest
# non-delta backup
- name: Backup exchange incremental without delta
id: exchange-incremental-test-no-delta
run: |
set -euo pipefail
echo -e "\nBackup Exchange incremental test without delta\n" >> ${CORSO_LOG_FILE}
./corso backup create exchange \
--no-stats \
--hide-progress \
--disable-delta \
--mailbox "${TEST_USER}" \
--json \
2>&1 | tee $TEST_RESULT/backup_exchange_incremental.txt
resultjson=$(sed -e '1,/Completed Backups/d' $TEST_RESULT/backup_exchange_incremental.txt )
if [[ $( echo $resultjson | jq -r '.[0] | .stats.errorCount') -ne 0 ]]; then
echo "backup was not successful"
exit 1
fi
echo result=$( echo $resultjson | jq -r '.[0] | .id' ) >> $GITHUB_OUTPUT
# restore from non delta
- name: Backup non delta exchange restore
id: exchange-non-delta-restore-test
run: |
set -euo pipefail
echo -e "\nBackup Exchange incremental without delta restore test\n" >> ${CORSO_LOG_FILE}
./corso restore exchange \
--no-stats \
--hide-progress \
--backup "${{ steps.exchange-incremental-test-no-delta.outputs.result }}" \
--email-folder Corso_Restore_st_${{ steps.repo-init.outputs.result }} \
2>&1 | tee $TEST_RESULT/exchange-incremantal-restore-test.txt
echo result=$(grep -i -e 'Restoring to folder ' $TEST_RESULT/exchange-incremantal-restore-test.txt | sed "s/Restoring to folder//" ) >> $GITHUB_OUTPUT
- name: Restoration check
env:
SANITY_RESTORE_FOLDER: ${{ steps.exchange-non-delta-restore-test.outputs.result }}
SANITY_RESTORE_SERVICE: "exchange"
TEST_DATA: Corso_Restore_st_${{ steps.repo-init.outputs.result }}
BASE_BACKUP: ${{ steps.exchange-restore-test.outputs.result }}
run: |
set -euo pipefail
./sanityTest
# incremental backup after non-delta
- name: Backup exchange incremental after non-delta
id: exchange-incremental-test-after-non-delta
run: |
set -euo pipefail
echo -e "\nBackup Exchange incremental test after non-delta\n" >> ${CORSO_LOG_FILE}
./corso backup create exchange \
--no-stats \
--hide-progress \
--mailbox "${TEST_USER}" \
--json \
2>&1 | tee $TEST_RESULT/backup_exchange_incremental_after_non_delta.txt
resultjson=$(sed -e '1,/Completed Backups/d' $TEST_RESULT/backup_exchange_incremental_after_non_delta.txt )
if [[ $( echo $resultjson | jq -r '.[0] | .stats.errorCount') -ne 0 ]]; then
echo "backup was not successful"
exit 1
fi
echo result=$( echo $resultjson | jq -r '.[0] | .id' ) >> $GITHUB_OUTPUT
# restore from incremental
- name: Backup incremantal exchange restore after non-delta
id: exchange-incremantal-restore-test-after-non-delta
run: |
set -euo pipefail
echo -e "\nBackup Exchange incremental restore test after non-delta\n" >> ${CORSO_LOG_FILE}
./corso restore exchange \
--no-stats \
--hide-progress \
--backup "${{ steps.exchange-incremental-test-after-non-delta.outputs.result }}" \
--email-folder Corso_Restore_st_${{ steps.repo-init.outputs.result }} \
2>&1 | tee $TEST_RESULT/exchange-incremantal-restore-test-after-non-delta.txt
echo result=$(grep -i -e 'Restoring to folder ' $TEST_RESULT/exchange-incremantal-restore-test-after-non-delta.txt | sed "s/Restoring to folder//" ) >> $GITHUB_OUTPUT
- name: Restoration check
env:
SANITY_RESTORE_FOLDER: ${{ steps.exchange-incremantal-restore-test-after-non-delta.outputs.result }}
SANITY_RESTORE_SERVICE: "exchange"
TEST_DATA: Corso_Restore_st_${{ steps.repo-init.outputs.result }}
BASE_BACKUP: ${{ steps.exchange-restore-test.outputs.result }}
run: |
set -euo pipefail
./sanityTest
##########################################################################################################################################
# Onedrive


@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Released the --mask-sensitive-data flag, which will automatically obscure private data in logs.
- Added `--disable-delta` flag to disable delta-based backups for Exchange
### Fixed
- Graph requests now automatically retry in case of a Bad Gateway or Gateway Timeout.
@ -21,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- OneDrive and SharePoint file downloads will properly redirect from 3xx responses.
- Refined oneDrive rate limiter controls to reduce throttling errors.
- Fix handling of duplicate folders at the same hierarchy level in Exchange. Duplicate folders will be merged during restore operations.
- Fix backup for mailboxes that have used up all their storage quota
### Known Issues
- Restore operations will merge duplicate Exchange folders at the same hierarchy level into a single folder.
@ -52,6 +54,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The CORSO_LOG_FILE env is appropriately utilized if no --log-file flag is provided.
- Fixed Exchange events progress output to show calendar names instead of IDs.
- Fixed reporting no items match if restoring or listing details on an older Exchange backup and filtering by folder.
- Fix backup for mailboxes that have used up all their storage quota
### Known Issues
- Restoring a OneDrive or SharePoint file with the same name as a file with that name as its M365 ID may restore both items.


@ -87,6 +87,7 @@ func addExchangeCommands(cmd *cobra.Command) *cobra.Command {
options.AddFetchParallelismFlag(c)
options.AddFailFastFlag(c)
options.AddDisableIncrementalsFlag(c)
options.AddDisableDeltaFlag(c)
options.AddEnableImmutableIDFlag(c)
options.AddDisableConcurrencyLimiterFlag(c)


@ -43,6 +43,7 @@ func (suite *ExchangeUnitSuite) TestAddExchangeCommands() {
utils.UserFN,
utils.CategoryDataFN,
options.DisableIncrementalsFN,
options.DisableDeltaFN,
options.FailFastFN,
options.FetchParallelismFN,
options.SkipReduceFN,


@ -18,6 +18,7 @@ func Control() control.Options {
opt.RestorePermissions = restorePermissionsFV
opt.SkipReduce = skipReduceFV
opt.ToggleFeatures.DisableIncrementals = disableIncrementalsFV
opt.ToggleFeatures.DisableDelta = disableDeltaFV
opt.ToggleFeatures.ExchangeImmutableIDs = enableImmutableID
opt.ToggleFeatures.DisableConcurrencyLimiter = disableConcurrencyLimiterFV
opt.Parallelism.ItemFetch = fetchParallelismFV
@ -35,6 +36,7 @@ const (
NoStatsFN = "no-stats"
RestorePermissionsFN = "restore-permissions"
SkipReduceFN = "skip-reduce"
DisableDeltaFN = "disable-delta"
DisableIncrementalsFN = "disable-incrementals"
EnableImmutableIDFN = "enable-immutable-id"
DisableConcurrencyLimiterFN = "disable-concurrency-limiter"
@ -92,7 +94,10 @@ func AddFetchParallelismFlag(cmd *cobra.Command) {
// Feature Flags
// ---------------------------------------------------------------------------
var (
disableIncrementalsFV bool
disableDeltaFV bool
)
// Adds the hidden '--disable-incrementals' cli flag which, when set, disables
// incremental backups.
@ -106,6 +111,18 @@ func AddDisableIncrementalsFlag(cmd *cobra.Command) {
cobra.CheckErr(fs.MarkHidden(DisableIncrementalsFN))
}
// Adds the hidden '--disable-delta' cli flag which, when set, disables
// delta based backups.
func AddDisableDeltaFlag(cmd *cobra.Command) {
fs := cmd.Flags()
fs.BoolVar(
&disableDeltaFV,
DisableDeltaFN,
false,
"Disable delta based data retrieval in backups.")
cobra.CheckErr(fs.MarkHidden(DisableDeltaFN))
}
var enableImmutableID bool
// Adds the hidden '--enable-immutable-id' cli flag which, when set, enables


@ -28,6 +28,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
Run: func(cmd *cobra.Command, args []string) {
assert.True(t, failFastFV, FailFastFN)
assert.True(t, disableIncrementalsFV, DisableIncrementalsFN)
assert.True(t, disableDeltaFV, DisableDeltaFN)
assert.True(t, noStatsFV, NoStatsFN)
assert.True(t, restorePermissionsFV, RestorePermissionsFN)
assert.True(t, skipReduceFV, SkipReduceFN)
@ -41,6 +42,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
AddFailFastFlag(cmd)
AddDisableIncrementalsFlag(cmd)
AddDisableDeltaFlag(cmd)
AddRestorePermissionsFlag(cmd)
AddSkipReduceFlag(cmd)
AddFetchParallelismFlag(cmd)
@ -51,6 +53,7 @@ func (suite *OptionsUnitSuite) TestAddExchangeCommands() {
"test", "test",
"--" + FailFastFN, "--" + FailFastFN,
"--" + DisableIncrementalsFN, "--" + DisableIncrementalsFN,
"--" + DisableDeltaFN,
"--" + NoStatsFN, "--" + NoStatsFN,
"--" + RestorePermissionsFN, "--" + RestorePermissionsFN,
"--" + SkipReduceFN, "--" + SkipReduceFN,


@ -21,6 +21,7 @@ import (
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/filters" "github.com/alcionai/corso/src/pkg/filters"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
) )
@ -60,11 +61,12 @@ func (gc *GraphConnector) ProduceBackupCollections(
return nil, nil, clues.Stack(err).WithClues(ctx) return nil, nil, clues.Stack(err).WithClues(ctx)
} }
serviceEnabled, err := checkServiceEnabled( serviceEnabled, canMakeDeltaQueries, err := checkServiceEnabled(
ctx, ctx,
gc.Discovery.Users(), gc.Discovery.Users(),
path.ServiceType(sels.Service), path.ServiceType(sels.Service),
sels.DiscreteOwner) sels.DiscreteOwner,
)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -78,6 +80,12 @@ func (gc *GraphConnector) ProduceBackupCollections(
ssmb *prefixmatcher.StringSetMatcher ssmb *prefixmatcher.StringSetMatcher
) )
if !canMakeDeltaQueries {
logger.Ctx(ctx).Info("delta requests not available")
ctrlOpts.ToggleFeatures.DisableDelta = true
}
switch sels.Service { switch sels.Service {
case selectors.ServiceExchange: case selectors.ServiceExchange:
colls, ssmb, err = exchange.DataCollections( colls, ssmb, err = exchange.DataCollections(
@ -171,22 +179,28 @@ func checkServiceEnabled(
gi discovery.GetInfoer, gi discovery.GetInfoer,
service path.ServiceType, service path.ServiceType,
resource string, resource string,
) (bool, error) { ) (bool, bool, error) {
if service == path.SharePointService { if service == path.SharePointService {
// No "enabled" check required for sharepoint // No "enabled" check required for sharepoint
return true, nil return true, true, nil
} }
info, err := gi.GetInfo(ctx, resource) info, err := gi.GetInfo(ctx, resource)
if err != nil { if err != nil {
return false, err return false, false, err
} }
if !info.ServiceEnabled(service) { if !info.ServiceEnabled(service) {
return false, clues.Wrap(graph.ErrServiceNotEnabled, "checking service access") return false, false, clues.Wrap(graph.ErrServiceNotEnabled, "checking service access")
} }
return true, nil canMakeDeltaQueries := true
if service == path.ExchangeService {
// we currently can only check quota exceeded for exchange
canMakeDeltaQueries = info.CanMakeDeltaQueries()
}
return true, canMakeDeltaQueries, nil
} }
// ConsumeRestoreCollections restores data from the specified collections // ConsumeRestoreCollections restores data from the specified collections


@ -95,11 +95,23 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
} }
for _, test := range tests { for _, test := range tests {
suite.Run(test.name, func() { for _, canMakeDeltaQueries := range []bool{true, false} {
name := test.name
if canMakeDeltaQueries {
name += "-delta"
} else {
name += "-non-delta"
}
suite.Run(name, func() {
t := suite.T() t := suite.T()
sel := test.getSelector(t) sel := test.getSelector(t)
ctrlOpts := control.Defaults()
ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries
collections, excludes, err := exchange.DataCollections( collections, excludes, err := exchange.DataCollections(
ctx, ctx,
sel, sel,
@ -107,7 +119,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
nil, nil,
connector.credentials, connector.credentials,
connector.UpdateStatus, connector.UpdateStatus,
control.Defaults(), ctrlOpts,
fault.New(true)) fault.New(true))
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
assert.True(t, excludes.Empty()) assert.True(t, excludes.Empty())
@ -135,6 +147,7 @@ func (suite *DataCollectionIntgSuite) TestExchangeDataCollection() {
}) })
} }
} }
}
// TestInvalidUserForDataCollections ensures verification process for users // TestInvalidUserForDataCollections ensures verification process for users
func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner() { func (suite *DataCollectionIntgSuite) TestDataCollections_invalidResourceOwner() {


@ -191,77 +191,35 @@ var _ itemPager = &contactPager{}
type contactPager struct { type contactPager struct {
gs graph.Servicer gs graph.Servicer
builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder builder *users.ItemContactFoldersItemContactsRequestBuilder
options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration options *users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration
} }
func (p *contactPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { func NewContactPager(
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
}
func (p *contactPager) setNext(nextLink string) {
p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *contactPager) valuesIn(pl api.DeltaPageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Contactable](pl)
}
func (c Contacts) GetAddedAndRemovedItemIDs(
ctx context.Context, ctx context.Context,
user, directoryID, oldDelta string, gs graph.Servicer,
user, directoryID string,
immutableIDs bool, immutableIDs bool,
) ([]string, []string, DeltaUpdate, error) { ) (itemPager, error) {
service, err := c.service() selecting, err := buildOptions([]string{"parentFolderId"}, fieldsForContacts)
if err != nil { if err != nil {
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err) return nil, err
} }
var resetDelta bool requestParameters := &users.ItemContactFoldersItemContactsRequestBuilderGetQueryParameters{
Select: selecting,
}
ctx = clues.Add( options := &users.ItemContactFoldersItemContactsRequestBuilderGetRequestConfiguration{
ctx, QueryParameters: requestParameters,
"category", selectors.ExchangeContact, Headers: buildPreferHeaders(true, immutableIDs),
"container_id", directoryID) }
options, err := optionsForContactFoldersItemDelta(
[]string{"parentFolderId"},
immutableIDs)
if err != nil { if err != nil {
return nil, return &contactPager{}, err
nil,
DeltaUpdate{},
graph.Wrap(ctx, err, "setting contact folder options")
} }
if len(oldDelta) > 0 { builder := gs.Client().UsersById(user).ContactFoldersById(directoryID).Contacts()
var (
builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(oldDelta, service.Adapter())
pgr = &contactPager{service, builder, options}
)
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
// note: happy path, not the error condition
if err == nil {
return added, removed, DeltaUpdate{deltaURL, false}, err
}
// only return on error if it is NOT a delta issue.
// on bad deltas we retry the call with the regular builder
if !graph.IsErrInvalidDelta(err) {
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
}
resetDelta = true
}
builder := service.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta()
pgr := &contactPager{service, builder, options}
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := builder.ToGetRequestInformation(ctx, options) gri, err := builder.ToGetRequestInformation(ctx, options)
@ -273,12 +231,146 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
} }
} }
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) return &contactPager{gs, builder, options}, nil
if err != nil {
return nil, nil, DeltaUpdate{}, err
} }
return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil func (p *contactPager) getPage(ctx context.Context) (api.PageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
}
func (p *contactPager) setNext(nextLink string) {
p.builder = users.NewItemContactFoldersItemContactsRequestBuilder(nextLink, p.gs.Adapter())
}
// non delta pagers don't need reset
func (p *contactPager) reset(context.Context) {}
func (p *contactPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Contactable](pl)
}
// ---------------------------------------------------------------------------
// delta item pager
// ---------------------------------------------------------------------------
var _ itemPager = &contactDeltaPager{}
type contactDeltaPager struct {
gs graph.Servicer
user string
directoryID string
builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder
options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration
}
func getContactDeltaBuilder(
ctx context.Context,
gs graph.Servicer,
user string,
directoryID string,
options *users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration,
) *users.ItemContactFoldersItemContactsDeltaRequestBuilder {
builder := gs.Client().UsersById(user).ContactFoldersById(directoryID).Contacts().Delta()
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := builder.ToGetRequestInformation(ctx, options)
if err != nil {
logger.CtxErr(ctx, err).Error("getting builder info")
} else {
logger.Ctx(ctx).
Infow("builder path-parameters", "path_parameters", gri.PathParameters)
}
}
return builder
}
func NewContactDeltaPager(
ctx context.Context,
gs graph.Servicer,
user, directoryID, deltaURL string,
immutableIDs bool,
) (itemPager, error) {
selecting, err := buildOptions([]string{"parentFolderId"}, fieldsForContacts)
if err != nil {
return nil, err
}
requestParameters := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
Headers: buildPreferHeaders(true, immutableIDs),
}
if err != nil {
return &contactDeltaPager{}, err
}
var builder *users.ItemContactFoldersItemContactsDeltaRequestBuilder
if deltaURL != "" {
builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(deltaURL, gs.Adapter())
} else {
builder = getContactDeltaBuilder(ctx, gs, user, directoryID, options)
}
return &contactDeltaPager{gs, user, directoryID, builder, options}, nil
}
func (p *contactDeltaPager) getPage(ctx context.Context) (api.PageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
}
func (p *contactDeltaPager) setNext(nextLink string) {
p.builder = users.NewItemContactFoldersItemContactsDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *contactDeltaPager) reset(ctx context.Context) {
p.builder = getContactDeltaBuilder(ctx, p.gs, p.user, p.directoryID, p.options)
}
func (p *contactDeltaPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Contactable](pl)
}
func (c Contacts) GetAddedAndRemovedItemIDs(
ctx context.Context,
user, directoryID, oldDelta string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
service, err := c.service()
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
}
ctx = clues.Add(
ctx,
"category", selectors.ExchangeContact,
"container_id", directoryID)
pager, err := NewContactPager(ctx, service, user, directoryID, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating non-delta pager")
}
deltaPager, err := NewContactDeltaPager(ctx, service, user, directoryID, oldDelta, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager")
}
return getAddedAndRemovedItemIDs(ctx, service, pager, deltaPager, oldDelta, canMakeDeltaQueries)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------


@ -254,23 +254,47 @@ func (c Events) EnumerateContainers(
return el.Failure() return el.Failure()
} }
const (
eventBetaDeltaURLTemplate = "https://graph.microsoft.com/beta/users/%s/calendars/%s/events/delta"
)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// item pager // item pager
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
var _ itemPager = &eventPager{} var _ itemPager = &eventPager{}
const (
eventBetaDeltaURLTemplate = "https://graph.microsoft.com/beta/users/%s/calendars/%s/events/delta"
)
type eventPager struct { type eventPager struct {
gs graph.Servicer gs graph.Servicer
builder *users.ItemCalendarsItemEventsDeltaRequestBuilder builder *users.ItemCalendarsItemEventsRequestBuilder
options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration options *users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration
} }
func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { func NewEventPager(
ctx context.Context,
gs graph.Servicer,
user, calendarID string,
immutableIDs bool,
) (itemPager, error) {
options := &users.ItemCalendarsItemEventsRequestBuilderGetRequestConfiguration{
Headers: buildPreferHeaders(true, immutableIDs),
}
builder := gs.Client().UsersById(user).CalendarsById(calendarID).Events()
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := builder.ToGetRequestInformation(ctx, options)
if err != nil {
logger.CtxErr(ctx, err).Error("getting builder info")
} else {
logger.Ctx(ctx).
Infow("builder path-parameters", "path_parameters", gri.PathParameters)
}
}
return &eventPager{gs, builder, options}, nil
}
func (p *eventPager) getPage(ctx context.Context) (api.PageLinker, error) {
resp, err := p.builder.Get(ctx, p.options) resp, err := p.builder.Get(ctx, p.options)
if err != nil { if err != nil {
return nil, graph.Stack(ctx, err) return nil, graph.Stack(ctx, err)
@ -280,54 +304,58 @@ func (p *eventPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) {
} }
func (p *eventPager) setNext(nextLink string) { func (p *eventPager) setNext(nextLink string) {
p.builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(nextLink, p.gs.Adapter()) p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter())
} }
func (p *eventPager) valuesIn(pl api.DeltaPageLinker) ([]getIDAndAddtler, error) { // non delta pagers don't need reset
func (p *eventPager) reset(context.Context) {}
func (p *eventPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Eventable](pl) return toValues[models.Eventable](pl)
} }
func (c Events) GetAddedAndRemovedItemIDs( // ---------------------------------------------------------------------------
ctx context.Context, // delta item pager
user, calendarID, oldDelta string, // ---------------------------------------------------------------------------
immutableIDs bool,
) ([]string, []string, DeltaUpdate, error) { var _ itemPager = &eventDeltaPager{}
service, err := c.service()
if err != nil { type eventDeltaPager struct {
return nil, nil, DeltaUpdate{}, err gs graph.Servicer
user string
calendarID string
builder *users.ItemCalendarsItemEventsDeltaRequestBuilder
options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration
} }
var ( func NewEventDeltaPager(
resetDelta bool ctx context.Context,
opts = &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{ gs graph.Servicer,
user, calendarID, deltaURL string,
immutableIDs bool,
) (itemPager, error) {
options := &users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration{
Headers: buildPreferHeaders(true, immutableIDs), Headers: buildPreferHeaders(true, immutableIDs),
} }
)
ctx = clues.Add( var builder *users.ItemCalendarsItemEventsDeltaRequestBuilder
ctx,
"container_id", calendarID)
if len(oldDelta) > 0 { if deltaURL == "" {
var ( builder = getEventDeltaBuilder(ctx, gs, user, calendarID, options)
builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(oldDelta, service.Adapter()) } else {
pgr = &eventPager{service, builder, opts} builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(deltaURL, gs.Adapter())
)
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
// note: happy path, not the error condition
if err == nil {
return added, removed, DeltaUpdate{deltaURL, false}, nil
}
// only return on error if it is NOT a delta issue.
// on bad deltas we retry the call with the regular builder
if !graph.IsErrInvalidDelta(err) {
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
} }
resetDelta = true return &eventDeltaPager{gs, user, calendarID, builder, options}, nil
} }
func getEventDeltaBuilder(
ctx context.Context,
gs graph.Servicer,
user string,
calendarID string,
options *users.ItemCalendarsItemEventsDeltaRequestBuilderGetRequestConfiguration,
) *users.ItemCalendarsItemEventsDeltaRequestBuilder {
// Graph SDK only supports delta queries against events on the beta version, so we're // Graph SDK only supports delta queries against events on the beta version, so we're
// manufacturing use of the beta version url to make the call instead. // manufacturing use of the beta version url to make the call instead.
// See: https://learn.microsoft.com/ko-kr/graph/api/event-delta?view=graph-rest-beta&tabs=http // See: https://learn.microsoft.com/ko-kr/graph/api/event-delta?view=graph-rest-beta&tabs=http
@ -337,11 +365,10 @@ func (c Events) GetAddedAndRemovedItemIDs(
// Likewise, the NextLink and DeltaLink odata tags carry our hack forward, so the rest of the code // Likewise, the NextLink and DeltaLink odata tags carry our hack forward, so the rest of the code
// works as intended (until, at least, we want to _not_ call the beta anymore). // works as intended (until, at least, we want to _not_ call the beta anymore).
rawURL := fmt.Sprintf(eventBetaDeltaURLTemplate, user, calendarID) rawURL := fmt.Sprintf(eventBetaDeltaURLTemplate, user, calendarID)
builder := users.NewItemCalendarsItemEventsDeltaRequestBuilder(rawURL, service.Adapter()) builder := users.NewItemCalendarsItemEventsDeltaRequestBuilder(rawURL, gs.Adapter())
pgr := &eventPager{service, builder, opts}
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := builder.ToGetRequestInformation(ctx, nil) gri, err := builder.ToGetRequestInformation(ctx, options)
if err != nil { if err != nil {
logger.CtxErr(ctx, err).Error("getting builder info") logger.CtxErr(ctx, err).Error("getting builder info")
} else { } else {
@ -350,13 +377,56 @@ func (c Events) GetAddedAndRemovedItemIDs(
} }
} }
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) return builder
}
func (p *eventDeltaPager) getPage(ctx context.Context) (api.PageLinker, error) {
resp, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return resp, nil
}
func (p *eventDeltaPager) setNext(nextLink string) {
p.builder = users.NewItemCalendarsItemEventsDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *eventDeltaPager) reset(ctx context.Context) {
p.builder = getEventDeltaBuilder(ctx, p.gs, p.user, p.calendarID, p.options)
}
func (p *eventDeltaPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Eventable](pl)
}
func (c Events) GetAddedAndRemovedItemIDs(
ctx context.Context,
user, calendarID, oldDelta string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
service, err := c.service()
if err != nil { if err != nil {
return nil, nil, DeltaUpdate{}, err return nil, nil, DeltaUpdate{}, err
} }
// Events don't have a delta endpoint so just return an empty string. ctx = clues.Add(
return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil ctx,
"container_id", calendarID)
pager, err := NewEventPager(ctx, service, user, calendarID, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating non-delta pager")
}
deltaPager, err := NewEventDeltaPager(ctx, service, user, calendarID, oldDelta, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager")
}
return getAddedAndRemovedItemIDs(ctx, service, pager, deltaPager, oldDelta, canMakeDeltaQueries)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------


@ -20,6 +20,10 @@ import (
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
) )
const (
mailFoldersBetaURLTemplate = "https://graph.microsoft.com/beta/users/%s/mailFolders"
)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// controller // controller
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -241,6 +245,43 @@ func (c Mail) GetItem(
return mail, MailInfo(mail, size), nil return mail, MailInfo(mail, size), nil
} }
type mailFolderPager struct {
service graph.Servicer
builder *users.ItemMailFoldersRequestBuilder
}
func NewMailFolderPager(service graph.Servicer, user string) mailFolderPager {
// v1.0 non delta /mailFolders endpoint does not return any of the nested folders
rawURL := fmt.Sprintf(mailFoldersBetaURLTemplate, user)
builder := users.NewItemMailFoldersRequestBuilder(rawURL, service.Adapter())
return mailFolderPager{service, builder}
}
func (p *mailFolderPager) getPage(ctx context.Context) (api.PageLinker, error) {
page, err := p.builder.Get(ctx, nil)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return page, nil
}
func (p *mailFolderPager) setNext(nextLink string) {
p.builder = users.NewItemMailFoldersRequestBuilder(nextLink, p.service.Adapter())
}
func (p *mailFolderPager) valuesIn(pl api.PageLinker) ([]models.MailFolderable, error) {
// Ideally this should be `users.ItemMailFoldersResponseable`, but
// no such type exists; the stable API returns a different result type.
page, ok := pl.(models.MailFolderCollectionResponseable)
if !ok {
return nil, clues.New("converting to ItemMailFoldersResponseable")
}
return page.GetValue(), nil
}
// EnumerateContainers iterates through all of the users current // EnumerateContainers iterates through all of the users current
// mail folders, converting each to a graph.CacheFolder, and calling // mail folders, converting each to a graph.CacheFolder, and calling
// fn(cf) on each one. // fn(cf) on each one.
@ -258,22 +299,25 @@ func (c Mail) EnumerateContainers(
} }
el := errs.Local() el := errs.Local()
builder := service.Client().
UsersById(userID). pgr := NewMailFolderPager(service, userID)
MailFolders().
Delta()
for { for {
if el.Failure() != nil { if el.Failure() != nil {
break break
} }
resp, err := builder.Get(ctx, nil) page, err := pgr.getPage(ctx)
if err != nil { if err != nil {
return graph.Stack(ctx, err) return graph.Stack(ctx, err)
} }
for _, v := range resp.GetValue() { resp, err := pgr.valuesIn(page)
if err != nil {
return graph.Stack(ctx, err)
}
for _, v := range resp {
if el.Failure() != nil { if el.Failure() != nil {
break break
} }
@ -290,12 +334,12 @@ func (c Mail) EnumerateContainers(
} }
} }
link, ok := ptr.ValOK(resp.GetOdataNextLink()) link, ok := ptr.ValOK(page.GetOdataNextLink())
if !ok { if !ok {
break break
} }
builder = users.NewItemMailFoldersDeltaRequestBuilder(link, service.Adapter()) pgr.setNext(link)
} }
return el.Failure() return el.Failure()
@ -309,77 +353,35 @@ var _ itemPager = &mailPager{}
type mailPager struct { type mailPager struct {
gs graph.Servicer gs graph.Servicer
builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder builder *users.ItemMailFoldersItemMessagesRequestBuilder
options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration options *users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration
} }
func (p *mailPager) getPage(ctx context.Context) (api.DeltaPageLinker, error) { func NewMailPager(
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return page, nil
}
func (p *mailPager) setNext(nextLink string) {
p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *mailPager) valuesIn(pl api.DeltaPageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Messageable](pl)
}
func (c Mail) GetAddedAndRemovedItemIDs(
ctx context.Context, ctx context.Context,
user, directoryID, oldDelta string, gs graph.Servicer,
user, directoryID string,
immutableIDs bool, immutableIDs bool,
) ([]string, []string, DeltaUpdate, error) { ) (itemPager, error) {
service, err := c.service() selecting, err := buildOptions([]string{"isRead"}, fieldsForMessages)
if err != nil { if err != nil {
return nil, nil, DeltaUpdate{}, err return nil, err
} }
var ( requestParameters := &users.ItemMailFoldersItemMessagesRequestBuilderGetQueryParameters{
deltaURL string Select: selecting,
resetDelta bool }
)
ctx = clues.Add( options := &users.ItemMailFoldersItemMessagesRequestBuilderGetRequestConfiguration{
ctx, QueryParameters: requestParameters,
"category", selectors.ExchangeMail, Headers: buildPreferHeaders(true, immutableIDs),
"container_id", directoryID) }
options, err := optionsForFolderMessagesDelta([]string{"isRead"}, immutableIDs)
if err != nil { if err != nil {
return nil, return &mailPager{}, err
nil,
DeltaUpdate{},
graph.Wrap(ctx, err, "setting contact folder options")
} }
if len(oldDelta) > 0 { builder := gs.Client().UsersById(user).MailFoldersById(directoryID).Messages()
var (
builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, service.Adapter())
pgr = &mailPager{service, builder, options}
)
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
// note: happy path, not the error condition
if err == nil {
return added, removed, DeltaUpdate{deltaURL, false}, err
}
// only return on error if it is NOT a delta issue.
// on bad deltas we retry the call with the regular builder
if !graph.IsErrInvalidDelta(err) {
return nil, nil, DeltaUpdate{}, err
}
resetDelta = true
}
builder := service.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta()
pgr := &mailPager{service, builder, options}
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := builder.ToGetRequestInformation(ctx, options) gri, err := builder.ToGetRequestInformation(ctx, options)
@ -391,12 +393,158 @@ func (c Mail) GetAddedAndRemovedItemIDs(
} }
} }
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr) return &mailPager{gs, builder, options}, nil
}
func (p *mailPager) getPage(ctx context.Context) (api.PageLinker, error) {
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return page, nil
}
func (p *mailPager) setNext(nextLink string) {
p.builder = users.NewItemMailFoldersItemMessagesRequestBuilder(nextLink, p.gs.Adapter())
}
// non-delta pagers don't need reset
func (p *mailPager) reset(context.Context) {}
func (p *mailPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Messageable](pl)
}
// ---------------------------------------------------------------------------
// delta item pager
// ---------------------------------------------------------------------------
var _ itemPager = &mailDeltaPager{}
type mailDeltaPager struct {
gs graph.Servicer
user string
directoryID string
builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder
options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration
}
func getMailDeltaBuilder(
ctx context.Context,
gs graph.Servicer,
user string,
directoryID string,
options *users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration,
) *users.ItemMailFoldersItemMessagesDeltaRequestBuilder {
builder := gs.Client().UsersById(user).MailFoldersById(directoryID).Messages().Delta()
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := builder.ToGetRequestInformation(ctx, options)
if err != nil {
logger.CtxErr(ctx, err).Error("getting builder info")
} else {
logger.Ctx(ctx).
Infow("builder path-parameters", "path_parameters", gri.PathParameters)
}
}
return builder
}
func NewMailDeltaPager(
ctx context.Context,
gs graph.Servicer,
user, directoryID, oldDelta string,
immutableIDs bool,
) (itemPager, error) {
selecting, err := buildOptions([]string{"isRead"}, fieldsForMessages)
if err != nil {
return nil, err
}
requestParameters := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
Headers: buildPreferHeaders(true, immutableIDs),
}
if err != nil {
return &mailDeltaPager{}, err
}
var builder *users.ItemMailFoldersItemMessagesDeltaRequestBuilder
if len(oldDelta) > 0 {
builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(oldDelta, gs.Adapter())
} else {
builder = getMailDeltaBuilder(ctx, gs, user, directoryID, options)
}
return &mailDeltaPager{gs, user, directoryID, builder, options}, nil
}
func (p *mailDeltaPager) getPage(ctx context.Context) (api.PageLinker, error) {
page, err := p.builder.Get(ctx, p.options)
if err != nil {
return nil, graph.Stack(ctx, err)
}
return page, nil
}
func (p *mailDeltaPager) setNext(nextLink string) {
p.builder = users.NewItemMailFoldersItemMessagesDeltaRequestBuilder(nextLink, p.gs.Adapter())
}
func (p *mailDeltaPager) reset(ctx context.Context) {
p.builder = p.gs.Client().UsersById(p.user).MailFoldersById(p.directoryID).Messages().Delta()
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
gri, err := p.builder.ToGetRequestInformation(ctx, p.options)
if err != nil {
logger.CtxErr(ctx, err).Error("getting builder info")
} else {
logger.Ctx(ctx).
Infow("builder path-parameters", "path_parameters", gri.PathParameters)
}
}
}
func (p *mailDeltaPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
return toValues[models.Messageable](pl)
}
func (c Mail) GetAddedAndRemovedItemIDs(
ctx context.Context,
user, directoryID, oldDelta string,
immutableIDs bool,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
service, err := c.service()
if err != nil { if err != nil {
return nil, nil, DeltaUpdate{}, err return nil, nil, DeltaUpdate{}, err
} }
return added, removed, DeltaUpdate{deltaURL, resetDelta}, nil ctx = clues.Add(
ctx,
"category", selectors.ExchangeMail,
"container_id", directoryID)
pager, err := NewMailPager(ctx, service, user, directoryID, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager")
}
deltaPager, err := NewMailDeltaPager(ctx, service, user, directoryID, oldDelta, immutableIDs)
if err != nil {
return nil, nil, DeltaUpdate{}, graph.Wrap(ctx, err, "creating delta pager")
}
return getAddedAndRemovedItemIDs(ctx, service, pager, deltaPager, oldDelta, canMakeDeltaQueries)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------


@ -75,27 +75,6 @@ const (
// which reduces the overall latency of complex calls // which reduces the overall latency of complex calls
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
func optionsForFolderMessagesDelta(
moreOps []string,
immutableIDs bool,
) (*users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration, error) {
selecting, err := buildOptions(moreOps, fieldsForMessages)
if err != nil {
return nil, err
}
requestParameters := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
Headers: buildPreferHeaders(true, immutableIDs),
}
return options, nil
}
// optionsForCalendars places allowed options for exchange.Calendar object // optionsForCalendars places allowed options for exchange.Calendar object
// @param moreOps should reflect elements from fieldsForCalendars // @param moreOps should reflect elements from fieldsForCalendars
// @return is first call in Calendars().GetWithRequestConfigurationAndResponseHandler // @return is first call in Calendars().GetWithRequestConfigurationAndResponseHandler
@ -180,27 +159,6 @@ func optionsForMailFoldersItem(
return options, nil return options, nil
} }
func optionsForContactFoldersItemDelta(
moreOps []string,
immutableIDs bool,
) (*users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration, error) {
selecting, err := buildOptions(moreOps, fieldsForContacts)
if err != nil {
return nil, err
}
requestParameters := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetQueryParameters{
Select: selecting,
}
options := &users.ItemContactFoldersItemContactsDeltaRequestBuilderGetRequestConfiguration{
QueryParameters: requestParameters,
Headers: buildPreferHeaders(true, immutableIDs),
}
return options, nil
}
// optionsForContactChildFolders builds a contacts child folders request. // optionsForContactChildFolders builds a contacts child folders request.
func optionsForContactChildFolders( func optionsForContactChildFolders(
moreOps []string, moreOps []string,


@ -18,9 +18,16 @@ import (
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type itemPager interface { type itemPager interface {
getPage(context.Context) (api.DeltaPageLinker, error) // getPage get a page with the specified options from graph
getPage(context.Context) (api.PageLinker, error)
// setNext is used to pass in the next url got from graph
setNext(string) setNext(string)
valuesIn(api.DeltaPageLinker) ([]getIDAndAddtler, error) // reset is used to clear delta url in delta pagers. When
// reset is called, we reset the state(delta url) that we
// currently have and start a new delta query without the token.
reset(context.Context)
// valuesIn gets us the values in a page
valuesIn(api.PageLinker) ([]getIDAndAddtler, error)
} }
type getIDAndAddtler interface { type getIDAndAddtler interface {
@ -56,6 +63,54 @@ func toValues[T any](a any) ([]getIDAndAddtler, error) {
return r, nil return r, nil
} }
func getAddedAndRemovedItemIDs(
ctx context.Context,
service graph.Servicer,
pager itemPager,
deltaPager itemPager,
oldDelta string,
canMakeDeltaQueries bool,
) ([]string, []string, DeltaUpdate, error) {
var (
pgr itemPager
resetDelta bool
)
if canMakeDeltaQueries {
pgr = deltaPager
resetDelta = len(oldDelta) == 0
} else {
pgr = pager
resetDelta = true
}
added, removed, deltaURL, err := getItemsAddedAndRemovedFromContainer(ctx, pgr)
// note: happy path, not the error condition
if err == nil {
return added, removed, DeltaUpdate{deltaURL, resetDelta}, err
}
// If we already tried with a non-delta url, we can return
if !canMakeDeltaQueries {
return nil, nil, DeltaUpdate{}, err
}
// return the error if it isn't an invalid-delta error, or if oldDelta was empty
if !graph.IsErrInvalidDelta(err) || len(oldDelta) == 0 {
return nil, nil, DeltaUpdate{}, err
}
// reset deltaPager
pgr.reset(ctx)
added, removed, deltaURL, err = getItemsAddedAndRemovedFromContainer(ctx, pgr)
if err != nil {
return nil, nil, DeltaUpdate{}, err
}
return added, removed, DeltaUpdate{deltaURL, true}, nil
}
// generic controller for retrieving all item ids in a container. // generic controller for retrieving all item ids in a container.
func getItemsAddedAndRemovedFromContainer( func getItemsAddedAndRemovedFromContainer(
ctx context.Context, ctx context.Context,
@ -65,6 +120,8 @@ func getItemsAddedAndRemovedFromContainer(
addedIDs = []string{} addedIDs = []string{}
removedIDs = []string{} removedIDs = []string{}
deltaURL string deltaURL string
nextLink string
deltaLink string
) )
itemCount := 0 itemCount := 0
@ -104,10 +161,20 @@ func getItemsAddedAndRemovedFromContainer(
} }
} }
nextLink, delta := api.NextAndDeltaLink(resp) dresp, ok := resp.(api.DeltaPageLinker)
if ok {
nextLink, deltaLink = api.NextAndDeltaLink(dresp)
} else {
nextLink = api.NextLink(resp)
deltaLink = "" // to make sure we don't use an old value
}
if len(os.Getenv("CORSO_URL_LOGGING")) > 0 { if len(os.Getenv("CORSO_URL_LOGGING")) > 0 {
if !api.IsNextLinkValid(nextLink) || api.IsNextLinkValid(delta) { if !api.IsNextLinkValid(nextLink) || !api.IsNextLinkValid(deltaLink) {
logger.Ctx(ctx).Infof("Received invalid link from M365:\nNext Link: %s\nDelta Link: %s\n", nextLink, delta) logger.Ctx(ctx).
With("next_link", graph.LoggableURL(nextLink)).
With("delta_link", graph.LoggableURL(deltaLink)).
Info("invalid link from M365")
} }
} }
@ -115,8 +182,8 @@ func getItemsAddedAndRemovedFromContainer(
// once we run through pages of nextLinks, the last query will // once we run through pages of nextLinks, the last query will
// produce a deltaLink instead (if supported), which we'll use on // produce a deltaLink instead (if supported), which we'll use on
// the next backup to only get the changes since this run. // the next backup to only get the changes since this run.
if len(delta) > 0 { if len(deltaLink) > 0 {
deltaURL = delta deltaURL = deltaLink
} }
// the nextLink is our page cursor within this query. // the nextLink is our page cursor within this query.


@ -0,0 +1,256 @@
package api
import (
"context"
"testing"
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/graph/api"
"github.com/alcionai/corso/src/internal/tester"
)
type testPagerValue struct {
id string
removed bool
}
func (v testPagerValue) GetId() *string { return &v.id } //revive:disable-line:var-naming
func (v testPagerValue) GetAdditionalData() map[string]any {
if v.removed {
return map[string]any{graph.AddtlDataRemoved: true}
}
return map[string]any{}
}
type testPage struct{}
func (p testPage) GetOdataNextLink() *string {
next := "" // no next, just one page
return &next
}
var _ itemPager = &testPager{}
type testPager struct {
t *testing.T
added []string
removed []string
errorCode string
needsReset bool
}
func (p *testPager) getPage(ctx context.Context) (api.PageLinker, error) {
if p.errorCode != "" {
ierr := odataerrors.NewMainError()
ierr.SetCode(&p.errorCode)
err := odataerrors.NewODataError()
err.SetError(ierr)
return nil, err
}
return testPage{}, nil
}
func (p *testPager) setNext(string) {}
func (p *testPager) reset(context.Context) {
if !p.needsReset {
require.Fail(p.t, "reset should not be called")
}
p.needsReset = false
p.errorCode = ""
}
func (p *testPager) valuesIn(pl api.PageLinker) ([]getIDAndAddtler, error) {
items := []getIDAndAddtler{}
for _, id := range p.added {
items = append(items, testPagerValue{id: id})
}
for _, id := range p.removed {
items = append(items, testPagerValue{id: id, removed: true})
}
return items, nil
}
type SharedAPIUnitSuite struct {
tester.Suite
}
func TestSharedAPIUnitSuite(t *testing.T) {
suite.Run(t, &SharedAPIUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *SharedAPIUnitSuite) TestGetAddedAndRemovedItemIDs() {
tests := []struct {
name string
pagerGetter func(context.Context, graph.Servicer, string, string, bool) (itemPager, error)
deltaPagerGetter func(context.Context, graph.Servicer, string, string, string, bool) (itemPager, error)
added []string
removed []string
deltaUpdate DeltaUpdate
delta string
canMakeDeltaQueries bool
}{
{
name: "no prev delta",
pagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (itemPager, error) {
// this should not be called
return nil, assert.AnError
},
deltaPagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (itemPager, error) {
return &testPager{
t: suite.T(),
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
}, nil
},
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
deltaUpdate: DeltaUpdate{Reset: true},
canMakeDeltaQueries: true,
},
{
name: "with prev delta",
pagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (itemPager, error) {
// this should not be called
return nil, assert.AnError
},
deltaPagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (itemPager, error) {
return &testPager{
t: suite.T(),
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
}, nil
},
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
delta: "delta",
deltaUpdate: DeltaUpdate{Reset: false},
canMakeDeltaQueries: true,
},
{
name: "delta expired",
pagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (itemPager, error) {
// this should not be called
return nil, assert.AnError
},
deltaPagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (itemPager, error) {
return &testPager{
t: suite.T(),
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
errorCode: "SyncStateNotFound",
needsReset: true,
}, nil
},
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
delta: "delta",
deltaUpdate: DeltaUpdate{Reset: true},
canMakeDeltaQueries: true,
},
{
name: "quota exceeded",
pagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
immutableIDs bool,
) (itemPager, error) {
return &testPager{
t: suite.T(),
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
}, nil
},
deltaPagerGetter: func(
ctx context.Context,
gs graph.Servicer,
user string,
directory string,
delta string,
immutableIDs bool,
) (itemPager, error) {
return &testPager{errorCode: "ErrorQuotaExceeded"}, nil
},
added: []string{"uno", "dos"},
removed: []string{"tres", "quatro"},
deltaUpdate: DeltaUpdate{Reset: true},
canMakeDeltaQueries: false,
},
}
for _, tt := range tests {
suite.Run(tt.name, func() {
ctx, flush := tester.NewContext()
defer flush()
pager, _ := tt.pagerGetter(ctx, graph.Service{}, "user", "directory", false)
deltaPager, _ := tt.deltaPagerGetter(ctx, graph.Service{}, "user", "directory", tt.delta, false)
added, removed, deltaUpdate, err := getAddedAndRemovedItemIDs(
ctx,
graph.Service{},
pager,
deltaPager,
tt.delta,
tt.canMakeDeltaQueries,
)
require.NoError(suite.T(), err, "getting added and removed item IDs")
require.EqualValues(suite.T(), tt.added, added, "added item IDs")
require.EqualValues(suite.T(), tt.removed, removed, "removed item IDs")
require.Equal(suite.T(), tt.deltaUpdate, deltaUpdate, "delta update")
})
}
}


@ -41,7 +41,7 @@ func (dps DeltaPaths) AddDelta(k, d string) {
dp = DeltaPath{} dp = DeltaPath{}
} }
dp.delta = d dp.Delta = d
dps[k] = dp dps[k] = dp
} }
@ -51,13 +51,13 @@ func (dps DeltaPaths) AddPath(k, p string) {
dp = DeltaPath{} dp = DeltaPath{}
} }
dp.path = p dp.Path = p
dps[k] = dp dps[k] = dp
} }
type DeltaPath struct { type DeltaPath struct {
delta string Delta string
path string Path string
} }
// ParseMetadataCollections produces a map of structs holding delta // ParseMetadataCollections produces a map of structs holding delta
@ -148,7 +148,7 @@ func parseMetadataCollections(
// complete backup on the next run. // complete backup on the next run.
for _, dps := range cdp { for _, dps := range cdp {
for k, dp := range dps { for k, dp := range dps {
if len(dp.delta) == 0 || len(dp.path) == 0 { if len(dp.Path) == 0 {
delete(dps, k) delete(dps, k)
} }
} }


@@ -68,7 +68,12 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			data: []fileValues{
 				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expect: map[string]DeltaPath{},
+			expect: map[string]DeltaPath{
+				"key": {
+					Delta: "delta-link",
+					Path: "prev-path",
+				},
+			},
 			expectError: assert.NoError,
 		},
@@ -87,8 +92,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			},
 			expect: map[string]DeltaPath{
 				"key": {
-					delta: "delta-link",
-					path: "prev-path",
+					Delta: "delta-link",
+					Path: "prev-path",
 				},
 			},
 			expectError: assert.NoError,
@@ -108,7 +113,12 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 				{graph.DeltaURLsFileName, ""},
 				{graph.PreviousPathFileName, "prev-path"},
 			},
-			expect: map[string]DeltaPath{},
+			expect: map[string]DeltaPath{
+				"key": {
+					Delta: "delta-link",
+					Path: "prev-path",
+				},
+			},
 			expectError: assert.NoError,
 		},
 		{
@@ -119,8 +129,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			},
 			expect: map[string]DeltaPath{
 				"key": {
-					delta: "`!@#$%^&*()_[]{}/\"\\",
-					path: "prev-path",
+					Delta: "`!@#$%^&*()_[]{}/\"\\",
+					Path: "prev-path",
 				},
 			},
 			expectError: assert.NoError,
@@ -133,8 +143,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			},
 			expect: map[string]DeltaPath{
 				"key": {
-					delta: "\\n\\r\\t\\b\\f\\v\\0\\\\",
-					path: "prev-path",
+					Delta: "\\n\\r\\t\\b\\f\\v\\0\\\\",
+					Path: "prev-path",
 				},
 			},
 			expectError: assert.NoError,
@@ -150,8 +160,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			},
 			expect: map[string]DeltaPath{
 				"key": {
-					delta: "\\n",
-					path: "prev-path",
+					Delta: "\\n",
+					Path: "prev-path",
 				},
 			},
 			expectError: assert.NoError,
@@ -191,8 +201,8 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
 			assert.Len(t, emails, len(test.expect))

 			for k, v := range emails {
-				assert.Equal(t, v.delta, emails[k].delta, "delta")
-				assert.Equal(t, v.path, emails[k].path, "path")
+				assert.Equal(t, v.Delta, emails[k].Delta, "delta")
+				assert.Equal(t, v.Path, emails[k].Path, "path")
 			}
 		})
 	}
@@ -248,6 +258,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
 		name string
 		scope selectors.ExchangeScope
 		folderNames map[string]struct{}
+		canMakeDeltaQueries bool
 	}{
 		{
 			name: "Folder Iterative Check Mail",
@@ -258,6 +269,18 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
 			folderNames: map[string]struct{}{
 				DefaultMailFolder: {},
 			},
+			canMakeDeltaQueries: true,
+		},
+		{
+			name: "Folder Iterative Check Mail Non-Delta",
+			scope: selectors.NewExchangeBackup(users).MailFolders(
+				[]string{DefaultMailFolder},
+				selectors.PrefixMatch(),
+			)[0],
+			folderNames: map[string]struct{}{
+				DefaultMailFolder: {},
+			},
+			canMakeDeltaQueries: false,
 		},
 	}
@@ -265,13 +288,16 @@ func (suite *DataCollectionsIntegrationSuite) TestMailFetch() {
 		suite.Run(test.name, func() {
 			t := suite.T()

+			ctrlOpts := control.Defaults()
+			ctrlOpts.ToggleFeatures.DisableDelta = !test.canMakeDeltaQueries
+
 			collections, err := createCollections(
 				ctx,
 				acct,
 				inMock.NewProvider(userID, userID),
 				test.scope,
 				DeltaPaths{},
-				control.Defaults(),
+				ctrlOpts,
 				func(status *support.ConnectorOperationStatus) {},
 				fault.New(true))
 			require.NoError(t, err, clues.ToCore(err))
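The non-delta case above is driven entirely by the `DisableDelta` toggle: start from the defaults, then override the one feature flag per test case. Outside of tests, a caller would flip the same flag before building collections; a sketch with the option types mirrored locally (the real ones live in the repo's control package):

```go
// Sketch: Toggles/Options are mirrored locally to show the wiring; the real
// definitions live in the repo's control package.
package main

import "fmt"

type Toggles struct {
	DisableDelta bool
}

type Options struct {
	ToggleFeatures Toggles
}

func Defaults() Options { return Options{} }

func main() {
	opts := Defaults()

	// A full mailbox means Graph rejects delta queries, so force plain
	// item enumeration instead.
	opts.ToggleFeatures.DisableDelta = true

	fmt.Println("can make delta queries:", !opts.ToggleFeatures.DisableDelta)
}
```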

View File

@@ -23,6 +23,7 @@ type addedAndRemovedItemIDsGetter interface {
 		ctx context.Context,
 		user, containerID, oldDeltaToken string,
 		immutableIDs bool,
+		canMakeDeltaQueries bool,
 	) ([]string, []string, api.DeltaUpdate, error)
 }
@@ -85,8 +86,8 @@ func filterContainersAndFillCollections(
 		var (
 			dp = dps[cID]
-			prevDelta = dp.delta
-			prevPathStr = dp.path // do not log: pii; log prevPath instead
+			prevDelta = dp.Delta
+			prevPathStr = dp.Path // do not log: pii; log prevPath instead
 			prevPath path.Path
 			ictx = clues.Add(
 				ctx,
@@ -119,7 +120,8 @@ func filterContainersAndFillCollections(
 			qp.ResourceOwner.ID(),
 			cID,
 			prevDelta,
-			ctrlOpts.ToggleFeatures.ExchangeImmutableIDs)
+			ctrlOpts.ToggleFeatures.ExchangeImmutableIDs,
+			!ctrlOpts.ToggleFeatures.DisableDelta)
 		if err != nil {
 			if !graph.IsErrDeletedInFlight(err) {
 				el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
@@ -243,7 +245,7 @@ func makeTombstones(dps DeltaPaths) map[string]string {
 	r := make(map[string]string, len(dps))

 	for id, v := range dps {
-		r[id] = v.path
+		r[id] = v.Path
 	}

 	return r

View File

@@ -30,7 +30,10 @@ import (
 var _ addedAndRemovedItemIDsGetter = &mockGetter{}

 type (
-	mockGetter map[string]mockGetterResults
+	mockGetter struct {
+		noReturnDelta bool
+		results map[string]mockGetterResults
+	}
 	mockGetterResults struct {
 		added []string
 		removed []string
@@ -43,18 +46,24 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs(
 	ctx context.Context,
 	userID, cID, prevDelta string,
 	_ bool,
+	_ bool,
 ) (
 	[]string,
 	[]string,
 	api.DeltaUpdate,
 	error,
 ) {
-	results, ok := mg[cID]
+	results, ok := mg.results[cID]
 	if !ok {
 		return nil, nil, api.DeltaUpdate{}, clues.New("mock not found for " + cID)
 	}

-	return results.added, results.removed, results.newDelta, results.err
+	delta := results.newDelta
+	if mg.noReturnDelta {
+		delta.URL = ""
+	}
+
+	return results.added, results.removed, delta, results.err
 }
var _ graph.ContainerResolver = &mockResolver{} var _ graph.ContainerResolver = &mockResolver{}
@ -171,9 +180,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}{ }{
{ {
name: "happy path, one container", name: "happy path, one container",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResult, "1": commonResult,
}, },
},
resolver: newMockResolver(container1), resolver: newMockResolver(container1),
scope: allScope, scope: allScope,
expectErr: assert.NoError, expectErr: assert.NoError,
@ -182,10 +193,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "happy path, many containers", name: "happy path, many containers",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResult, "1": commonResult,
"2": commonResult, "2": commonResult,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
scope: allScope, scope: allScope,
expectErr: assert.NoError, expectErr: assert.NoError,
@ -194,10 +207,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "no containers pass scope", name: "no containers pass scope",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResult, "1": commonResult,
"2": commonResult, "2": commonResult,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
scope: selectors.NewExchangeBackup(nil).MailFolders(selectors.None())[0], scope: selectors.NewExchangeBackup(nil).MailFolders(selectors.None())[0],
expectErr: assert.NoError, expectErr: assert.NoError,
@ -206,9 +221,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "err: deleted in flight", name: "err: deleted in flight",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": deletedInFlightResult, "1": deletedInFlightResult,
}, },
},
resolver: newMockResolver(container1), resolver: newMockResolver(container1),
scope: allScope, scope: allScope,
expectErr: assert.NoError, expectErr: assert.NoError,
@ -218,9 +235,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "err: other error", name: "err: other error",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": errorResult, "1": errorResult,
}, },
},
resolver: newMockResolver(container1), resolver: newMockResolver(container1),
scope: allScope, scope: allScope,
expectErr: assert.NoError, expectErr: assert.NoError,
@ -229,10 +248,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "half collections error: deleted in flight", name: "half collections error: deleted in flight",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": deletedInFlightResult, "1": deletedInFlightResult,
"2": commonResult, "2": commonResult,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
scope: allScope, scope: allScope,
expectErr: assert.NoError, expectErr: assert.NoError,
@ -242,10 +263,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "half collections error: other error", name: "half collections error: other error",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": errorResult, "1": errorResult,
"2": commonResult, "2": commonResult,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
scope: allScope, scope: allScope,
expectErr: assert.NoError, expectErr: assert.NoError,
@ -254,10 +277,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "half collections error: deleted in flight, fail fast", name: "half collections error: deleted in flight, fail fast",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": deletedInFlightResult, "1": deletedInFlightResult,
"2": commonResult, "2": commonResult,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
scope: allScope, scope: allScope,
failFast: control.FailFast, failFast: control.FailFast,
@ -268,10 +293,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
}, },
{ {
name: "half collections error: other error, fail fast", name: "half collections error: other error, fail fast",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": errorResult, "1": errorResult,
"2": commonResult, "2": commonResult,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
scope: allScope, scope: allScope,
failFast: control.FailFast, failFast: control.FailFast,
@@ -281,12 +308,24 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
 		},
 	}
 	for _, test := range table {
-		suite.Run(test.name, func() {
+		for _, canMakeDeltaQueries := range []bool{true, false} {
+			name := test.name
+
+			if canMakeDeltaQueries {
+				name += "-delta"
+			} else {
+				name += "-non-delta"
+			}
+
+			suite.Run(name, func() {
 				t := suite.T()

 				ctx, flush := tester.NewContext()
 				defer flush()

+				ctrlOpts := control.Options{FailureHandling: test.failFast}
+				ctrlOpts.ToggleFeatures.DisableDelta = !canMakeDeltaQueries
+
 				collections, err := filterContainersAndFillCollections(
 					ctx,
 					qp,
@@ -295,7 +334,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
 					test.resolver,
 					test.scope,
 					dps,
-					control.Options{FailureHandling: test.failFast},
+					ctrlOpts,
 					fault.New(test.failFast == control.FailFast))
 				test.expectErr(t, err, clues.ToCore(err))
@@ -327,7 +366,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
 				assert.Equal(t, test.expectDoNotMergeColls, doNotMerges, "doNotMerge collections")

 				// items in collections assertions
-				for k, expect := range test.getter {
+				for k, expect := range test.getter.results {
 					coll := collections[k]

 					if coll == nil {
@@ -354,6 +393,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections() {
 			})
 		}
 	}
+}
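Running every table case twice with a mode suffix, as the loop above does, is a reusable pattern for toggling a feature across an existing test table. A standalone sketch of the same structure (a hypothetical `_test.go` file using only the standard testing package):

```go
// Sketch: the same run-each-case-in-both-modes structure, using only the
// standard library's testing package; the table contents are invented.
package sketch

import "testing"

func TestModes(t *testing.T) {
	table := []struct {
		name string
		want int
	}{
		{name: "one container", want: 1},
		{name: "two containers", want: 2},
	}

	for _, test := range table {
		for _, useDelta := range []bool{true, false} {
			name := test.name + "-non-delta"
			if useDelta {
				name = test.name + "-delta"
			}

			t.Run(name, func(t *testing.T) {
				// the real body would toggle delta support off when !useDelta,
				// mirroring ToggleFeatures.DisableDelta = !useDelta
				if test.want < 1 {
					t.Fatal("unexpected table entry")
				}
			})
		}
	}
}
```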
func checkMetadata( func checkMetadata(
t *testing.T, t *testing.T,
@ -488,74 +528,80 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli
}{ }{
{ {
name: "1 moved to duplicate", name: "1 moved to duplicate",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": result1, "1": result1,
"2": result2, "2": result2,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta", Delta: "old_delta",
path: oldPath1(t, cat).String(), Path: oldPath1(t, cat).String(),
}, },
"2": DeltaPath{ "2": DeltaPath{
delta: "old_delta", Delta: "old_delta",
path: idPath2(t, cat).String(), Path: idPath2(t, cat).String(),
}, },
} }
}, },
expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "delta_url", Delta: "delta_url",
path: idPath1(t, cat).String(), Path: idPath1(t, cat).String(),
}, },
"2": DeltaPath{ "2": DeltaPath{
delta: "delta_url2", Delta: "delta_url2",
path: idPath2(t, cat).String(), Path: idPath2(t, cat).String(),
}, },
} }
}, },
}, },
{ {
name: "both move to duplicate", name: "both move to duplicate",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": result1, "1": result1,
"2": result2, "2": result2,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta", Delta: "old_delta",
path: oldPath1(t, cat).String(), Path: oldPath1(t, cat).String(),
}, },
"2": DeltaPath{ "2": DeltaPath{
delta: "old_delta", Delta: "old_delta",
path: oldPath2(t, cat).String(), Path: oldPath2(t, cat).String(),
}, },
} }
}, },
expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "delta_url", Delta: "delta_url",
path: idPath1(t, cat).String(), Path: idPath1(t, cat).String(),
}, },
"2": DeltaPath{ "2": DeltaPath{
delta: "delta_url2", Delta: "delta_url2",
path: idPath2(t, cat).String(), Path: idPath2(t, cat).String(),
}, },
} }
}, },
}, },
{ {
name: "both new", name: "both new",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": result1, "1": result1,
"2": result2, "2": result2,
}, },
},
resolver: newMockResolver(container1, container2), resolver: newMockResolver(container1, container2),
inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{} return DeltaPaths{}
@ -564,27 +610,29 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli
expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "delta_url", Delta: "delta_url",
path: idPath1(t, cat).String(), Path: idPath1(t, cat).String(),
}, },
"2": DeltaPath{ "2": DeltaPath{
delta: "delta_url2", Delta: "delta_url2",
path: idPath2(t, cat).String(), Path: idPath2(t, cat).String(),
}, },
} }
}, },
}, },
{ {
name: "add 1 remove 2", name: "add 1 remove 2",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": result1, "1": result1,
}, },
},
resolver: newMockResolver(container1), resolver: newMockResolver(container1),
inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { inputMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"2": DeltaPath{ "2": DeltaPath{
delta: "old_delta", Delta: "old_delta",
path: idPath2(t, cat).String(), Path: idPath2(t, cat).String(),
}, },
} }
}, },
@ -593,8 +641,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli
expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths { expectMetadata: func(t *testing.T, cat path.CategoryType) DeltaPaths {
return DeltaPaths{ return DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "delta_url", Delta: "delta_url",
path: idPath1(t, cat).String(), Path: idPath1(t, cat).String(),
}, },
} }
}, },
@ -649,7 +697,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_Dupli
assert.Equal(t, 1, metadatas, "metadata collections") assert.Equal(t, 1, metadatas, "metadata collections")
// items in collections assertions // items in collections assertions
for k, expect := range test.getter { for k, expect := range test.getter.results {
coll := collections[k] coll := collections[k]
if coll == nil { if coll == nil {
@ -690,12 +738,14 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
}{ }{
{ {
name: "repeated adds", name: "repeated adds",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": { "1": {
added: []string{"a1", "a2", "a3", "a1"}, added: []string{"a1", "a2", "a3", "a1"},
newDelta: newDelta, newDelta: newDelta,
}, },
}, },
},
expectAdded: map[string]struct{}{ expectAdded: map[string]struct{}{
"a1": {}, "a1": {},
"a2": {}, "a2": {},
@ -705,12 +755,14 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
}, },
{ {
name: "repeated removes", name: "repeated removes",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": { "1": {
removed: []string{"r1", "r2", "r3", "r1"}, removed: []string{"r1", "r2", "r3", "r1"},
newDelta: newDelta, newDelta: newDelta,
}, },
}, },
},
expectAdded: map[string]struct{}{}, expectAdded: map[string]struct{}{},
expectRemoved: map[string]struct{}{ expectRemoved: map[string]struct{}{
"r1": {}, "r1": {},
@ -720,13 +772,15 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
}, },
{ {
name: "remove for same item wins", name: "remove for same item wins",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": { "1": {
added: []string{"i1", "a2", "a3"}, added: []string{"i1", "a2", "a3"},
removed: []string{"i1", "r2", "r3"}, removed: []string{"i1", "r2", "r3"},
newDelta: newDelta, newDelta: newDelta,
}, },
}, },
},
expectAdded: map[string]struct{}{ expectAdded: map[string]struct{}{
"a2": {}, "a2": {},
"a3": {}, "a3": {},
@ -806,7 +860,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
assert.Zero(t, doNotMerges, "doNotMerge collections") assert.Zero(t, doNotMerges, "doNotMerge collections")
// items in collections assertions // items in collections assertions
for k := range test.getter { for k := range test.getter.results {
coll := collections[k] coll := collections[k]
if !assert.NotNilf(t, coll, "missing collection for path %s", k) { if !assert.NotNilf(t, coll, "missing collection for path %s", k) {
continue continue
@ -822,7 +876,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_repea
} }
} }
func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incrementals() { func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incrementals_nondelta() {
var ( var (
userID = "user_id" userID = "user_id"
tenantID = suite.creds.AzureTenantID tenantID = suite.creds.AzureTenantID
@ -865,12 +919,15 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
resolver graph.ContainerResolver resolver graph.ContainerResolver
dps DeltaPaths dps DeltaPaths
expect map[string]endState expect map[string]endState
skipWhenForcedNoDelta bool
}{ }{
{ {
name: "new container", name: "new container",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResults, "1": commonResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("1"), id: strPtr("1"),
displayName: strPtr("new"), displayName: strPtr("new"),
@ -884,9 +941,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "not moved container", name: "not moved container",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResults, "1": commonResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("1"), id: strPtr("1"),
displayName: strPtr("not_moved"), displayName: strPtr("not_moved"),
@ -895,8 +954,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}), }),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "not_moved").String(), Path: prevPath(suite.T(), "1", "not_moved").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -905,9 +964,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "moved container", name: "moved container",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResults, "1": commonResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("1"), id: strPtr("1"),
displayName: strPtr("moved"), displayName: strPtr("moved"),
@ -916,8 +977,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}), }),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "prev").String(), Path: prevPath(suite.T(), "1", "prev").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -926,12 +987,14 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "deleted container", name: "deleted container",
getter: map[string]mockGetterResults{}, getter: mockGetter{
results: map[string]mockGetterResults{},
},
resolver: newMockResolver(), resolver: newMockResolver(),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "deleted").String(), Path: prevPath(suite.T(), "1", "deleted").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -940,9 +1003,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "one deleted, one new", name: "one deleted, one new",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"2": commonResults, "2": commonResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("2"), id: strPtr("2"),
displayName: strPtr("new"), displayName: strPtr("new"),
@ -951,8 +1016,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}), }),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "deleted").String(), Path: prevPath(suite.T(), "1", "deleted").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -962,9 +1027,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "one deleted, one new, same path", name: "one deleted, one new, same path",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"2": commonResults, "2": commonResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("2"), id: strPtr("2"),
displayName: strPtr("same"), displayName: strPtr("same"),
@ -973,8 +1040,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}), }),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "same").String(), Path: prevPath(suite.T(), "1", "same").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -984,10 +1051,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "one moved, one new, same path", name: "one moved, one new, same path",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResults, "1": commonResults,
"2": commonResults, "2": commonResults,
}, },
},
resolver: newMockResolver( resolver: newMockResolver(
mockContainer{ mockContainer{
id: strPtr("1"), id: strPtr("1"),
@ -1004,8 +1073,8 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
), ),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "prev").String(), Path: prevPath(suite.T(), "1", "prev").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -1015,9 +1084,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "bad previous path strings", name: "bad previous path strings",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResults, "1": commonResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("1"), id: strPtr("1"),
displayName: strPtr("not_moved"), displayName: strPtr("not_moved"),
@ -1026,12 +1097,12 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}), }),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: "1/fnords/mc/smarfs", Path: "1/fnords/mc/smarfs",
}, },
"2": DeltaPath{ "2": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: "2/fnords/mc/smarfs", Path: "2/fnords/mc/smarfs",
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@ -1040,9 +1111,11 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}, },
{ {
name: "delta expiration", name: "delta expiration",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": expiredResults, "1": expiredResults,
}, },
},
resolver: newMockResolver(mockContainer{ resolver: newMockResolver(mockContainer{
id: strPtr("1"), id: strPtr("1"),
displayName: strPtr("same"), displayName: strPtr("same"),
@ -1051,23 +1124,26 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
}), }),
dps: DeltaPaths{ dps: DeltaPaths{
"1": DeltaPath{ "1": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "1", "same").String(), Path: prevPath(suite.T(), "1", "same").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
"1": {data.NotMovedState, true}, "1": {data.NotMovedState, true},
}, },
skipWhenForcedNoDelta: true, // this is not a valid test for non-delta
}, },
{ {
name: "a little bit of everything", name: "a little bit of everything",
getter: map[string]mockGetterResults{ getter: mockGetter{
results: map[string]mockGetterResults{
"1": commonResults, // new "1": commonResults, // new
"2": commonResults, // notMoved "2": commonResults, // notMoved
"3": commonResults, // moved "3": commonResults, // moved
"4": expiredResults, // moved "4": expiredResults, // moved
// "5" gets deleted // "5" gets deleted
}, },
},
resolver: newMockResolver( resolver: newMockResolver(
mockContainer{ mockContainer{
id: strPtr("1"), id: strPtr("1"),
@ -1096,20 +1172,20 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
), ),
dps: DeltaPaths{ dps: DeltaPaths{
"2": DeltaPath{ "2": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "2", "not_moved").String(), Path: prevPath(suite.T(), "2", "not_moved").String(),
}, },
"3": DeltaPath{ "3": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "3", "prev").String(), Path: prevPath(suite.T(), "3", "prev").String(),
}, },
"4": DeltaPath{ "4": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "4", "prev").String(), Path: prevPath(suite.T(), "4", "prev").String(),
}, },
"5": DeltaPath{ "5": DeltaPath{
delta: "old_delta_url", Delta: "old_delta_url",
path: prevPath(suite.T(), "5", "deleted").String(), Path: prevPath(suite.T(), "5", "deleted").String(),
}, },
}, },
expect: map[string]endState{ expect: map[string]endState{
@@ -1119,15 +1195,45 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
 				"4": {data.MovedState, true},
 				"5": {data.DeletedState, false},
 			},
+			skipWhenForcedNoDelta: true,
 		},
 	}
 	for _, test := range table {
-		suite.Run(test.name, func() {
+		for _, deltaBefore := range []bool{true, false} {
+			for _, deltaAfter := range []bool{true, false} {
+				name := test.name
+
+				if deltaAfter {
+					name += "-delta"
+				} else {
+					if test.skipWhenForcedNoDelta {
+						suite.T().Skip("intentionally skipped non-delta case")
+					}
+
+					name += "-non-delta"
+				}
+
+				suite.Run(name, func() {
 					t := suite.T()

 					ctx, flush := tester.NewContext()
 					defer flush()

+					ctrlOpts := control.Defaults()
+					ctrlOpts.ToggleFeatures.DisableDelta = !deltaAfter
+
+					getter := test.getter
+					if !deltaAfter {
+						getter.noReturnDelta = false
+					}
+
+					dps := test.dps
+					if !deltaBefore {
+						for k, dp := range dps {
+							dp.Delta = ""
+							dps[k] = dp
+						}
+					}
+
 					collections, err := filterContainersAndFillCollections(
 						ctx,
 						qp,
@@ -1136,7 +1242,7 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
 						test.resolver,
 						allScope,
 						test.dps,
-						control.Defaults(),
+						ctrlOpts,
 						fault.New(true))
 					assert.NoError(t, err, clues.ToCore(err))
@@ -1167,3 +1273,5 @@ func (suite *ServiceIteratorsSuite) TestFilterContainersAndFillCollections_incre
 				})
 			}
 		}
+	}
+}

View File

@@ -36,6 +36,7 @@ const (
 	mailboxNotEnabledForRESTAPI errorCode = "MailboxNotEnabledForRESTAPI"
 	malwareDetected errorCode = "malwareDetected"
 	requestResourceNotFound errorCode = "Request_ResourceNotFound"
+	quotaExceeded errorCode = "ErrorQuotaExceeded"
 	resourceNotFound errorCode = "ResourceNotFound"
 	resyncRequired errorCode = "ResyncRequired" // alt: resyncRequired
 	syncFolderNotFound errorCode = "ErrorSyncFolderNotFound"
@@ -111,6 +112,10 @@ func IsErrInvalidDelta(err error) bool {
 		errors.Is(err, ErrInvalidDelta)
 }

+func IsErrQuotaExceeded(err error) bool {
+	return hasErrorCode(err, quotaExceeded)
+}
+
 func IsErrExchangeMailFolderNotFound(err error) bool {
 	return hasErrorCode(err, resourceNotFound, mailboxNotEnabledForRESTAPI)
 }
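`hasErrorCode` ultimately compares the code embedded in the Graph OData error payload against these constants. A rough standalone sketch of that kind of matching, with an invented `odataError` type standing in for the msgraph SDK's error:

```go
// Sketch: odataError stands in for the Graph SDK's error type; the real
// IsErrQuotaExceeded/hasErrorCode in the graph package inspect the SDK error.
package sketch

import "errors"

type odataError struct {
	code string
}

func (e odataError) Error() string { return e.code }

func hasErrorCode(err error, codes ...string) bool {
	var oerr odataError
	if !errors.As(err, &oerr) {
		return false
	}

	for _, c := range codes {
		if oerr.code == c {
			return true
		}
	}

	return false
}

// IsErrQuotaExceeded reports whether err carries the ErrorQuotaExceeded code,
// which Exchange Online returns for delta queries against a full mailbox.
func IsErrQuotaExceeded(err error) bool {
	return hasErrorCode(err, "ErrorQuotaExceeded")
}
```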

View File

@ -161,6 +161,45 @@ func (suite *GraphErrorsUnitSuite) TestIsErrInvalidDelta() {
} }
} }
func (suite *GraphErrorsUnitSuite) TestIsErrQuotaExceeded() {
table := []struct {
name string
err error
expect assert.BoolAssertionFunc
}{
{
name: "nil",
err: nil,
expect: assert.False,
},
{
name: "non-matching",
err: assert.AnError,
expect: assert.False,
},
{
name: "as",
err: ErrInvalidDelta,
expect: assert.False,
},
{
name: "non-matching oDataErr",
err: odErr("fnords"),
expect: assert.False,
},
{
name: "quota-exceeded oDataErr",
err: odErr("ErrorQuotaExceeded"),
expect: assert.True,
},
}
for _, test := range table {
suite.Run(test.name, func() {
test.expect(suite.T(), IsErrQuotaExceeded(test.err))
})
}
}
func (suite *GraphErrorsUnitSuite) TestIsErrUserNotFound() { func (suite *GraphErrorsUnitSuite) TestIsErrUserNotFound() {
table := []struct { table := []struct {
name string name string

View File

@@ -708,9 +708,15 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
 	}
 }

-// TestBackup_Run ensures that Integration Testing works
-// for the following scopes: Contacts, Events, and Mail
 func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
+	testExchangeContinuousBackups(suite, control.Toggles{})
+}
+
+func (suite *BackupOpIntegrationSuite) TestBackup_Run_nonIncrementalExchange() {
+	testExchangeContinuousBackups(suite, control.Toggles{DisableDelta: true})
+}
+
+func testExchangeContinuousBackups(suite *BackupOpIntegrationSuite, toggles control.Toggles) {
 	ctx, flush := tester.NewContext()
 	defer flush()
@@ -719,7 +725,6 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 	var (
 		t = suite.T()
 		acct = tester.NewM365Account(t)
-		ffs = control.Toggles{}
 		mb = evmock.NewBus()
 		now = dttm.Now()
 		service = path.ExchangeService
@@ -860,7 +865,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 		}
 	}

-	bo, acct, kw, ms, ss, gc, sels, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs, version.Backup)
+	bo, acct, kw, ms, ss, gc, sels, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, toggles, version.Backup)
 	defer closer()

 	// run the initial backup
@@ -947,14 +952,18 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 		name string
 		// performs the incremental update required for the test.
 		updateUserData func(t *testing.T)
-		itemsRead int
-		itemsWritten int
+		deltaItemsRead int
+		deltaItemsWritten int
+		nonDeltaItemsRead int
+		nonDeltaItemsWritten int
 	}{
 		{
-			name: "clean incremental, no changes",
+			name: "clean, no changes",
 			updateUserData: func(t *testing.T) {},
-			itemsRead: 0,
-			itemsWritten: 0,
+			deltaItemsRead: 0,
+			deltaItemsWritten: 0,
+			nonDeltaItemsRead: 8,
+			nonDeltaItemsWritten: 0, // unchanged items are not counted towards write
 		},
 		{
 			name: "move an email folder to a subfolder",
@@ -979,8 +988,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 				newLoc := expectDeets.MoveLocation(cat.String(), from.locRef, to.locRef)
 				from.locRef = newLoc
 			},
-			itemsRead: 0, // zero because we don't count container reads
-			itemsWritten: 2,
+			deltaItemsRead: 0, // zero because we don't count container reads
+			deltaItemsWritten: 2,
+			nonDeltaItemsRead: 8,
+			nonDeltaItemsWritten: 2,
 		},
 		{
 			name: "delete a folder",
@@ -1003,8 +1014,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 					expectDeets.RemoveLocation(category.String(), d.dests[container2].locRef)
 				}
 			},
-			itemsRead: 0,
-			itemsWritten: 0, // deletions are not counted as "writes"
+			deltaItemsRead: 0,
+			deltaItemsWritten: 0, // deletions are not counted as "writes"
+			nonDeltaItemsRead: 4,
+			nonDeltaItemsWritten: 0,
 		},
 		{
 			name: "add a new folder",
@@ -1053,8 +1066,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 					}
 				}
 			},
-			itemsRead: 4,
-			itemsWritten: 4,
+			deltaItemsRead: 4,
+			deltaItemsWritten: 4,
+			nonDeltaItemsRead: 8,
+			nonDeltaItemsWritten: 4,
 		},
 		{
 			name: "rename a folder",
@@ -1111,10 +1126,12 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 					}
 				}
 			},
-			itemsRead: 0, // containers are not counted as reads
+			deltaItemsRead: 0, // containers are not counted as reads
 			// Renaming a folder doesn't cause kopia changes as the folder ID doesn't
 			// change.
-			itemsWritten: 0,
+			deltaItemsWritten: 0, // two items per category
+			nonDeltaItemsRead: 8,
+			nonDeltaItemsWritten: 0,
 		},
 		{
 			name: "add a new item",
@@ -1165,8 +1182,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 					}
 				}
 			},
-			itemsRead: 2,
-			itemsWritten: 2,
+			deltaItemsRead: 2,
+			deltaItemsWritten: 2,
+			nonDeltaItemsRead: 10,
+			nonDeltaItemsWritten: 2,
 		},
 		{
 			name: "delete an existing item",
@@ -1177,7 +1196,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 				switch category {
 				case path.EmailCategory:
-					ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false)
+					ids, _, _, err := ac.Mail().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false, true)
 					require.NoError(t, err, "getting message ids", clues.ToCore(err))
 					require.NotEmpty(t, ids, "message ids in folder")
@@ -1190,7 +1209,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 						ids[0])
 				case path.ContactsCategory:
-					ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false)
+					ids, _, _, err := ac.Contacts().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false, true)
 					require.NoError(t, err, "getting contact ids", clues.ToCore(err))
 					require.NotEmpty(t, ids, "contact ids in folder")
@@ -1203,7 +1222,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 						ids[0])
 				case path.EventsCategory:
-					ids, _, _, err := ac.Events().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false)
+					ids, _, _, err := ac.Events().GetAddedAndRemovedItemIDs(ctx, uidn.ID(), containerID, "", false, true)
 					require.NoError(t, err, "getting event ids", clues.ToCore(err))
 					require.NotEmpty(t, ids, "event ids in folder")
@@ -1217,16 +1236,19 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 				}
 			}
 			},
-			itemsRead: 2,
-			itemsWritten: 0, // deletes are not counted as "writes"
+			deltaItemsRead: 2,
+			deltaItemsWritten: 0, // deletes are not counted as "writes"
+			nonDeltaItemsRead: 8,
+			nonDeltaItemsWritten: 0,
 		},
 	}

 	for _, test := range table {
 		suite.Run(test.name, func() {
 			var (
 				t = suite.T()
 				incMB = evmock.NewBus()
-				incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sels, incMB, ffs, closer)
+				incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sels, incMB, toggles, closer)
 				atid = m365.AzureTenantID
 			)
@@ -1243,8 +1265,14 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_incrementalExchange() {
 			// do some additional checks to ensure the incremental dealt with fewer items.
 			// +4 on read/writes to account for metadata: 1 delta and 1 path for each type.
-			assert.Equal(t, test.itemsWritten+4, incBO.Results.ItemsWritten, "incremental items written")
-			assert.Equal(t, test.itemsRead+4, incBO.Results.ItemsRead, "incremental items read")
+			if !toggles.DisableDelta {
+				assert.Equal(t, test.deltaItemsRead+4, incBO.Results.ItemsRead, "incremental items read")
+				assert.Equal(t, test.deltaItemsWritten+4, incBO.Results.ItemsWritten, "incremental items written")
+			} else {
+				assert.Equal(t, test.nonDeltaItemsRead+4, incBO.Results.ItemsRead, "non delta items read")
+				assert.Equal(t, test.nonDeltaItemsWritten+4, incBO.Results.ItemsWritten, "non delta items written")
+			}
 			assert.NoError(t, incBO.Errors.Failure(), "incremental non-recoverable error", clues.ToCore(incBO.Errors.Failure()))
 			assert.Empty(t, incBO.Errors.Recovered(), "incremental recoverable/iteration errors")
 			assert.Equal(t, 1, incMB.TimesCalled[events.BackupStart], "incremental backup-start events")

View File

@@ -626,11 +626,9 @@ const (
 func UpdateItem(item *ItemInfo, newLocPath *path.Builder) {
 	// Only OneDrive and SharePoint have information about parent folders
 	// contained in them.
-	var updatePath func(newLocPath *path.Builder)
 	// Can't switch based on infoType because that's been unstable.
 	if item.Exchange != nil {
-		updatePath = item.Exchange.UpdateParentPath
+		item.Exchange.UpdateParentPath(newLocPath)
 	} else if item.SharePoint != nil {
 		// SharePoint used to store library items with the OneDriveItem ItemType.
 		// Start switching them over as we see them since there's no point in
@@ -639,14 +637,10 @@ func UpdateItem(item *ItemInfo, newLocPath *path.Builder) {
 			item.SharePoint.ItemType = SharePointLibrary
 		}

-		updatePath = item.SharePoint.UpdateParentPath
+		item.SharePoint.UpdateParentPath(newLocPath)
 	} else if item.OneDrive != nil {
-		updatePath = item.OneDrive.UpdateParentPath
-	} else {
-		return
+		item.OneDrive.UpdateParentPath(newLocPath)
 	}
-
-	updatePath(newLocPath)
 }
// ItemInfo is a oneOf that contains service specific // ItemInfo is a oneOf that contains service specific

View File

@@ -101,6 +101,14 @@ type Toggles struct {
 	// DisableIncrementals prevents backups from using incremental lookups,
 	// forcing a new, complete backup of all data regardless of prior state.
 	DisableIncrementals bool `json:"exchangeIncrementals,omitempty"`
+	// DisableDelta prevents backups from using delta-based lookups,
+	// forcing a backup that enumerates all items. This differs from
+	// DisableIncrementals in that it avoids the delta endpoints entirely,
+	// with or without a delta token. This is necessary when the user has
+	// filled up their mailbox storage, as Microsoft blocks delta queries
+	// against a full mailbox.
+	DisableDelta bool `json:"exchangeDelta,omitempty"`
 	// ExchangeImmutableIDs denotes whether Corso should store items with
 	// immutable Exchange IDs. This is only safe to set if the previous backup for
 	// incremental backups used immutable IDs or if a full backup is being done.
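The new field follows the existing tag convention, so it only shows up in serialized options when set. A quick check of that behavior, with `Toggles` mirrored locally for the sketch:

```go
// Sketch: Toggles is mirrored locally just to show the json tag behavior.
package main

import (
	"encoding/json"
	"fmt"
)

type Toggles struct {
	DisableIncrementals bool `json:"exchangeIncrementals,omitempty"`
	DisableDelta        bool `json:"exchangeDelta,omitempty"`
}

func main() {
	off, _ := json.Marshal(Toggles{})
	on, _ := json.Marshal(Toggles{DisableDelta: true})

	fmt.Println(string(off)) // {} -- omitempty hides false values
	fmt.Println(string(on))  // {"exchangeDelta":true}
}
```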

View File

@@ -58,6 +58,7 @@ type MailboxInfo struct {
 	Language Language
 	WorkingHours WorkingHours
 	ErrGetMailBoxSetting []error
+	QuotaExceeded bool
 }

 type AutomaticRepliesSettings struct {
@@ -109,6 +110,12 @@ func (ui *UserInfo) ServiceEnabled(service path.ServiceType) bool {
 	return ok
 }

+// Returns if we can run delta queries on a mailbox. We cannot run
+// them if the mailbox is full which is indicated by QuotaExceeded.
+func (ui *UserInfo) CanMakeDeltaQueries() bool {
+	return !ui.Mailbox.QuotaExceeded
+}
+
 // ---------------------------------------------------------------------------
 // methods
 // ---------------------------------------------------------------------------
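A likely consumer of this helper is the backup setup path, which can translate the mailbox state into the `DisableDelta` toggle. A hedged sketch with invented local stand-ins for `UserInfo` and the options type (the real wiring may differ):

```go
// Sketch: userInfo/options are local stand-ins; only the decision flow is
// the point -- a full mailbox disables delta queries for that backup.
package sketch

type mailboxInfo struct {
	QuotaExceeded bool
}

type userInfo struct {
	Mailbox mailboxInfo
}

func (ui *userInfo) CanMakeDeltaQueries() bool {
	return !ui.Mailbox.QuotaExceeded
}

type toggles struct {
	DisableDelta bool
}

type options struct {
	ToggleFeatures toggles
}

// applyMailboxState disables delta queries when the mailbox is over quota.
func applyMailboxState(opts *options, ui *userInfo) {
	if !ui.CanMakeDeltaQueries() {
		opts.ToggleFeatures.DisableDelta = true
	}
}
```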
@@ -260,7 +267,8 @@ func (c Users) GetInfo(ctx context.Context, userID string) (*UserInfo, error) {
 		QueryParameters: &requestParameters,
 	}

-	if _, err := c.GetMailFolders(ctx, userID, options); err != nil {
+	mfs, err := c.GetMailFolders(ctx, userID, options)
+	if err != nil {
 		if graph.IsErrUserNotFound(err) {
 			logger.CtxErr(ctx, err).Error("user not found")
 			return nil, graph.Stack(ctx, clues.Stack(graph.ErrResourceOwnerNotFound, err))
@@ -295,6 +303,32 @@ func (c Users) GetInfo(ctx context.Context, userID string) (*UserInfo, error) {
 	userInfo.Mailbox = mbxInfo

+	// TODO: This tries to determine if the user has hit their mailbox
+	// limit by trying to fetch an item and seeing if we get the quota
+	// exceeded error. Ideally (if available) we should convert this to
+	// pull the user's usage via an api and compare if they have used
+	// up their quota.
+	if mfs != nil {
+		mf := mfs.GetValue()[0] // we will always have one
+
+		options := &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetRequestConfiguration{
+			QueryParameters: &users.ItemMailFoldersItemMessagesDeltaRequestBuilderGetQueryParameters{
+				Top: ptr.To[int32](1), // just one item is enough
+			},
+		}
+
+		_, err = c.stable.Client().
+			UsersById(userID).
+			MailFoldersById(ptr.Val(mf.GetId())).
+			Messages().
+			Delta().
+			Get(ctx, options)
+		if err != nil && !graph.IsErrQuotaExceeded(err) {
+			return nil, err
+		}
+
+		userInfo.Mailbox.QuotaExceeded = graph.IsErrQuotaExceeded(err)
+	}
+
 	return userInfo, nil
 }
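The probe boils down to a single cheap request against the messages delta endpoint; if the mailbox is over quota, Exchange Online answers with the ErrorQuotaExceeded code instead of a page of results. A hedged sketch of the equivalent raw call outside the SDK (the token and IDs are placeholders, and the error-code check is simplified to a string match):

```go
// Sketch: the same probe as a raw Graph call; ACCESS_TOKEN, USER_ID, and
// FOLDER_ID are placeholders, and error-code extraction is simplified.
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	url := "https://graph.microsoft.com/v1.0/users/USER_ID/mailFolders/FOLDER_ID/messages/delta?$top=1"

	req, _ := http.NewRequest(http.MethodGet, url, nil)
	req.Header.Set("Authorization", "Bearer ACCESS_TOKEN")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)

	// A full mailbox surfaces the ErrorQuotaExceeded code in the error payload.
	quotaExceeded := strings.Contains(string(body), "ErrorQuotaExceeded")
	fmt.Println("status:", resp.StatusCode, "quota exceeded:", quotaExceeded)
}
```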