Pass in prev delta to collectItems (#2371)
## Description Pass the previous delta url fetched from metadata to collectItems and make sure we are using that when fetching the pages. ## Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup ## Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * https://github.com/alcionai/corso/issues/2123 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
c4b2de5c6c
commit
ac8fe1e9c1
@ -251,7 +251,7 @@ func (c *Collections) Get(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
prevMetadata []data.Collection,
|
prevMetadata []data.Collection,
|
||||||
) ([]data.Collection, map[string]struct{}, error) {
|
) ([]data.Collection, map[string]struct{}, error) {
|
||||||
_, _, err := deserializeMetadata(ctx, prevMetadata)
|
prevDeltas, _, err := deserializeMetadata(ctx, prevMetadata)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -287,6 +287,8 @@ func (c *Collections) Get(
|
|||||||
driveID := *d.GetId()
|
driveID := *d.GetId()
|
||||||
driveName := *d.GetName()
|
driveName := *d.GetName()
|
||||||
|
|
||||||
|
prevDelta := prevDeltas[driveID]
|
||||||
|
|
||||||
delta, paths, excluded, err := collectItems(
|
delta, paths, excluded, err := collectItems(
|
||||||
ctx,
|
ctx,
|
||||||
c.itemPagerFunc(
|
c.itemPagerFunc(
|
||||||
@ -297,6 +299,7 @@ func (c *Collections) Get(
|
|||||||
driveID,
|
driveID,
|
||||||
driveName,
|
driveName,
|
||||||
c.UpdateCollections,
|
c.UpdateCollections,
|
||||||
|
prevDelta,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -307,8 +310,8 @@ func (c *Collections) Get(
|
|||||||
// remove entries for which there is no corresponding delta token/folder. If
|
// remove entries for which there is no corresponding delta token/folder. If
|
||||||
// we leave empty delta tokens then we may end up setting the State field
|
// we leave empty delta tokens then we may end up setting the State field
|
||||||
// for collections when not actually getting delta results.
|
// for collections when not actually getting delta results.
|
||||||
if len(delta) > 0 {
|
if len(delta.URL) > 0 {
|
||||||
deltaURLs[driveID] = delta
|
deltaURLs[driveID] = delta.URL
|
||||||
}
|
}
|
||||||
|
|
||||||
// Avoid the edge case where there's no paths but we do have a valid delta
|
// Avoid the edge case where there's no paths but we do have a valid delta
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||||
|
"github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
@ -1482,3 +1483,94 @@ func delItem(
|
|||||||
|
|
||||||
return item
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *OneDriveCollectionsSuite) TestCollectItems() {
|
||||||
|
next := "next"
|
||||||
|
delta := "delta"
|
||||||
|
|
||||||
|
syncStateNotFound := "SyncStateNotFound" // TODO(meain): export graph.errCodeSyncStateNotFound
|
||||||
|
me := odataerrors.NewMainError()
|
||||||
|
me.SetCode(&syncStateNotFound)
|
||||||
|
|
||||||
|
deltaError := odataerrors.NewODataError()
|
||||||
|
deltaError.SetError(me)
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
items []deltaPagerResult
|
||||||
|
deltaURL string
|
||||||
|
prevDelta string
|
||||||
|
prevDeltaSuccess bool
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "delta on first run",
|
||||||
|
deltaURL: delta,
|
||||||
|
items: []deltaPagerResult{
|
||||||
|
{deltaLink: &delta},
|
||||||
|
},
|
||||||
|
prevDeltaSuccess: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "next then delta",
|
||||||
|
deltaURL: delta,
|
||||||
|
items: []deltaPagerResult{
|
||||||
|
{nextLink: &next},
|
||||||
|
{deltaLink: &delta},
|
||||||
|
},
|
||||||
|
prevDeltaSuccess: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid prev delta",
|
||||||
|
deltaURL: delta,
|
||||||
|
items: []deltaPagerResult{
|
||||||
|
{nextLink: &next, err: deltaError},
|
||||||
|
{deltaLink: &delta}, // works on retry
|
||||||
|
},
|
||||||
|
prevDeltaSuccess: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "fail a normal delta query",
|
||||||
|
items: []deltaPagerResult{
|
||||||
|
{nextLink: &next},
|
||||||
|
{nextLink: &next, err: assert.AnError},
|
||||||
|
},
|
||||||
|
prevDeltaSuccess: true,
|
||||||
|
err: assert.AnError,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
ctx, flush := tester.NewContext()
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
itemPager := &mockItemPager{
|
||||||
|
toReturn: test.items,
|
||||||
|
}
|
||||||
|
|
||||||
|
collectorFunc := func(
|
||||||
|
ctx context.Context,
|
||||||
|
driveID, driveName string,
|
||||||
|
driveItems []models.DriveItemable,
|
||||||
|
oldPaths map[string]string,
|
||||||
|
newPaths map[string]string,
|
||||||
|
excluded map[string]struct{},
|
||||||
|
) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
delta, _, _, err := collectItems(
|
||||||
|
ctx,
|
||||||
|
itemPager,
|
||||||
|
"",
|
||||||
|
"General",
|
||||||
|
collectorFunc,
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
|
||||||
|
require.ErrorIs(suite.T(), err, test.err)
|
||||||
|
require.Equal(suite.T(), test.deltaURL, delta.URL)
|
||||||
|
require.Equal(suite.T(), !test.prevDeltaSuccess, delta.Reset)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -35,6 +35,17 @@ const (
|
|||||||
contextDeadlineExceeded = "context deadline exceeded"
|
contextDeadlineExceeded = "context deadline exceeded"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DeltaUpdate holds the results of a current delta token. It normally
|
||||||
|
// gets produced when aggregating the addition and removal of items in
|
||||||
|
// a delta-queriable folder.
|
||||||
|
// FIXME: This is same as exchange.api.DeltaUpdate
|
||||||
|
type DeltaUpdate struct {
|
||||||
|
// the deltaLink itself
|
||||||
|
URL string
|
||||||
|
// true if the old delta was marked as invalid
|
||||||
|
Reset bool
|
||||||
|
}
|
||||||
|
|
||||||
type drivePager interface {
|
type drivePager interface {
|
||||||
GetPage(context.Context) (gapi.PageLinker, error)
|
GetPage(context.Context) (gapi.PageLinker, error)
|
||||||
SetNext(nextLink string)
|
SetNext(nextLink string)
|
||||||
@ -172,22 +183,41 @@ func collectItems(
|
|||||||
pager itemPager,
|
pager itemPager,
|
||||||
driveID, driveName string,
|
driveID, driveName string,
|
||||||
collector itemCollector,
|
collector itemCollector,
|
||||||
) (string, map[string]string, map[string]struct{}, error) {
|
prevDelta string,
|
||||||
|
) (DeltaUpdate, map[string]string, map[string]struct{}, error) {
|
||||||
var (
|
var (
|
||||||
newDeltaURL = ""
|
newDeltaURL = ""
|
||||||
// TODO(ashmrtn): Eventually this should probably be a parameter so we can
|
// TODO(ashmrtn): Eventually this should probably be a parameter so we can
|
||||||
// take in previous paths.
|
// take in previous paths.
|
||||||
oldPaths = map[string]string{}
|
oldPaths = map[string]string{}
|
||||||
newPaths = map[string]string{}
|
newPaths = map[string]string{}
|
||||||
excluded = map[string]struct{}{}
|
excluded = map[string]struct{}{}
|
||||||
|
invalidPrevDelta = false
|
||||||
|
triedPrevDelta = false
|
||||||
)
|
)
|
||||||
|
|
||||||
maps.Copy(newPaths, oldPaths)
|
maps.Copy(newPaths, oldPaths)
|
||||||
|
|
||||||
|
if len(prevDelta) != 0 {
|
||||||
|
pager.SetNext(prevDelta)
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
page, err := pager.GetPage(ctx)
|
page, err := pager.GetPage(ctx)
|
||||||
|
|
||||||
|
if !triedPrevDelta && graph.IsErrInvalidDelta(err) {
|
||||||
|
logger.Ctx(ctx).Infow("Invalid previous delta link", "link", prevDelta)
|
||||||
|
|
||||||
|
triedPrevDelta = true // TODO(meain): Do we need this check?
|
||||||
|
invalidPrevDelta = true
|
||||||
|
|
||||||
|
pager.SetNext("")
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, nil, errors.Wrapf(
|
return DeltaUpdate{}, nil, nil, errors.Wrapf(
|
||||||
err,
|
err,
|
||||||
"failed to query drive items. details: %s",
|
"failed to query drive items. details: %s",
|
||||||
support.ConnectorStackErrorTrace(err),
|
support.ConnectorStackErrorTrace(err),
|
||||||
@ -196,12 +226,12 @@ func collectItems(
|
|||||||
|
|
||||||
vals, err := pager.ValuesIn(page)
|
vals, err := pager.ValuesIn(page)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, nil, errors.Wrap(err, "extracting items from response")
|
return DeltaUpdate{}, nil, nil, errors.Wrap(err, "extracting items from response")
|
||||||
}
|
}
|
||||||
|
|
||||||
err = collector(ctx, driveID, driveName, vals, oldPaths, newPaths, excluded)
|
err = collector(ctx, driveID, driveName, vals, oldPaths, newPaths, excluded)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, nil, err
|
return DeltaUpdate{}, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nextLink, deltaLink := gapi.NextAndDeltaLink(page)
|
nextLink, deltaLink := gapi.NextAndDeltaLink(page)
|
||||||
@ -219,7 +249,7 @@ func collectItems(
|
|||||||
pager.SetNext(nextLink)
|
pager.SetNext(nextLink)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newDeltaURL, newPaths, excluded, nil
|
return DeltaUpdate{URL: newDeltaURL, Reset: invalidPrevDelta}, newPaths, excluded, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getFolder will lookup the specified folder name under `parentFolderID`
|
// getFolder will lookup the specified folder name under `parentFolderID`
|
||||||
@ -379,6 +409,7 @@ func GetAllFolders(
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
"",
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "getting items for drive %s", *d.GetName())
|
return nil, errors.Wrapf(err, "getting items for drive %s", *d.GetName())
|
||||||
|
|||||||
@ -126,6 +126,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() {
|
|||||||
suite.userDriveID,
|
suite.userDriveID,
|
||||||
"General",
|
"General",
|
||||||
itemCollector,
|
itemCollector,
|
||||||
|
"",
|
||||||
)
|
)
|
||||||
require.NoError(suite.T(), err)
|
require.NoError(suite.T(), err)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user