Fetch mod time when getting added and removed items (#4266)
Also return mod time when available if getting the set of added and removed items. This will be leveraged in later PRs to implement kopia assisted incrementals for exchange Does not change any logic in collections right now, just adds the fields to be returned Also adds an additional return value denoting if the mod times are expected to be valid. This is required because events delta cannot return mod time --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No #### Type of change - [ ] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [x] 🧹 Tech Debt/Cleanup #### Issue(s) * #2023 #### Test Plan - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
647f7326d7
commit
4a9951b876
@ -160,7 +160,7 @@ func populateCollections(
|
||||
|
||||
ictx = clues.Add(ictx, "previous_path", prevPath)
|
||||
|
||||
added, removed, newDelta, err := bh.itemEnumerator().
|
||||
added, _, removed, newDelta, err := bh.itemEnumerator().
|
||||
GetAddedAndRemovedItemIDs(
|
||||
ictx,
|
||||
qp.ProtectedResource.ID(),
|
||||
@ -201,7 +201,7 @@ func populateCollections(
|
||||
|
||||
collections[cID] = &edc
|
||||
|
||||
for _, add := range added {
|
||||
for add := range added {
|
||||
edc.added[add] = struct{}{}
|
||||
}
|
||||
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -72,14 +73,15 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs(
|
||||
_ bool,
|
||||
_ bool,
|
||||
) (
|
||||
[]string,
|
||||
map[string]time.Time,
|
||||
bool,
|
||||
[]string,
|
||||
api.DeltaUpdate,
|
||||
error,
|
||||
) {
|
||||
results, ok := mg.results[cID]
|
||||
if !ok {
|
||||
return nil, nil, api.DeltaUpdate{}, clues.New("mock not found for " + cID)
|
||||
return nil, false, nil, api.DeltaUpdate{}, clues.New("mock not found for " + cID)
|
||||
}
|
||||
|
||||
delta := results.newDelta
|
||||
@ -87,7 +89,12 @@ func (mg mockGetter) GetAddedAndRemovedItemIDs(
|
||||
delta.URL = ""
|
||||
}
|
||||
|
||||
return results.added, results.removed, delta, results.err
|
||||
resAdded := make(map[string]time.Time, len(results.added))
|
||||
for _, add := range results.added {
|
||||
resAdded[add] = time.Time{}
|
||||
}
|
||||
|
||||
return resAdded, false, results.removed, delta, results.err
|
||||
}
|
||||
|
||||
var _ graph.ContainerResolver = &mockResolver{}
|
||||
|
||||
@ -2,6 +2,7 @@ package exchange
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/microsoft/kiota-abstractions-go/serialization"
|
||||
|
||||
@ -30,7 +31,7 @@ type addedAndRemovedItemGetter interface {
|
||||
user, containerID, oldDeltaToken string,
|
||||
immutableIDs bool,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, api.DeltaUpdate, error)
|
||||
) (map[string]time.Time, bool, []string, api.DeltaUpdate, error)
|
||||
}
|
||||
|
||||
type itemGetterSerializer interface {
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common/pii"
|
||||
"github.com/alcionai/corso/src/internal/common/ptr"
|
||||
@ -153,13 +154,13 @@ func populateCollections(
|
||||
// and will return an error if a delta token is queried.
|
||||
canMakeDeltaQueries := len(ptr.Val(c.GetEmail())) > 0
|
||||
|
||||
add, rem, du, err := bh.getChannelMessageIDs(ctx, cID, prevDelta, canMakeDeltaQueries)
|
||||
add, _, rem, du, err := bh.getChannelMessageIDs(ctx, cID, prevDelta, canMakeDeltaQueries)
|
||||
if err != nil {
|
||||
el.AddRecoverable(ctx, clues.Stack(err))
|
||||
continue
|
||||
}
|
||||
|
||||
added := str.SliceToMap(add)
|
||||
added := str.SliceToMap(maps.Keys(add))
|
||||
removed := str.SliceToMap(rem)
|
||||
|
||||
if len(du.URL) > 0 {
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -57,8 +58,14 @@ func (bh mockBackupHandler) getChannelMessageIDs(
|
||||
_ context.Context,
|
||||
_, _ string,
|
||||
_ bool,
|
||||
) ([]string, []string, api.DeltaUpdate, error) {
|
||||
return bh.messageIDs, bh.deletedMsgIDs, api.DeltaUpdate{}, bh.messagesErr
|
||||
) (map[string]time.Time, bool, []string, api.DeltaUpdate, error) {
|
||||
idRes := make(map[string]time.Time, len(bh.messageIDs))
|
||||
|
||||
for _, id := range bh.messageIDs {
|
||||
idRes[id] = time.Time{}
|
||||
}
|
||||
|
||||
return idRes, true, bh.deletedMsgIDs, api.DeltaUpdate{}, bh.messagesErr
|
||||
}
|
||||
|
||||
func (bh mockBackupHandler) includeContainer(
|
||||
|
||||
@ -2,6 +2,7 @@ package groups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
|
||||
@ -40,7 +41,7 @@ func (bh channelsBackupHandler) getChannelMessageIDs(
|
||||
ctx context.Context,
|
||||
channelID, prevDelta string,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, api.DeltaUpdate, error) {
|
||||
) (map[string]time.Time, bool, []string, api.DeltaUpdate, error) {
|
||||
return bh.ac.GetChannelMessageIDs(ctx, bh.protectedResource, channelID, prevDelta, canMakeDeltaQueries)
|
||||
}
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@ package groups
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
|
||||
@ -25,7 +26,7 @@ type backupHandler interface {
|
||||
ctx context.Context,
|
||||
channelID, prevDelta string,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, api.DeltaUpdate, error)
|
||||
) (map[string]time.Time, bool, []string, api.DeltaUpdate, error)
|
||||
|
||||
// includeContainer evaluates whether the channel is included
|
||||
// in the provided scope.
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -381,12 +382,12 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
|
||||
var (
|
||||
err error
|
||||
items []string
|
||||
items map[string]time.Time
|
||||
)
|
||||
|
||||
switch category {
|
||||
case path.EmailCategory:
|
||||
items, _, _, err = ac.Mail().GetAddedAndRemovedItemIDs(
|
||||
items, _, _, _, err = ac.Mail().GetAddedAndRemovedItemIDs(
|
||||
ctx,
|
||||
uidn.ID(),
|
||||
containerID,
|
||||
@ -395,7 +396,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
true)
|
||||
|
||||
case path.EventsCategory:
|
||||
items, _, _, err = ac.Events().GetAddedAndRemovedItemIDs(
|
||||
items, _, _, _, err = ac.Events().GetAddedAndRemovedItemIDs(
|
||||
ctx,
|
||||
uidn.ID(),
|
||||
containerID,
|
||||
@ -404,7 +405,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
true)
|
||||
|
||||
case path.ContactsCategory:
|
||||
items, _, _, err = ac.Contacts().GetAddedAndRemovedItemIDs(
|
||||
items, _, _, _, err = ac.Contacts().GetAddedAndRemovedItemIDs(
|
||||
ctx,
|
||||
uidn.ID(),
|
||||
containerID,
|
||||
@ -423,7 +424,7 @@ func testExchangeContinuousBackups(suite *ExchangeBackupIntgSuite, toggles contr
|
||||
dest := dataset[category].dests[destName]
|
||||
dest.locRef = locRef.String()
|
||||
dest.containerID = containerID
|
||||
dest.itemRefs = items
|
||||
dest.itemRefs = maps.Keys(items)
|
||||
dataset[category].dests[destName] = dest
|
||||
|
||||
// Add the directory and all its ancestors to the cache so we can compare
|
||||
|
||||
@ -2,6 +2,7 @@ package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -35,6 +36,10 @@ func (p *channelMessagePageCtrl) GetPage(
|
||||
return resp, graph.Stack(ctx, err).OrNil()
|
||||
}
|
||||
|
||||
func (p *channelMessagePageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Channels) NewChannelMessagePager(
|
||||
teamID, channelID string,
|
||||
selectProps ...string,
|
||||
@ -100,6 +105,10 @@ func (p *channelMessageDeltaPageCtrl) Reset(context.Context) {
|
||||
Delta()
|
||||
}
|
||||
|
||||
func (p *channelMessageDeltaPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Channels) NewChannelMessageDeltaPager(
|
||||
teamID, channelID, prevDelta string,
|
||||
selectProps ...string,
|
||||
@ -141,16 +150,16 @@ func (c Channels) GetChannelMessageIDs(
|
||||
ctx context.Context,
|
||||
teamID, channelID, prevDeltaLink string,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, DeltaUpdate, error) {
|
||||
added, removed, du, err := getAddedAndRemovedItemIDs(
|
||||
) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
|
||||
added, validModTimes, removed, du, err := getAddedAndRemovedItemIDs[models.ChatMessageable](
|
||||
ctx,
|
||||
c.NewChannelMessagePager(teamID, channelID),
|
||||
c.NewChannelMessageDeltaPager(teamID, channelID, prevDeltaLink),
|
||||
prevDeltaLink,
|
||||
canMakeDeltaQueries,
|
||||
addedAndRemovedByDeletedDateTime)
|
||||
addedAndRemovedByDeletedDateTime[models.ChatMessageable])
|
||||
|
||||
return added, removed, du, clues.Stack(err).OrNil()
|
||||
return added, validModTimes, removed, du, clues.Stack(err).OrNil()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -180,6 +189,10 @@ func (p *channelMessageRepliesPageCtrl) GetOdataNextLink() *string {
|
||||
return ptr.To("")
|
||||
}
|
||||
|
||||
func (p *channelMessageRepliesPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Channels) NewChannelMessageRepliesPager(
|
||||
teamID, channelID, messageID string,
|
||||
selectProps ...string,
|
||||
@ -242,6 +255,10 @@ func (p *channelPageCtrl) GetPage(
|
||||
return resp, graph.Stack(ctx, err).OrNil()
|
||||
}
|
||||
|
||||
func (p *channelPageCtrl) ValidModTimes() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c Channels) NewChannelPager(
|
||||
teamID string,
|
||||
) *channelPageCtrl {
|
||||
|
||||
@ -56,7 +56,7 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
addedIDs, _, du, err := ac.GetChannelMessageIDs(
|
||||
addedIDs, _, _, du, err := ac.GetChannelMessageIDs(
|
||||
ctx,
|
||||
suite.its.group.id,
|
||||
suite.its.group.testContainerID,
|
||||
@ -67,7 +67,7 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() {
|
||||
require.NotZero(t, du.URL, "delta link")
|
||||
require.True(t, du.Reset, "reset due to empty prev delta link")
|
||||
|
||||
addedIDs, deletedIDs, du, err := ac.GetChannelMessageIDs(
|
||||
addedIDs, _, deletedIDs, du, err := ac.GetChannelMessageIDs(
|
||||
ctx,
|
||||
suite.its.group.id,
|
||||
suite.its.group.testContainerID,
|
||||
@ -79,7 +79,7 @@ func (suite *ChannelsPagerIntgSuite) TestEnumerateChannelMessages() {
|
||||
require.NotZero(t, du.URL, "delta link")
|
||||
require.False(t, du.Reset, "prev delta link should be valid")
|
||||
|
||||
for _, id := range addedIDs {
|
||||
for id := range addedIDs {
|
||||
suite.Run(id+"-replies", func() {
|
||||
testEnumerateChannelMessageReplies(
|
||||
suite.T(),
|
||||
|
||||
@ -25,6 +25,7 @@ const (
|
||||
givenName = "givenName"
|
||||
isCancelled = "isCancelled"
|
||||
isDraft = "isDraft"
|
||||
lastModifiedDateTime = "lastModifiedDateTime"
|
||||
mobilePhone = "mobilePhone"
|
||||
parentFolderID = "parentFolderId"
|
||||
receivedDateTime = "receivedDateTime"
|
||||
|
||||
@ -2,6 +2,7 @@ package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -136,6 +137,10 @@ func (p *contactsPageCtrl) SetNextLink(nextLink string) {
|
||||
p.builder = users.NewItemContactFoldersItemContactsRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *contactsPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Contacts) GetItemsInContainerByCollisionKey(
|
||||
ctx context.Context,
|
||||
userID, containerID string,
|
||||
@ -249,12 +254,16 @@ func (p *contactDeltaPager) Reset(ctx context.Context) {
|
||||
p.builder = getContactDeltaBuilder(ctx, p.gs, p.userID, p.containerID)
|
||||
}
|
||||
|
||||
func (p *contactDeltaPager) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
ctx context.Context,
|
||||
userID, containerID, prevDeltaLink string,
|
||||
immutableIDs bool,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, DeltaUpdate, error) {
|
||||
) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
|
||||
ctx = clues.Add(
|
||||
ctx,
|
||||
"data_category", path.ContactsCategory,
|
||||
@ -266,12 +275,12 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
containerID,
|
||||
prevDeltaLink,
|
||||
immutableIDs,
|
||||
idAnd()...)
|
||||
idAnd(lastModifiedDateTime)...)
|
||||
pager := c.NewContactsPager(
|
||||
userID,
|
||||
containerID,
|
||||
immutableIDs,
|
||||
idAnd()...)
|
||||
idAnd(lastModifiedDateTime)...)
|
||||
|
||||
return getAddedAndRemovedItemIDs[models.Contactable](
|
||||
ctx,
|
||||
@ -279,5 +288,5 @@ func (c Contacts) GetAddedAndRemovedItemIDs(
|
||||
deltaPager,
|
||||
prevDeltaLink,
|
||||
canMakeDeltaQueries,
|
||||
addedAndRemovedByAddtlData)
|
||||
addedAndRemovedByAddtlData[models.Contactable])
|
||||
}
|
||||
|
||||
@ -61,6 +61,10 @@ func (p *driveItemPageCtrl) SetNextLink(nextLink string) {
|
||||
p.builder = drives.NewItemItemsItemChildrenRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *driveItemPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type DriveItemIDType struct {
|
||||
ItemID string
|
||||
IsFolder bool
|
||||
@ -185,6 +189,10 @@ func (p *DriveItemDeltaPageCtrl) Reset(context.Context) {
|
||||
Delta()
|
||||
}
|
||||
|
||||
func (p *DriveItemDeltaPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// user's drives pager
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -252,6 +260,10 @@ func (p *userDrivePager) SetNextLink(link string) {
|
||||
p.builder = users.NewItemDrivesRequestBuilder(link, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *userDrivePager) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// site's libraries pager
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -303,6 +315,10 @@ func (p *siteDrivePager) SetNextLink(link string) {
|
||||
p.builder = sites.NewItemDrivesRequestBuilder(link, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *siteDrivePager) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// drive pager
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@ -3,6 +3,7 @@ package api
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -141,6 +142,10 @@ func (p *eventsPageCtrl) SetNextLink(nextLink string) {
|
||||
p.builder = users.NewItemCalendarsItemEventsRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *eventsPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Events) GetItemsInContainerByCollisionKey(
|
||||
ctx context.Context,
|
||||
userID, containerID string,
|
||||
@ -248,12 +253,16 @@ func (p *eventDeltaPager) Reset(ctx context.Context) {
|
||||
p.builder = getEventDeltaBuilder(ctx, p.gs, p.userID, p.containerID)
|
||||
}
|
||||
|
||||
func (p *eventDeltaPager) ValidModTimes() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c Events) GetAddedAndRemovedItemIDs(
|
||||
ctx context.Context,
|
||||
userID, containerID, prevDeltaLink string,
|
||||
immutableIDs bool,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, DeltaUpdate, error) {
|
||||
) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
|
||||
ctx = clues.Add(
|
||||
ctx,
|
||||
"data_category", path.EventsCategory,
|
||||
@ -270,7 +279,7 @@ func (c Events) GetAddedAndRemovedItemIDs(
|
||||
userID,
|
||||
containerID,
|
||||
immutableIDs,
|
||||
idAnd()...)
|
||||
idAnd(lastModifiedDateTime)...)
|
||||
|
||||
return getAddedAndRemovedItemIDs[models.Eventable](
|
||||
ctx,
|
||||
@ -278,5 +287,5 @@ func (c Events) GetAddedAndRemovedItemIDs(
|
||||
deltaPager,
|
||||
prevDeltaLink,
|
||||
canMakeDeltaQueries,
|
||||
addedAndRemovedByAddtlData)
|
||||
addedAndRemovedByAddtlData[models.Eventable])
|
||||
}
|
||||
|
||||
@ -52,6 +52,10 @@ type Resetter interface {
|
||||
Reset(context.Context)
|
||||
}
|
||||
|
||||
type ValidModTimer interface {
|
||||
ValidModTimes() bool
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// common funcs
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -76,6 +80,7 @@ func NextAndDeltaLink(pl DeltaLinker) (string, string) {
|
||||
type Pager[T any] interface {
|
||||
GetPager[NextLinkValuer[T]]
|
||||
SetNextLinker
|
||||
ValidModTimer
|
||||
}
|
||||
|
||||
func enumerateItems[T any](
|
||||
@ -114,6 +119,7 @@ type DeltaPager[T any] interface {
|
||||
GetPager[DeltaLinkValuer[T]]
|
||||
Resetter
|
||||
SetNextLinker
|
||||
ValidModTimer
|
||||
}
|
||||
|
||||
func deltaEnumerateItems[T any](
|
||||
@ -173,7 +179,7 @@ func deltaEnumerateItems[T any](
|
||||
// shared enumeration runner funcs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type addedAndRemovedHandler[T any] func(items []T) ([]string, []string, error)
|
||||
type addedAndRemovedHandler[T any] func(items []T) (map[string]time.Time, []string, error)
|
||||
|
||||
func getAddedAndRemovedItemIDs[T any](
|
||||
ctx context.Context,
|
||||
@ -182,16 +188,16 @@ func getAddedAndRemovedItemIDs[T any](
|
||||
prevDeltaLink string,
|
||||
canMakeDeltaQueries bool,
|
||||
aarh addedAndRemovedHandler[T],
|
||||
) ([]string, []string, DeltaUpdate, error) {
|
||||
) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
|
||||
if canMakeDeltaQueries {
|
||||
ts, du, err := deltaEnumerateItems[T](ctx, deltaPager, prevDeltaLink)
|
||||
if err != nil && (!graph.IsErrInvalidDelta(err) || len(prevDeltaLink) == 0) {
|
||||
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
|
||||
return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
a, r, err := aarh(ts)
|
||||
return a, r, du, graph.Stack(ctx, err).OrNil()
|
||||
return a, deltaPager.ValidModTimes(), r, du, graph.Stack(ctx, err).OrNil()
|
||||
}
|
||||
}
|
||||
|
||||
@ -199,12 +205,12 @@ func getAddedAndRemovedItemIDs[T any](
|
||||
|
||||
ts, err := enumerateItems(ctx, pager)
|
||||
if err != nil {
|
||||
return nil, nil, DeltaUpdate{}, graph.Stack(ctx, err)
|
||||
return nil, false, nil, DeltaUpdate{}, graph.Stack(ctx, err)
|
||||
}
|
||||
|
||||
a, r, err := aarh(ts)
|
||||
|
||||
return a, r, du, graph.Stack(ctx, err).OrNil()
|
||||
return a, pager.ValidModTimes(), r, du, graph.Stack(ctx, err).OrNil()
|
||||
}
|
||||
|
||||
type getIDer interface {
|
||||
@ -218,8 +224,15 @@ type getIDAndAddtler interface {
|
||||
GetAdditionalData() map[string]any
|
||||
}
|
||||
|
||||
func addedAndRemovedByAddtlData[T any](items []T) ([]string, []string, error) {
|
||||
added, removed := []string{}, []string{}
|
||||
type getModTimer interface {
|
||||
GetLastModifiedDateTime() *time.Time
|
||||
}
|
||||
|
||||
func addedAndRemovedByAddtlData[T any](
|
||||
items []T,
|
||||
) (map[string]time.Time, []string, error) {
|
||||
added := map[string]time.Time{}
|
||||
removed := []string{}
|
||||
|
||||
for _, item := range items {
|
||||
giaa, ok := any(item).(getIDAndAddtler)
|
||||
@ -232,7 +245,13 @@ func addedAndRemovedByAddtlData[T any](items []T) ([]string, []string, error) {
|
||||
// be 'changed' or 'deleted'. We don't really care about the cause: both
|
||||
// cases are handled the same way in storage.
|
||||
if giaa.GetAdditionalData()[graph.AddtlDataRemoved] == nil {
|
||||
added = append(added, ptr.Val(giaa.GetId()))
|
||||
var modTime time.Time
|
||||
|
||||
if mt, ok := giaa.(getModTimer); ok {
|
||||
modTime = ptr.Val(mt.GetLastModifiedDateTime())
|
||||
}
|
||||
|
||||
added[ptr.Val(giaa.GetId())] = modTime
|
||||
} else {
|
||||
removed = append(removed, ptr.Val(giaa.GetId()))
|
||||
}
|
||||
@ -248,8 +267,11 @@ type getIDAndDeletedDateTimer interface {
|
||||
GetDeletedDateTime() *time.Time
|
||||
}
|
||||
|
||||
func addedAndRemovedByDeletedDateTime[T any](items []T) ([]string, []string, error) {
|
||||
added, removed := []string{}, []string{}
|
||||
func addedAndRemovedByDeletedDateTime[T any](
|
||||
items []T,
|
||||
) (map[string]time.Time, []string, error) {
|
||||
added := map[string]time.Time{}
|
||||
removed := []string{}
|
||||
|
||||
for _, item := range items {
|
||||
giaddt, ok := any(item).(getIDAndDeletedDateTimer)
|
||||
@ -259,7 +281,13 @@ func addedAndRemovedByDeletedDateTime[T any](items []T) ([]string, []string, err
|
||||
}
|
||||
|
||||
if giaddt.GetDeletedDateTime() == nil {
|
||||
added = append(added, ptr.Val(giaddt.GetId()))
|
||||
var modTime time.Time
|
||||
|
||||
if mt, ok := giaddt.(getModTimer); ok {
|
||||
modTime = ptr.Val(mt.GetLastModifiedDateTime())
|
||||
}
|
||||
|
||||
added[ptr.Val(giaddt.GetId())] = modTime
|
||||
} else {
|
||||
removed = append(removed, ptr.Val(giaddt.GetId()))
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -76,16 +77,19 @@ func (p *testPager) GetPage(ctx context.Context) (NextLinkValuer[any], error) {
|
||||
|
||||
func (p *testPager) SetNextLink(nextLink string) {}
|
||||
|
||||
func (p testPager) ValidModTimes() bool { return true }
|
||||
|
||||
// mock id pager
|
||||
|
||||
var _ Pager[any] = &testIDsPager{}
|
||||
|
||||
type testIDsPager struct {
|
||||
t *testing.T
|
||||
added []string
|
||||
added map[string]time.Time
|
||||
removed []string
|
||||
errorCode string
|
||||
needsReset bool
|
||||
validModTimes bool
|
||||
}
|
||||
|
||||
func (p *testIDsPager) GetPage(
|
||||
@ -103,10 +107,11 @@ func (p *testIDsPager) GetPage(
|
||||
|
||||
values := make([]any, 0, len(p.added)+len(p.removed))
|
||||
|
||||
for _, a := range p.added {
|
||||
for a, modTime := range p.added {
|
||||
// contact chosen arbitrarily, any exchange model should work
|
||||
itm := models.NewContact()
|
||||
itm.SetId(ptr.To(a))
|
||||
itm.SetLastModifiedDateTime(ptr.To(modTime))
|
||||
values = append(values, itm)
|
||||
}
|
||||
|
||||
@ -132,14 +137,19 @@ func (p *testIDsPager) Reset(context.Context) {
|
||||
p.errorCode = ""
|
||||
}
|
||||
|
||||
func (p testIDsPager) ValidModTimes() bool {
|
||||
return p.validModTimes
|
||||
}
|
||||
|
||||
var _ DeltaPager[any] = &testIDsDeltaPager{}
|
||||
|
||||
type testIDsDeltaPager struct {
|
||||
t *testing.T
|
||||
added []string
|
||||
added map[string]time.Time
|
||||
removed []string
|
||||
errorCode string
|
||||
needsReset bool
|
||||
validModTimes bool
|
||||
}
|
||||
|
||||
func (p *testIDsDeltaPager) GetPage(
|
||||
@ -157,10 +167,11 @@ func (p *testIDsDeltaPager) GetPage(
|
||||
|
||||
values := make([]any, 0, len(p.added)+len(p.removed))
|
||||
|
||||
for _, a := range p.added {
|
||||
for a, modTime := range p.added {
|
||||
// contact chosen arbitrarily, any exchange model should work
|
||||
itm := models.NewContact()
|
||||
itm.SetId(ptr.To(a))
|
||||
itm.SetLastModifiedDateTime(ptr.To(modTime))
|
||||
values = append(values, itm)
|
||||
}
|
||||
|
||||
@ -186,6 +197,10 @@ func (p *testIDsDeltaPager) Reset(context.Context) {
|
||||
p.errorCode = ""
|
||||
}
|
||||
|
||||
func (p testIDsDeltaPager) ValidModTimes() bool {
|
||||
return p.validModTimes
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
@ -252,11 +267,14 @@ func (suite *PagerUnitSuite) TestEnumerateItems() {
|
||||
|
||||
func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
type expected struct {
|
||||
added []string
|
||||
added map[string]time.Time
|
||||
removed []string
|
||||
deltaUpdate DeltaUpdate
|
||||
validModTimes bool
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
pagerGetter func(
|
||||
@ -268,6 +286,7 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
prevDelta string
|
||||
expect expected
|
||||
canDelta bool
|
||||
validModTimes bool
|
||||
}{
|
||||
{
|
||||
name: "no prev delta",
|
||||
@ -277,12 +296,45 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
|
||||
return &testIDsDeltaPager{
|
||||
t: t,
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
validModTimes: true,
|
||||
}
|
||||
},
|
||||
expect: expected{
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
deltaUpdate: DeltaUpdate{Reset: true},
|
||||
validModTimes: true,
|
||||
},
|
||||
canDelta: true,
|
||||
},
|
||||
{
|
||||
name: "no prev delta invalid mod times",
|
||||
pagerGetter: func(t *testing.T) Pager[any] {
|
||||
return nil
|
||||
},
|
||||
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
|
||||
return &testIDsDeltaPager{
|
||||
t: t,
|
||||
added: map[string]time.Time{
|
||||
"uno": {},
|
||||
"dos": {},
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
}
|
||||
},
|
||||
expect: expected{
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": {},
|
||||
"dos": {},
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
deltaUpdate: DeltaUpdate{Reset: true},
|
||||
},
|
||||
@ -296,15 +348,23 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
|
||||
return &testIDsDeltaPager{
|
||||
t: t,
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
validModTimes: true,
|
||||
}
|
||||
},
|
||||
prevDelta: "delta",
|
||||
expect: expected{
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
deltaUpdate: DeltaUpdate{Reset: false},
|
||||
validModTimes: true,
|
||||
},
|
||||
canDelta: true,
|
||||
},
|
||||
@ -316,17 +376,25 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
|
||||
return &testIDsDeltaPager{
|
||||
t: t,
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
errorCode: "SyncStateNotFound",
|
||||
needsReset: true,
|
||||
validModTimes: true,
|
||||
}
|
||||
},
|
||||
prevDelta: "delta",
|
||||
expect: expected{
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
deltaUpdate: DeltaUpdate{Reset: true},
|
||||
validModTimes: true,
|
||||
},
|
||||
canDelta: true,
|
||||
},
|
||||
@ -335,17 +403,25 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
pagerGetter: func(t *testing.T) Pager[any] {
|
||||
return &testIDsPager{
|
||||
t: t,
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
validModTimes: true,
|
||||
}
|
||||
},
|
||||
deltaPagerGetter: func(t *testing.T) DeltaPager[any] {
|
||||
return nil
|
||||
},
|
||||
expect: expected{
|
||||
added: []string{"uno", "dos"},
|
||||
added: map[string]time.Time{
|
||||
"uno": now.Add(time.Minute),
|
||||
"dos": now.Add(2 * time.Minute),
|
||||
},
|
||||
removed: []string{"tres", "quatro"},
|
||||
deltaUpdate: DeltaUpdate{Reset: true},
|
||||
validModTimes: true,
|
||||
},
|
||||
canDelta: false,
|
||||
},
|
||||
@ -358,18 +434,19 @@ func (suite *PagerUnitSuite) TestGetAddedAndRemovedItemIDs() {
|
||||
ctx, flush := tester.NewContext(t)
|
||||
defer flush()
|
||||
|
||||
added, removed, deltaUpdate, err := getAddedAndRemovedItemIDs[any](
|
||||
added, validModTimes, removed, deltaUpdate, err := getAddedAndRemovedItemIDs[any](
|
||||
ctx,
|
||||
test.pagerGetter(t),
|
||||
test.deltaPagerGetter(t),
|
||||
test.prevDelta,
|
||||
test.canDelta,
|
||||
addedAndRemovedByAddtlData)
|
||||
addedAndRemovedByAddtlData[any])
|
||||
|
||||
require.NoErrorf(t, err, "getting added and removed item IDs: %+v", clues.ToCore(err))
|
||||
require.EqualValues(t, test.expect.added, added, "added item IDs")
|
||||
require.EqualValues(t, test.expect.removed, removed, "removed item IDs")
|
||||
require.Equal(t, test.expect.deltaUpdate, deltaUpdate, "delta update")
|
||||
assert.Equal(t, test.expect.added, added, "added item IDs and mod times")
|
||||
assert.Equal(t, test.expect.validModTimes, validModTimes, "valid mod times")
|
||||
assert.EqualValues(t, test.expect.removed, removed, "removed item IDs")
|
||||
assert.Equal(t, test.expect.deltaUpdate, deltaUpdate, "delta update")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package api
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -166,6 +167,10 @@ func (p *mailsPageCtrl) SetNextLink(nextLink string) {
|
||||
p.builder = users.NewItemMailFoldersItemMessagesRequestBuilder(nextLink, p.gs.Adapter())
|
||||
}
|
||||
|
||||
func (p *mailsPageCtrl) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Mail) GetItemsInContainerByCollisionKey(
|
||||
ctx context.Context,
|
||||
userID, containerID string,
|
||||
@ -279,12 +284,16 @@ func (p *mailDeltaPager) Reset(ctx context.Context) {
|
||||
p.builder = getMailDeltaBuilder(ctx, p.gs, p.userID, p.containerID)
|
||||
}
|
||||
|
||||
func (p *mailDeltaPager) ValidModTimes() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
ctx context.Context,
|
||||
userID, containerID, prevDeltaLink string,
|
||||
immutableIDs bool,
|
||||
canMakeDeltaQueries bool,
|
||||
) ([]string, []string, DeltaUpdate, error) {
|
||||
) (map[string]time.Time, bool, []string, DeltaUpdate, error) {
|
||||
ctx = clues.Add(
|
||||
ctx,
|
||||
"data_category", path.EmailCategory,
|
||||
@ -296,12 +305,12 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
containerID,
|
||||
prevDeltaLink,
|
||||
immutableIDs,
|
||||
idAnd()...)
|
||||
idAnd(lastModifiedDateTime)...)
|
||||
pager := c.NewMailPager(
|
||||
userID,
|
||||
containerID,
|
||||
immutableIDs,
|
||||
idAnd()...)
|
||||
idAnd(lastModifiedDateTime)...)
|
||||
|
||||
return getAddedAndRemovedItemIDs[models.Messageable](
|
||||
ctx,
|
||||
@ -309,5 +318,5 @@ func (c Mail) GetAddedAndRemovedItemIDs(
|
||||
deltaPager,
|
||||
prevDeltaLink,
|
||||
canMakeDeltaQueries,
|
||||
addedAndRemovedByAddtlData)
|
||||
addedAndRemovedByAddtlData[models.Messageable])
|
||||
}
|
||||
|
||||
@ -8,6 +8,11 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||
)
|
||||
|
||||
var (
|
||||
_ api.Pager[any] = &Pager[any]{}
|
||||
_ api.DeltaPager[any] = &DeltaPager[any]{}
|
||||
)
|
||||
|
||||
type DeltaNextLinkValues[T any] struct {
|
||||
Next *string
|
||||
Delta *string
|
||||
@ -62,6 +67,7 @@ func (p *Pager[T]) GetPage(
|
||||
}
|
||||
|
||||
func (p *Pager[T]) SetNextLink(string) {}
|
||||
func (p *Pager[T]) ValidModTimes() bool { return true }
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// delta pager
|
||||
@ -94,3 +100,4 @@ func (p *DeltaPager[T]) GetPage(
|
||||
|
||||
func (p *DeltaPager[T]) SetNextLink(string) {}
|
||||
func (p *DeltaPager[T]) Reset(context.Context) {}
|
||||
func (p *DeltaPager[T]) ValidModTimes() bool { return true }
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user