From ac8fe1e9c142538856e18e1ba03bfab97717bff6 Mon Sep 17 00:00:00 2001 From: Abin Simon Date: Sun, 5 Feb 2023 17:12:48 +0530 Subject: [PATCH] Pass in prev delta to collectItems (#2371) ## Description Pass the previous delta url fetched from metadata to collectItems and make sure we are using that when fetching the pages. ## Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No ## Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup ## Issue(s) * https://github.com/alcionai/corso/issues/2123 ## Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- .../connector/onedrive/collections.go | 9 +- .../connector/onedrive/collections_test.go | 92 +++++++++++++++++++ src/internal/connector/onedrive/drive.go | 47 ++++++++-- src/internal/connector/onedrive/item_test.go | 1 + 4 files changed, 138 insertions(+), 11 deletions(-) diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index 50c5323d9..a5adb9d34 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -251,7 +251,7 @@ func (c *Collections) Get( ctx context.Context, prevMetadata []data.Collection, ) ([]data.Collection, map[string]struct{}, error) { - _, _, err := deserializeMetadata(ctx, prevMetadata) + prevDeltas, _, err := deserializeMetadata(ctx, prevMetadata) if err != nil { return nil, nil, err } @@ -287,6 +287,8 @@ func (c *Collections) Get( driveID := *d.GetId() driveName := *d.GetName() + prevDelta := prevDeltas[driveID] + delta, paths, excluded, err := collectItems( ctx, c.itemPagerFunc( @@ -297,6 +299,7 @@ func (c *Collections) Get( driveID, driveName, c.UpdateCollections, + prevDelta, ) if err != nil { return nil, nil, err @@ -307,8 +310,8 @@ func (c *Collections) Get( // remove entries for which there is no corresponding delta token/folder. If // we leave empty delta tokens then we may end up setting the State field // for collections when not actually getting delta results. - if len(delta) > 0 { - deltaURLs[driveID] = delta + if len(delta.URL) > 0 { + deltaURLs[driveID] = delta.URL } // Avoid the edge case where there's no paths but we do have a valid delta diff --git a/src/internal/connector/onedrive/collections_test.go b/src/internal/connector/onedrive/collections_test.go index f784bad62..7b81a5b74 100644 --- a/src/internal/connector/onedrive/collections_test.go +++ b/src/internal/connector/onedrive/collections_test.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/microsoftgraph/msgraph-sdk-go/models/odataerrors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -1482,3 +1483,94 @@ func delItem( return item } + +func (suite *OneDriveCollectionsSuite) TestCollectItems() { + next := "next" + delta := "delta" + + syncStateNotFound := "SyncStateNotFound" // TODO(meain): export graph.errCodeSyncStateNotFound + me := odataerrors.NewMainError() + me.SetCode(&syncStateNotFound) + + deltaError := odataerrors.NewODataError() + deltaError.SetError(me) + + table := []struct { + name string + items []deltaPagerResult + deltaURL string + prevDelta string + prevDeltaSuccess bool + err error + }{ + { + name: "delta on first run", + deltaURL: delta, + items: []deltaPagerResult{ + {deltaLink: &delta}, + }, + prevDeltaSuccess: true, + }, + { + name: "next then delta", + deltaURL: delta, + items: []deltaPagerResult{ + {nextLink: &next}, + {deltaLink: &delta}, + }, + prevDeltaSuccess: true, + }, + { + name: "invalid prev delta", + deltaURL: delta, + items: []deltaPagerResult{ + {nextLink: &next, err: deltaError}, + {deltaLink: &delta}, // works on retry + }, + prevDeltaSuccess: false, + }, + { + name: "fail a normal delta query", + items: []deltaPagerResult{ + {nextLink: &next}, + {nextLink: &next, err: assert.AnError}, + }, + prevDeltaSuccess: true, + err: assert.AnError, + }, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + + itemPager := &mockItemPager{ + toReturn: test.items, + } + + collectorFunc := func( + ctx context.Context, + driveID, driveName string, + driveItems []models.DriveItemable, + oldPaths map[string]string, + newPaths map[string]string, + excluded map[string]struct{}, + ) error { + return nil + } + + delta, _, _, err := collectItems( + ctx, + itemPager, + "", + "General", + collectorFunc, + "", + ) + + require.ErrorIs(suite.T(), err, test.err) + require.Equal(suite.T(), test.deltaURL, delta.URL) + require.Equal(suite.T(), !test.prevDeltaSuccess, delta.Reset) + }) + } +} diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go index ebcbe8b6f..b06184884 100644 --- a/src/internal/connector/onedrive/drive.go +++ b/src/internal/connector/onedrive/drive.go @@ -35,6 +35,17 @@ const ( contextDeadlineExceeded = "context deadline exceeded" ) +// DeltaUpdate holds the results of a current delta token. It normally +// gets produced when aggregating the addition and removal of items in +// a delta-queriable folder. +// FIXME: This is same as exchange.api.DeltaUpdate +type DeltaUpdate struct { + // the deltaLink itself + URL string + // true if the old delta was marked as invalid + Reset bool +} + type drivePager interface { GetPage(context.Context) (gapi.PageLinker, error) SetNext(nextLink string) @@ -172,22 +183,41 @@ func collectItems( pager itemPager, driveID, driveName string, collector itemCollector, -) (string, map[string]string, map[string]struct{}, error) { + prevDelta string, +) (DeltaUpdate, map[string]string, map[string]struct{}, error) { var ( newDeltaURL = "" // TODO(ashmrtn): Eventually this should probably be a parameter so we can // take in previous paths. - oldPaths = map[string]string{} - newPaths = map[string]string{} - excluded = map[string]struct{}{} + oldPaths = map[string]string{} + newPaths = map[string]string{} + excluded = map[string]struct{}{} + invalidPrevDelta = false + triedPrevDelta = false ) maps.Copy(newPaths, oldPaths) + if len(prevDelta) != 0 { + pager.SetNext(prevDelta) + } + for { page, err := pager.GetPage(ctx) + + if !triedPrevDelta && graph.IsErrInvalidDelta(err) { + logger.Ctx(ctx).Infow("Invalid previous delta link", "link", prevDelta) + + triedPrevDelta = true // TODO(meain): Do we need this check? + invalidPrevDelta = true + + pager.SetNext("") + + continue + } + if err != nil { - return "", nil, nil, errors.Wrapf( + return DeltaUpdate{}, nil, nil, errors.Wrapf( err, "failed to query drive items. details: %s", support.ConnectorStackErrorTrace(err), @@ -196,12 +226,12 @@ func collectItems( vals, err := pager.ValuesIn(page) if err != nil { - return "", nil, nil, errors.Wrap(err, "extracting items from response") + return DeltaUpdate{}, nil, nil, errors.Wrap(err, "extracting items from response") } err = collector(ctx, driveID, driveName, vals, oldPaths, newPaths, excluded) if err != nil { - return "", nil, nil, err + return DeltaUpdate{}, nil, nil, err } nextLink, deltaLink := gapi.NextAndDeltaLink(page) @@ -219,7 +249,7 @@ func collectItems( pager.SetNext(nextLink) } - return newDeltaURL, newPaths, excluded, nil + return DeltaUpdate{URL: newDeltaURL, Reset: invalidPrevDelta}, newPaths, excluded, nil } // getFolder will lookup the specified folder name under `parentFolderID` @@ -379,6 +409,7 @@ func GetAllFolders( return nil }, + "", ) if err != nil { return nil, errors.Wrapf(err, "getting items for drive %s", *d.GetName()) diff --git a/src/internal/connector/onedrive/item_test.go b/src/internal/connector/onedrive/item_test.go index a2e008ec5..aec2f2474 100644 --- a/src/internal/connector/onedrive/item_test.go +++ b/src/internal/connector/onedrive/item_test.go @@ -126,6 +126,7 @@ func (suite *ItemIntegrationSuite) TestItemReader_oneDrive() { suite.userDriveID, "General", itemCollector, + "", ) require.NoError(suite.T(), err)