From 2b7d76faef50e7e27cff156e8ab3b4b60143f4a4 Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Sat, 30 Sep 2023 10:13:46 -0600 Subject: [PATCH] apply channels and streaming to the url cache updates the url cache item enumeration with stream processing using channels. Plus other changes to accomodate the new pattern such as mock and interface updates. This is the second of a multipart update that has been separated for ease of review. CI is not expected to pass until the final PR. --- .../m365/collection/drive/handlers.go | 1 + .../collection/drive/item_collector_test.go | 6 +- .../m365/collection/drive/item_handler.go | 1 + .../m365/collection/drive/library_handler.go | 1 + .../m365/collection/drive/url_cache.go | 60 ++- .../m365/collection/drive/url_cache_test.go | 440 ++++++++++++------ .../m365/service/onedrive/mock/handlers.go | 112 ++--- 7 files changed, 411 insertions(+), 210 deletions(-) diff --git a/src/internal/m365/collection/drive/handlers.go b/src/internal/m365/collection/drive/handlers.go index 4d0b3fc4e..dbe5d8110 100644 --- a/src/internal/m365/collection/drive/handlers.go +++ b/src/internal/m365/collection/drive/handlers.go @@ -85,6 +85,7 @@ type GetItemer interface { type EnumerateDriveItemsDeltaer interface { EnumerateDriveItemsDelta( ctx context.Context, + ch chan<- api.NextPage[models.DriveItemable], driveID, prevDeltaLink string, ) api.NextPageResulter[models.DriveItemable] } diff --git a/src/internal/m365/collection/drive/item_collector_test.go b/src/internal/m365/collection/drive/item_collector_test.go index b6f32a5bc..3b17cbaff 100644 --- a/src/internal/m365/collection/drive/item_collector_test.go +++ b/src/internal/m365/collection/drive/item_collector_test.go @@ -150,8 +150,10 @@ func (suite *ItemCollectorUnitSuite) TestDrives() { Err: assert.AnError, }, }, - expectedErr: assert.Error, - expectedResults: nil, + expectedErr: assert.Error, + // even though we error, the func will return both the + // error and the prior results + expectedResults: resultDrives, }, { name: "MySiteURLNotFound", diff --git a/src/internal/m365/collection/drive/item_handler.go b/src/internal/m365/collection/drive/item_handler.go index 1f7bf328d..7f2878c44 100644 --- a/src/internal/m365/collection/drive/item_handler.go +++ b/src/internal/m365/collection/drive/item_handler.go @@ -136,6 +136,7 @@ func (h itemBackupHandler) IncludesDir(dir string) bool { func (h itemBackupHandler) EnumerateDriveItemsDelta( ctx context.Context, + ch chan<- api.NextPage[models.DriveItemable], driveID, prevDeltaLink string, selectProps []string, ) (api.NextPageResulter[models.DriveItemable], error) { diff --git a/src/internal/m365/collection/drive/library_handler.go b/src/internal/m365/collection/drive/library_handler.go index 42aefa507..763f31e77 100644 --- a/src/internal/m365/collection/drive/library_handler.go +++ b/src/internal/m365/collection/drive/library_handler.go @@ -139,6 +139,7 @@ func (h libraryBackupHandler) IncludesDir(dir string) bool { func (h libraryBackupHandler) EnumerateDriveItemsDelta( ctx context.Context, + ch chan<- api.NextPage[models.DriveItemable], driveID, prevDeltaLink string, selectProps []string, ) api.NextPageResulter[models.DriveItemable] { diff --git a/src/internal/m365/collection/drive/url_cache.go b/src/internal/m365/collection/drive/url_cache.go index 391382dbe..4937e3e90 100644 --- a/src/internal/m365/collection/drive/url_cache.go +++ b/src/internal/m365/collection/drive/url_cache.go @@ -47,7 +47,7 @@ type urlCache struct { refreshMu sync.Mutex deltaQueryCount int - edid EnumerateDriveItemsDeltaer + enumerator EnumerateDriveItemsDeltaer errs *fault.Bus } @@ -56,10 +56,10 @@ type urlCache struct { func newURLCache( driveID, prevDelta string, refreshInterval time.Duration, - edid EnumerateDriveItemsDeltaer, + enumerator EnumerateDriveItemsDeltaer, errs *fault.Bus, ) (*urlCache, error) { - err := validateCacheParams(driveID, refreshInterval, edid) + err := validateCacheParams(driveID, refreshInterval, enumerator) if err != nil { return nil, clues.Wrap(err, "cache params") } @@ -68,7 +68,7 @@ func newURLCache( idToProps: make(map[string]itemProps), lastRefreshTime: time.Time{}, driveID: driveID, - edid: edid, + enumerator: enumerator, prevDelta: prevDelta, refreshInterval: refreshInterval, errs: errs, @@ -80,7 +80,7 @@ func newURLCache( func validateCacheParams( driveID string, refreshInterval time.Duration, - edid EnumerateDriveItemsDeltaer, + enumerator EnumerateDriveItemsDeltaer, ) error { if len(driveID) == 0 { return clues.New("drive id is empty") @@ -90,8 +90,8 @@ func validateCacheParams( return clues.New("invalid refresh interval") } - if edid == nil { - return clues.New("nil item enumerator") + if enumerator == nil { + return clues.New("missing item enumerator") } return nil @@ -157,20 +157,45 @@ func (uc *urlCache) refreshCache( // Issue a delta query to graph logger.Ctx(ctx).Info("refreshing url cache") - items, du, err := uc.edid.EnumerateDriveItemsDelta( - ctx, - uc.driveID, - uc.prevDelta, - api.URLCacheDriveItemProps()) + var ( + ch = make(chan api.NextPage[models.DriveItemable], 1) + cacheErr error + wg = sync.WaitGroup{} + ) + + go func() { + defer wg.Done() + + for pg := range ch { + if cacheErr != nil { + continue + } + + uc.deltaQueryCount++ + + err := uc.updateCache( + ctx, + pg.Items, + pg.Reset, + uc.errs) + if err != nil { + cacheErr = clues.Wrap(err, "updating cache") + } + } + }() + + wg.Add(1) + + du, err := uc.enumerator.EnumerateDriveItemsDelta(ctx, ch, uc.driveID, uc.prevDelta) if err != nil { uc.idToProps = make(map[string]itemProps) return clues.Stack(err) } - uc.deltaQueryCount++ + wg.Wait() - if err := uc.updateCache(ctx, items, uc.errs); err != nil { - return clues.Stack(err) + if cacheErr != nil { + return clues.Stack(cacheErr) } logger.Ctx(ctx).Info("url cache refreshed") @@ -205,10 +230,15 @@ func (uc *urlCache) readCache( func (uc *urlCache) updateCache( ctx context.Context, items []models.DriveItemable, + reset bool, errs *fault.Bus, ) error { el := errs.Local() + if reset { + uc.idToProps = map[string]itemProps{} + } + for _, item := range items { if el.Failure() != nil { break diff --git a/src/internal/m365/collection/drive/url_cache_test.go b/src/internal/m365/collection/drive/url_cache_test.go index 4fa0043fb..61a5f6bf1 100644 --- a/src/internal/m365/collection/drive/url_cache_test.go +++ b/src/internal/m365/collection/drive/url_cache_test.go @@ -2,6 +2,7 @@ package drive import ( "errors" + "fmt" "io" "math/rand" "net/http" @@ -11,6 +12,7 @@ import ( "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -91,10 +93,22 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { api.NewDriveItem(newFolderName, true), control.Copy) require.NoError(t, err, clues.ToCore(err)) - require.NotNil(t, newFolder.GetId()) nfid := ptr.Val(newFolder.GetId()) + ch := make(chan api.NextPage[models.DriveItemable], 1) + + go func() { + for { + // no-op, we just need the previous delta + // but also need to drain the channel to + // prevent deadlock. + _, ok := <-ch + if !ok { + return + } + } + }() // Get the previous delta to feed into url cache _, du, err := ac.EnumerateDriveItemsDelta( @@ -196,16 +210,18 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { table := []struct { name string - pagerItems map[string][]models.DriveItemable - pagerErr map[string]error + pages []api.NextPage[models.DriveItemable] + pagerErr error expectedItemProps map[string]itemProps - expectedErr require.ErrorAssertionFunc - cacheAssert func(*urlCache, time.Time) + expectErr assert.ErrorAssertionFunc + expect func(*testing.T, *urlCache, time.Time) }{ { name: "single item in cache", - pagerItems: map[string][]models.DriveItemable{ - driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)}, + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + }}, }, expectedItemProps: map[string]itemProps{ "1": { @@ -213,22 +229,121 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { isDeleted: false, }, }, - expectedErr: require.NoError, - cacheAssert: func(uc *urlCache, startTime time.Time) { - require.Greater(suite.T(), uc.lastRefreshTime, startTime) - require.Equal(suite.T(), 1, uc.deltaQueryCount) - require.Equal(suite.T(), 1, len(uc.idToProps)) + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 1, uc.deltaQueryCount) + assert.Equal(t, 1, len(uc.idToProps)) }, }, { name: "multiple items in cache", - pagerItems: map[string][]models.DriveItemable{ - driveID: { + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("2", "file2", "root", "root", "https://dummy2.com", false), fileItem("3", "file3", "root", "root", "https://dummy3.com", false), fileItem("4", "file4", "root", "root", "https://dummy4.com", false), fileItem("5", "file5", "root", "root", "https://dummy5.com", false), + }}, + }, + expectedItemProps: map[string]itemProps{ + "1": { + downloadURL: "https://dummy1.com", + isDeleted: false, + }, + "2": { + downloadURL: "https://dummy2.com", + isDeleted: false, + }, + "3": { + downloadURL: "https://dummy3.com", + isDeleted: false, + }, + "4": { + downloadURL: "https://dummy4.com", + isDeleted: false, + }, + "5": { + downloadURL: "https://dummy5.com", + isDeleted: false, + }, + }, + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 1, uc.deltaQueryCount) + assert.Equal(t, 5, len(uc.idToProps)) + }, + }, + { + name: "multiple pages", + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + fileItem("2", "file2", "root", "root", "https://dummy2.com", false), + fileItem("3", "file3", "root", "root", "https://dummy3.com", false), + }}, + {Items: []models.DriveItemable{ + fileItem("4", "file4", "root", "root", "https://dummy4.com", false), + fileItem("5", "file5", "root", "root", "https://dummy5.com", false), + }}, + }, + expectedItemProps: map[string]itemProps{ + "1": { + downloadURL: "https://dummy1.com", + isDeleted: false, + }, + "2": { + downloadURL: "https://dummy2.com", + isDeleted: false, + }, + "3": { + downloadURL: "https://dummy3.com", + isDeleted: false, + }, + "4": { + downloadURL: "https://dummy4.com", + isDeleted: false, + }, + "5": { + downloadURL: "https://dummy5.com", + isDeleted: false, + }, + }, + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 2, uc.deltaQueryCount) + assert.Equal(t, 5, len(uc.idToProps)) + }, + }, + { + name: "multiple pages with resets", + pages: []api.NextPage[models.DriveItemable]{ + { + Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + fileItem("2", "file2", "root", "root", "https://dummy2.com", false), + fileItem("3", "file3", "root", "root", "https://dummy3.com", false), + }, + }, + { + Items: []models.DriveItemable{}, + Reset: true, + }, + { + Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + fileItem("2", "file2", "root", "root", "https://dummy2.com", false), + fileItem("3", "file3", "root", "root", "https://dummy3.com", false), + }, + }, + { + Items: []models.DriveItemable{ + fileItem("4", "file4", "root", "root", "https://dummy4.com", false), + fileItem("5", "file5", "root", "root", "https://dummy5.com", false), + }, }, }, expectedItemProps: map[string]itemProps{ @@ -253,23 +368,77 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { isDeleted: false, }, }, - expectedErr: require.NoError, - cacheAssert: func(uc *urlCache, startTime time.Time) { - require.Greater(suite.T(), uc.lastRefreshTime, startTime) - require.Equal(suite.T(), 1, uc.deltaQueryCount) - require.Equal(suite.T(), 5, len(uc.idToProps)) + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 4, uc.deltaQueryCount) + assert.Equal(t, 5, len(uc.idToProps)) + }, + }, + { + name: "multiple pages with resets and combo reset+items in page", + pages: []api.NextPage[models.DriveItemable]{ + { + Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + fileItem("2", "file2", "root", "root", "https://dummy2.com", false), + fileItem("3", "file3", "root", "root", "https://dummy3.com", false), + }, + }, + { + Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + fileItem("2", "file2", "root", "root", "https://dummy2.com", false), + fileItem("3", "file3", "root", "root", "https://dummy3.com", false), + }, + Reset: true, + }, + { + Items: []models.DriveItemable{ + fileItem("4", "file4", "root", "root", "https://dummy4.com", false), + fileItem("5", "file5", "root", "root", "https://dummy5.com", false), + }, + }, + }, + expectedItemProps: map[string]itemProps{ + "1": { + downloadURL: "https://dummy1.com", + isDeleted: false, + }, + "2": { + downloadURL: "https://dummy2.com", + isDeleted: false, + }, + "3": { + downloadURL: "https://dummy3.com", + isDeleted: false, + }, + "4": { + downloadURL: "https://dummy4.com", + isDeleted: false, + }, + "5": { + downloadURL: "https://dummy5.com", + isDeleted: false, + }, + }, + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 3, uc.deltaQueryCount) + assert.Equal(t, 5, len(uc.idToProps)) }, }, { name: "duplicate items with potentially new urls", - pagerItems: map[string][]models.DriveItemable{ - driveID: { + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("2", "file2", "root", "root", "https://dummy2.com", false), fileItem("3", "file3", "root", "root", "https://dummy3.com", false), fileItem("1", "file1", "root", "root", "https://test1.com", false), fileItem("2", "file2", "root", "root", "https://test2.com", false), - }, + }}, }, expectedItemProps: map[string]itemProps{ "1": { @@ -285,21 +454,21 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { isDeleted: false, }, }, - expectedErr: require.NoError, - cacheAssert: func(uc *urlCache, startTime time.Time) { - require.Greater(suite.T(), uc.lastRefreshTime, startTime) - require.Equal(suite.T(), 1, uc.deltaQueryCount) - require.Equal(suite.T(), 3, len(uc.idToProps)) + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 1, uc.deltaQueryCount) + assert.Equal(t, 3, len(uc.idToProps)) }, }, { name: "deleted items", - pagerItems: map[string][]models.DriveItemable{ - driveID: { + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("2", "file2", "root", "root", "https://dummy2.com", false), fileItem("1", "file1", "root", "root", "https://dummy1.com", true), - }, + }}, }, expectedItemProps: map[string]itemProps{ "1": { @@ -311,111 +480,122 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { isDeleted: false, }, }, - expectedErr: require.NoError, - cacheAssert: func(uc *urlCache, startTime time.Time) { - require.Greater(suite.T(), uc.lastRefreshTime, startTime) - require.Equal(suite.T(), 1, uc.deltaQueryCount) - require.Equal(suite.T(), 2, len(uc.idToProps)) + expectErr: assert.NoError, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 1, uc.deltaQueryCount) + assert.Equal(t, 2, len(uc.idToProps)) }, }, { name: "item not found in cache", - pagerItems: map[string][]models.DriveItemable{ - driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)}, + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ + fileItem("1", "file1", "root", "root", "https://dummy1.com", false), + }}, }, expectedItemProps: map[string]itemProps{ "2": {}, }, - expectedErr: require.Error, - cacheAssert: func(uc *urlCache, startTime time.Time) { - require.Greater(suite.T(), uc.lastRefreshTime, startTime) - require.Equal(suite.T(), 1, uc.deltaQueryCount) - require.Equal(suite.T(), 1, len(uc.idToProps)) + expectErr: assert.Error, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 1, uc.deltaQueryCount) + assert.Equal(t, 1, len(uc.idToProps)) }, }, { - name: "delta query error", - pagerItems: map[string][]models.DriveItemable{}, - pagerErr: map[string]error{ - driveID: errors.New("delta query error"), + name: "delta query error", + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{}}, }, + pagerErr: errors.New("delta query error"), expectedItemProps: map[string]itemProps{ "1": {}, "2": {}, }, - expectedErr: require.Error, - cacheAssert: func(uc *urlCache, _ time.Time) { - require.Equal(suite.T(), time.Time{}, uc.lastRefreshTime) - require.Equal(suite.T(), 0, uc.deltaQueryCount) - require.Equal(suite.T(), 0, len(uc.idToProps)) + expectErr: assert.Error, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Equal(t, time.Time{}, uc.lastRefreshTime) + assert.NotZero(t, uc.deltaQueryCount) + assert.Equal(t, 0, len(uc.idToProps)) }, }, { name: "folder item", - pagerItems: map[string][]models.DriveItemable{ - driveID: { + pages: []api.NextPage[models.DriveItemable]{ + {Items: []models.DriveItemable{ fileItem("1", "file1", "root", "root", "https://dummy1.com", false), driveItem("2", "folder2", "root", "root", false, true, false), - }, + }}, }, expectedItemProps: map[string]itemProps{ "2": {}, }, - expectedErr: require.Error, - cacheAssert: func(uc *urlCache, startTime time.Time) { - require.Greater(suite.T(), uc.lastRefreshTime, startTime) - require.Equal(suite.T(), 1, uc.deltaQueryCount) - require.Equal(suite.T(), 1, len(uc.idToProps)) + expectErr: assert.Error, + expect: func(t *testing.T, uc *urlCache, startTime time.Time) { + assert.Greater(t, uc.lastRefreshTime, startTime) + assert.Equal(t, 1, uc.deltaQueryCount) + assert.Equal(t, 1, len(uc.idToProps)) }, }, } for _, test := range table { suite.Run(test.name, func() { - t := suite.T() - ctx, flush := tester.NewContext(t) - defer flush() + for _, numConcurrentReqs := range []int{1, 2, 32} { + crTestName := fmt.Sprintf("%d_concurrent_reqs", numConcurrentReqs) + suite.Run(crTestName, func() { + t := suite.T() - medi := mock.EnumeratesDriveItemsDelta{ - Items: test.pagerItems, - Err: test.pagerErr, - DeltaUpdate: map[string]api.DeltaUpdate{driveID: {URL: deltaString}}, - } + ctx, flush := tester.NewContext(t) + defer flush() - cache, err := newURLCache( - driveID, - "", - 1*time.Hour, - &medi, - fault.New(true)) - - require.NoError(suite.T(), err, clues.ToCore(err)) - - numConcurrentReq := 100 - var wg sync.WaitGroup - wg.Add(numConcurrentReq) - - startTime := time.Now() - - for i := 0; i < numConcurrentReq; i++ { - go func() { - defer wg.Done() - - for id, expected := range test.expectedItemProps { - time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - - props, err := cache.getItemProperties(ctx, id) - - test.expectedErr(suite.T(), err, clues.ToCore(err)) - require.Equal(suite.T(), expected, props) + medi := mock.EnumeratesDriveItemsDelta[models.DriveItemable]{ + Pages: map[string][]api.NextPage[models.DriveItemable]{ + driveID: test.pages, + }, + Err: map[string]error{ + driveID: test.pagerErr, + }, + DeltaUpdate: map[string]api.DeltaUpdate{ + driveID: {URL: deltaString}, + }, } - }() + + cache, err := newURLCache( + driveID, + "", + 1*time.Hour, + &medi, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + + var wg sync.WaitGroup + wg.Add(numConcurrentReqs) + + startTime := time.Now() + + for i := 0; i < numConcurrentReqs; i++ { + go func(ti int) { + defer wg.Done() + + for id, expected := range test.expectedItemProps { + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + + props, err := cache.getItemProperties(ctx, id) + test.expectErr(t, err, clues.ToCore(err)) + assert.Equal(t, expected, props) + } + }(i) + } + + wg.Wait() + + test.expect(t, cache, startTime) + }) } - - wg.Wait() - - test.cacheAssert(cache, startTime) }) } } @@ -432,7 +612,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() { driveID, "", refreshInterval, - &mock.EnumeratesDriveItemsDelta{}, + &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{}, fault.New(true)) require.NoError(t, err, clues.ToCore(err)) @@ -456,44 +636,44 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() { func (suite *URLCacheUnitSuite) TestNewURLCache() { table := []struct { - name string - driveID string - refreshInt time.Duration - itemPager EnumerateDriveItemsDeltaer - errors *fault.Bus - expectedErr require.ErrorAssertionFunc + name string + driveID string + refreshInt time.Duration + itemPager EnumerateDriveItemsDeltaer + errors *fault.Bus + expectErr require.ErrorAssertionFunc }{ { - name: "invalid driveID", - driveID: "", - refreshInt: 1 * time.Hour, - itemPager: &mock.EnumeratesDriveItemsDelta{}, - errors: fault.New(true), - expectedErr: require.Error, + name: "invalid driveID", + driveID: "", + refreshInt: 1 * time.Hour, + itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{}, + errors: fault.New(true), + expectErr: require.Error, }, { - name: "invalid refresh interval", - driveID: "drive1", - refreshInt: 100 * time.Millisecond, - itemPager: &mock.EnumeratesDriveItemsDelta{}, - errors: fault.New(true), - expectedErr: require.Error, + name: "invalid refresh interval", + driveID: "drive1", + refreshInt: 100 * time.Millisecond, + itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{}, + errors: fault.New(true), + expectErr: require.Error, }, { - name: "invalid item enumerator", - driveID: "drive1", - refreshInt: 1 * time.Hour, - itemPager: nil, - errors: fault.New(true), - expectedErr: require.Error, + name: "invalid item enumerator", + driveID: "drive1", + refreshInt: 1 * time.Hour, + itemPager: nil, + errors: fault.New(true), + expectErr: require.Error, }, { - name: "valid", - driveID: "drive1", - refreshInt: 1 * time.Hour, - itemPager: &mock.EnumeratesDriveItemsDelta{}, - errors: fault.New(true), - expectedErr: require.NoError, + name: "valid", + driveID: "drive1", + refreshInt: 1 * time.Hour, + itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{}, + errors: fault.New(true), + expectErr: require.NoError, }, } @@ -507,7 +687,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() { test.itemPager, test.errors) - test.expectedErr(t, err, clues.ToCore(err)) + test.expectErr(t, err, clues.ToCore(err)) }) } } diff --git a/src/internal/m365/service/onedrive/mock/handlers.go b/src/internal/m365/service/onedrive/mock/handlers.go index c0bdd81f0..d3b71e3c5 100644 --- a/src/internal/m365/service/onedrive/mock/handlers.go +++ b/src/internal/m365/service/onedrive/mock/handlers.go @@ -14,18 +14,21 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/services/m365/api" - apiMock "github.com/alcionai/corso/src/pkg/services/m365/api/mock" ) // --------------------------------------------------------------------------- // Backup Handler // --------------------------------------------------------------------------- -type BackupHandler struct { +type BackupHandler[T any] struct { ItemInfo details.ItemInfo + // FIXME: this is a hacky solution. Better to use an interface + // and plug in the selector scope there. + Sel selectors.Selector - DriveItemEnumeration EnumeratesDriveItemsDelta + DriveItemEnumeration EnumeratesDriveItemsDelta[T] GI GetsItem GIP GetsItemPermission @@ -54,13 +57,17 @@ type BackupHandler struct { GetErrs []error } -func DefaultOneDriveBH(resourceOwner string) *BackupHandler { - return &BackupHandler{ +func DefaultOneDriveBH(resourceOwner string) *BackupHandler[models.DriveItemable] { + sel := selectors.NewOneDriveBackup([]string{resourceOwner}) + sel.Include(sel.AllData()) + + return &BackupHandler[models.DriveItemable]{ ItemInfo: details.ItemInfo{ OneDrive: &details.OneDriveInfo{}, Extension: &details.ExtensionData{}, }, - DriveItemEnumeration: EnumeratesDriveItemsDelta{}, + Sel: sel.Selector, + DriveItemEnumeration: EnumeratesDriveItemsDelta[models.DriveItemable]{}, GI: GetsItem{Err: clues.New("not defined")}, GIP: GetsItemPermission{Err: clues.New("not defined")}, PathPrefixFn: defaultOneDrivePathPrefixer, @@ -75,12 +82,16 @@ func DefaultOneDriveBH(resourceOwner string) *BackupHandler { } } -func DefaultSharePointBH(resourceOwner string) *BackupHandler { - return &BackupHandler{ +func DefaultSharePointBH(resourceOwner string) *BackupHandler[models.DriveItemable] { + sel := selectors.NewOneDriveBackup([]string{resourceOwner}) + sel.Include(sel.AllData()) + + return &BackupHandler[models.DriveItemable]{ ItemInfo: details.ItemInfo{ SharePoint: &details.SharePointInfo{}, Extension: &details.ExtensionData{}, }, + Sel: sel.Selector, GI: GetsItem{Err: clues.New("not defined")}, GIP: GetsItemPermission{Err: clues.New("not defined")}, PathPrefixFn: defaultSharePointPathPrefixer, @@ -95,8 +106,8 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler { } } -func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) { - pp, err := h.PathPrefixFn(tID, h.ProtectedResource.ID(), driveID) +func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) { + pp, err := h.PathPrefixFn(tID, h.ResourceOwner, driveID) if err != nil { return nil, err } @@ -104,8 +115,8 @@ func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) { return pp, h.PathPrefixErr } -func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) { - pp, err := h.MetadataPathPrefixFn(tID, h.ProtectedResource.ID()) +func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) { + pp, err := h.MetadataPathPrefixFn(tID, h.ResourceOwner) if err != nil { return nil, err } @@ -113,8 +124,8 @@ func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) { return pp, h.PathPrefixErr } -func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) { - cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource.ID()) +func (h BackupHandler[T]) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) { + cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource) if err != nil { return nil, err } @@ -122,19 +133,19 @@ func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, e return cp, h.CanonPathErr } -func (h BackupHandler) ServiceCat() (path.ServiceType, path.CategoryType) { +func (h BackupHandler[T]) ServiceCat() (path.ServiceType, path.CategoryType) { return h.Service, h.Category } -func (h BackupHandler) NewDrivePager(string, []string) api.Pager[models.Driveable] { +func (h BackupHandler[T]) NewDrivePager(string, []string) api.Pager[models.Driveable] { return h.DrivePagerV } -func (h BackupHandler) FormatDisplayPath(_ string, pb *path.Builder) string { +func (h BackupHandler[T]) FormatDisplayPath(_ string, pb *path.Builder) string { return "/" + pb.String() } -func (h BackupHandler) NewLocationIDer(driveID string, elems ...string) details.LocationIDer { +func (h BackupHandler[T]) NewLocationIDer(driveID string, elems ...string) details.LocationIDer { return h.LocationIDFn(driveID, elems...) } @@ -148,7 +159,7 @@ func (h BackupHandler) AugmentItemInfo( return h.ItemInfo } -func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.Response, error) { +func (h *BackupHandler[T]) Get(context.Context, string, map[string]string) (*http.Response, error) { c := h.getCall h.getCall++ @@ -160,8 +171,9 @@ func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.R return h.GetResps[c], h.GetErrs[c] } -func (h BackupHandler) EnumerateDriveItemsDelta( +func (h BackupHandler[T]) EnumerateDriveItemsDelta( ctx context.Context, + ch chan<- api.NextPage[T], driveID, prevDeltaLink string, selectProps []string, ) ([]models.DriveItemable, api.DeltaUpdate, error) { @@ -172,11 +184,11 @@ func (h BackupHandler) EnumerateDriveItemsDelta( selectProps) } -func (h BackupHandler) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) { +func (h BackupHandler[T]) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) { return h.GI.GetItem(ctx, "", "") } -func (h BackupHandler) GetItemPermission( +func (h BackupHandler[T]) GetItemPermission( ctx context.Context, _, _ string, ) (models.PermissionCollectionResponseable, error) { @@ -250,12 +262,16 @@ var defaultSharePointLocationIDer = func(driveID string, elems ...string) detail return details.NewSharePointLocationIDer(driveID, elems...) } -func (h BackupHandler) IsAllPass() bool { - return true +func (h BackupHandler[T]) IsAllPass() bool { + scope := h.Sel.Includes[0] + return selectors.IsAnyTarget(selectors.SharePointScope(scope), selectors.SharePointLibraryFolder) || + selectors.IsAnyTarget(selectors.OneDriveScope(scope), selectors.OneDriveFolder) } -func (h BackupHandler) IncludesDir(string) bool { - return true +func (h BackupHandler[T]) IncludesDir(dir string) bool { + scope := h.Sel.Includes[0] + return selectors.SharePointScope(scope).Matches(selectors.SharePointLibraryFolder, dir) || + selectors.OneDriveScope(scope).Matches(selectors.OneDriveFolder, dir) } // --------------------------------------------------------------------------- @@ -278,56 +294,26 @@ func (m GetsItem) GetItem( // Enumerates Drive Items // --------------------------------------------------------------------------- -type EnumeratesDriveItemsDelta struct { - Items map[string][]models.DriveItemable +type EnumeratesDriveItemsDelta[T any] struct { + Pages map[string][]api.NextPage[T] DeltaUpdate map[string]api.DeltaUpdate Err map[string]error } -func (edi EnumeratesDriveItemsDelta) EnumerateDriveItemsDelta( +func (edi EnumeratesDriveItemsDelta[T]) EnumerateDriveItemsDelta( _ context.Context, + ch chan<- api.NextPage[T], driveID, _ string, _ []string, ) api.NextPageResulter[models.DriveItemable] { return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID] } -func PagerResultToEDID( - m map[string][]apiMock.PagerResult[models.DriveItemable], -) EnumeratesDriveItemsDelta { - edi := EnumeratesDriveItemsDelta{ - Items: map[string][]models.DriveItemable{}, - DeltaUpdate: map[string]api.DeltaUpdate{}, - Err: map[string]error{}, + for _, page := range edi.Pages[driveID] { + ch <- page } - for driveID, results := range m { - var ( - err error - items = []models.DriveItemable{} - deltaUpdate api.DeltaUpdate - ) - - for _, pr := range results { - items = append(items, pr.Values...) - - if pr.DeltaLink != nil { - deltaUpdate = api.DeltaUpdate{URL: ptr.Val(pr.DeltaLink)} - } - - if pr.Err != nil { - err = pr.Err - } - - deltaUpdate.Reset = deltaUpdate.Reset || pr.ResetDelta - } - - edi.Items[driveID] = items - edi.Err[driveID] = err - edi.DeltaUpdate[driveID] = deltaUpdate - } - - return edi + return edi.DeltaUpdate[driveID], edi.Err[driveID] } // ---------------------------------------------------------------------------