apply channels and streaming to the url cache

updates the url cache item enumeration with stream processing
using channels.  Plus other changes to accomodate the
new pattern such as mock and interface updates.

This is the second of a multipart update that has been separated
for ease of review.  CI is not expected to pass until the final
PR.
This commit is contained in:
ryanfkeepers 2023-09-30 10:13:46 -06:00
parent a81f526cc6
commit 2b7d76faef
7 changed files with 411 additions and 210 deletions

View File

@ -85,6 +85,7 @@ type GetItemer interface {
type EnumerateDriveItemsDeltaer interface { type EnumerateDriveItemsDeltaer interface {
EnumerateDriveItemsDelta( EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
ch chan<- api.NextPage[models.DriveItemable],
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
) api.NextPageResulter[models.DriveItemable] ) api.NextPageResulter[models.DriveItemable]
} }

View File

@ -150,8 +150,10 @@ func (suite *ItemCollectorUnitSuite) TestDrives() {
Err: assert.AnError, Err: assert.AnError,
}, },
}, },
expectedErr: assert.Error, expectedErr: assert.Error,
expectedResults: nil, // even though we error, the func will return both the
// error and the prior results
expectedResults: resultDrives,
}, },
{ {
name: "MySiteURLNotFound", name: "MySiteURLNotFound",

View File

@ -136,6 +136,7 @@ func (h itemBackupHandler) IncludesDir(dir string) bool {
func (h itemBackupHandler) EnumerateDriveItemsDelta( func (h itemBackupHandler) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
ch chan<- api.NextPage[models.DriveItemable],
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, selectProps []string,
) (api.NextPageResulter[models.DriveItemable], error) { ) (api.NextPageResulter[models.DriveItemable], error) {

View File

@ -139,6 +139,7 @@ func (h libraryBackupHandler) IncludesDir(dir string) bool {
func (h libraryBackupHandler) EnumerateDriveItemsDelta( func (h libraryBackupHandler) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
ch chan<- api.NextPage[models.DriveItemable],
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, selectProps []string,
) api.NextPageResulter[models.DriveItemable] { ) api.NextPageResulter[models.DriveItemable] {

View File

@ -47,7 +47,7 @@ type urlCache struct {
refreshMu sync.Mutex refreshMu sync.Mutex
deltaQueryCount int deltaQueryCount int
edid EnumerateDriveItemsDeltaer enumerator EnumerateDriveItemsDeltaer
errs *fault.Bus errs *fault.Bus
} }
@ -56,10 +56,10 @@ type urlCache struct {
func newURLCache( func newURLCache(
driveID, prevDelta string, driveID, prevDelta string,
refreshInterval time.Duration, refreshInterval time.Duration,
edid EnumerateDriveItemsDeltaer, enumerator EnumerateDriveItemsDeltaer,
errs *fault.Bus, errs *fault.Bus,
) (*urlCache, error) { ) (*urlCache, error) {
err := validateCacheParams(driveID, refreshInterval, edid) err := validateCacheParams(driveID, refreshInterval, enumerator)
if err != nil { if err != nil {
return nil, clues.Wrap(err, "cache params") return nil, clues.Wrap(err, "cache params")
} }
@ -68,7 +68,7 @@ func newURLCache(
idToProps: make(map[string]itemProps), idToProps: make(map[string]itemProps),
lastRefreshTime: time.Time{}, lastRefreshTime: time.Time{},
driveID: driveID, driveID: driveID,
edid: edid, enumerator: enumerator,
prevDelta: prevDelta, prevDelta: prevDelta,
refreshInterval: refreshInterval, refreshInterval: refreshInterval,
errs: errs, errs: errs,
@ -80,7 +80,7 @@ func newURLCache(
func validateCacheParams( func validateCacheParams(
driveID string, driveID string,
refreshInterval time.Duration, refreshInterval time.Duration,
edid EnumerateDriveItemsDeltaer, enumerator EnumerateDriveItemsDeltaer,
) error { ) error {
if len(driveID) == 0 { if len(driveID) == 0 {
return clues.New("drive id is empty") return clues.New("drive id is empty")
@ -90,8 +90,8 @@ func validateCacheParams(
return clues.New("invalid refresh interval") return clues.New("invalid refresh interval")
} }
if edid == nil { if enumerator == nil {
return clues.New("nil item enumerator") return clues.New("missing item enumerator")
} }
return nil return nil
@ -157,20 +157,45 @@ func (uc *urlCache) refreshCache(
// Issue a delta query to graph // Issue a delta query to graph
logger.Ctx(ctx).Info("refreshing url cache") logger.Ctx(ctx).Info("refreshing url cache")
items, du, err := uc.edid.EnumerateDriveItemsDelta( var (
ctx, ch = make(chan api.NextPage[models.DriveItemable], 1)
uc.driveID, cacheErr error
uc.prevDelta, wg = sync.WaitGroup{}
api.URLCacheDriveItemProps()) )
go func() {
defer wg.Done()
for pg := range ch {
if cacheErr != nil {
continue
}
uc.deltaQueryCount++
err := uc.updateCache(
ctx,
pg.Items,
pg.Reset,
uc.errs)
if err != nil {
cacheErr = clues.Wrap(err, "updating cache")
}
}
}()
wg.Add(1)
du, err := uc.enumerator.EnumerateDriveItemsDelta(ctx, ch, uc.driveID, uc.prevDelta)
if err != nil { if err != nil {
uc.idToProps = make(map[string]itemProps) uc.idToProps = make(map[string]itemProps)
return clues.Stack(err) return clues.Stack(err)
} }
uc.deltaQueryCount++ wg.Wait()
if err := uc.updateCache(ctx, items, uc.errs); err != nil { if cacheErr != nil {
return clues.Stack(err) return clues.Stack(cacheErr)
} }
logger.Ctx(ctx).Info("url cache refreshed") logger.Ctx(ctx).Info("url cache refreshed")
@ -205,10 +230,15 @@ func (uc *urlCache) readCache(
func (uc *urlCache) updateCache( func (uc *urlCache) updateCache(
ctx context.Context, ctx context.Context,
items []models.DriveItemable, items []models.DriveItemable,
reset bool,
errs *fault.Bus, errs *fault.Bus,
) error { ) error {
el := errs.Local() el := errs.Local()
if reset {
uc.idToProps = map[string]itemProps{}
}
for _, item := range items { for _, item := range items {
if el.Failure() != nil { if el.Failure() != nil {
break break

View File

@ -2,6 +2,7 @@ package drive
import ( import (
"errors" "errors"
"fmt"
"io" "io"
"math/rand" "math/rand"
"net/http" "net/http"
@ -11,6 +12,7 @@ import (
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models" "github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -91,10 +93,22 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
api.NewDriveItem(newFolderName, true), api.NewDriveItem(newFolderName, true),
control.Copy) control.Copy)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, newFolder.GetId()) require.NotNil(t, newFolder.GetId())
nfid := ptr.Val(newFolder.GetId()) nfid := ptr.Val(newFolder.GetId())
ch := make(chan api.NextPage[models.DriveItemable], 1)
go func() {
for {
// no-op, we just need the previous delta
// but also need to drain the channel to
// prevent deadlock.
_, ok := <-ch
if !ok {
return
}
}
}()
// Get the previous delta to feed into url cache // Get the previous delta to feed into url cache
_, du, err := ac.EnumerateDriveItemsDelta( _, du, err := ac.EnumerateDriveItemsDelta(
@ -196,16 +210,18 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
table := []struct { table := []struct {
name string name string
pagerItems map[string][]models.DriveItemable pages []api.NextPage[models.DriveItemable]
pagerErr map[string]error pagerErr error
expectedItemProps map[string]itemProps expectedItemProps map[string]itemProps
expectedErr require.ErrorAssertionFunc expectErr assert.ErrorAssertionFunc
cacheAssert func(*urlCache, time.Time) expect func(*testing.T, *urlCache, time.Time)
}{ }{
{ {
name: "single item in cache", name: "single item in cache",
pagerItems: map[string][]models.DriveItemable{ pages: []api.NextPage[models.DriveItemable]{
driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)}, {Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
}},
}, },
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
"1": { "1": {
@ -213,22 +229,121 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
isDeleted: false, isDeleted: false,
}, },
}, },
expectedErr: require.NoError, expectErr: assert.NoError,
cacheAssert: func(uc *urlCache, startTime time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Greater(suite.T(), uc.lastRefreshTime, startTime) assert.Greater(t, uc.lastRefreshTime, startTime)
require.Equal(suite.T(), 1, uc.deltaQueryCount) assert.Equal(t, 1, uc.deltaQueryCount)
require.Equal(suite.T(), 1, len(uc.idToProps)) assert.Equal(t, 1, len(uc.idToProps))
}, },
}, },
{ {
name: "multiple items in cache", name: "multiple items in cache",
pagerItems: map[string][]models.DriveItemable{ pages: []api.NextPage[models.DriveItemable]{
driveID: { {Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false), fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false), fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
fileItem("4", "file4", "root", "root", "https://dummy4.com", false), fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
fileItem("5", "file5", "root", "root", "https://dummy5.com", false), fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
}},
},
expectedItemProps: map[string]itemProps{
"1": {
downloadURL: "https://dummy1.com",
isDeleted: false,
},
"2": {
downloadURL: "https://dummy2.com",
isDeleted: false,
},
"3": {
downloadURL: "https://dummy3.com",
isDeleted: false,
},
"4": {
downloadURL: "https://dummy4.com",
isDeleted: false,
},
"5": {
downloadURL: "https://dummy5.com",
isDeleted: false,
},
},
expectErr: assert.NoError,
expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
assert.Greater(t, uc.lastRefreshTime, startTime)
assert.Equal(t, 1, uc.deltaQueryCount)
assert.Equal(t, 5, len(uc.idToProps))
},
},
{
name: "multiple pages",
pages: []api.NextPage[models.DriveItemable]{
{Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
}},
{Items: []models.DriveItemable{
fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
}},
},
expectedItemProps: map[string]itemProps{
"1": {
downloadURL: "https://dummy1.com",
isDeleted: false,
},
"2": {
downloadURL: "https://dummy2.com",
isDeleted: false,
},
"3": {
downloadURL: "https://dummy3.com",
isDeleted: false,
},
"4": {
downloadURL: "https://dummy4.com",
isDeleted: false,
},
"5": {
downloadURL: "https://dummy5.com",
isDeleted: false,
},
},
expectErr: assert.NoError,
expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
assert.Greater(t, uc.lastRefreshTime, startTime)
assert.Equal(t, 2, uc.deltaQueryCount)
assert.Equal(t, 5, len(uc.idToProps))
},
},
{
name: "multiple pages with resets",
pages: []api.NextPage[models.DriveItemable]{
{
Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
},
},
{
Items: []models.DriveItemable{},
Reset: true,
},
{
Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
},
},
{
Items: []models.DriveItemable{
fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
},
}, },
}, },
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
@ -253,23 +368,77 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
isDeleted: false, isDeleted: false,
}, },
}, },
expectedErr: require.NoError, expectErr: assert.NoError,
cacheAssert: func(uc *urlCache, startTime time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Greater(suite.T(), uc.lastRefreshTime, startTime) assert.Greater(t, uc.lastRefreshTime, startTime)
require.Equal(suite.T(), 1, uc.deltaQueryCount) assert.Equal(t, 4, uc.deltaQueryCount)
require.Equal(suite.T(), 5, len(uc.idToProps)) assert.Equal(t, 5, len(uc.idToProps))
},
},
{
name: "multiple pages with resets and combo reset+items in page",
pages: []api.NextPage[models.DriveItemable]{
{
Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
},
},
{
Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
},
Reset: true,
},
{
Items: []models.DriveItemable{
fileItem("4", "file4", "root", "root", "https://dummy4.com", false),
fileItem("5", "file5", "root", "root", "https://dummy5.com", false),
},
},
},
expectedItemProps: map[string]itemProps{
"1": {
downloadURL: "https://dummy1.com",
isDeleted: false,
},
"2": {
downloadURL: "https://dummy2.com",
isDeleted: false,
},
"3": {
downloadURL: "https://dummy3.com",
isDeleted: false,
},
"4": {
downloadURL: "https://dummy4.com",
isDeleted: false,
},
"5": {
downloadURL: "https://dummy5.com",
isDeleted: false,
},
},
expectErr: assert.NoError,
expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
assert.Greater(t, uc.lastRefreshTime, startTime)
assert.Equal(t, 3, uc.deltaQueryCount)
assert.Equal(t, 5, len(uc.idToProps))
}, },
}, },
{ {
name: "duplicate items with potentially new urls", name: "duplicate items with potentially new urls",
pagerItems: map[string][]models.DriveItemable{ pages: []api.NextPage[models.DriveItemable]{
driveID: { {Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false), fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("3", "file3", "root", "root", "https://dummy3.com", false), fileItem("3", "file3", "root", "root", "https://dummy3.com", false),
fileItem("1", "file1", "root", "root", "https://test1.com", false), fileItem("1", "file1", "root", "root", "https://test1.com", false),
fileItem("2", "file2", "root", "root", "https://test2.com", false), fileItem("2", "file2", "root", "root", "https://test2.com", false),
}, }},
}, },
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
"1": { "1": {
@ -285,21 +454,21 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
isDeleted: false, isDeleted: false,
}, },
}, },
expectedErr: require.NoError, expectErr: assert.NoError,
cacheAssert: func(uc *urlCache, startTime time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Greater(suite.T(), uc.lastRefreshTime, startTime) assert.Greater(t, uc.lastRefreshTime, startTime)
require.Equal(suite.T(), 1, uc.deltaQueryCount) assert.Equal(t, 1, uc.deltaQueryCount)
require.Equal(suite.T(), 3, len(uc.idToProps)) assert.Equal(t, 3, len(uc.idToProps))
}, },
}, },
{ {
name: "deleted items", name: "deleted items",
pagerItems: map[string][]models.DriveItemable{ pages: []api.NextPage[models.DriveItemable]{
driveID: { {Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
fileItem("2", "file2", "root", "root", "https://dummy2.com", false), fileItem("2", "file2", "root", "root", "https://dummy2.com", false),
fileItem("1", "file1", "root", "root", "https://dummy1.com", true), fileItem("1", "file1", "root", "root", "https://dummy1.com", true),
}, }},
}, },
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
"1": { "1": {
@ -311,111 +480,122 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
isDeleted: false, isDeleted: false,
}, },
}, },
expectedErr: require.NoError, expectErr: assert.NoError,
cacheAssert: func(uc *urlCache, startTime time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Greater(suite.T(), uc.lastRefreshTime, startTime) assert.Greater(t, uc.lastRefreshTime, startTime)
require.Equal(suite.T(), 1, uc.deltaQueryCount) assert.Equal(t, 1, uc.deltaQueryCount)
require.Equal(suite.T(), 2, len(uc.idToProps)) assert.Equal(t, 2, len(uc.idToProps))
}, },
}, },
{ {
name: "item not found in cache", name: "item not found in cache",
pagerItems: map[string][]models.DriveItemable{ pages: []api.NextPage[models.DriveItemable]{
driveID: {fileItem("1", "file1", "root", "root", "https://dummy1.com", false)}, {Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
}},
}, },
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
"2": {}, "2": {},
}, },
expectedErr: require.Error, expectErr: assert.Error,
cacheAssert: func(uc *urlCache, startTime time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Greater(suite.T(), uc.lastRefreshTime, startTime) assert.Greater(t, uc.lastRefreshTime, startTime)
require.Equal(suite.T(), 1, uc.deltaQueryCount) assert.Equal(t, 1, uc.deltaQueryCount)
require.Equal(suite.T(), 1, len(uc.idToProps)) assert.Equal(t, 1, len(uc.idToProps))
}, },
}, },
{ {
name: "delta query error", name: "delta query error",
pagerItems: map[string][]models.DriveItemable{}, pages: []api.NextPage[models.DriveItemable]{
pagerErr: map[string]error{ {Items: []models.DriveItemable{}},
driveID: errors.New("delta query error"),
}, },
pagerErr: errors.New("delta query error"),
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
"1": {}, "1": {},
"2": {}, "2": {},
}, },
expectedErr: require.Error, expectErr: assert.Error,
cacheAssert: func(uc *urlCache, _ time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Equal(suite.T(), time.Time{}, uc.lastRefreshTime) assert.Equal(t, time.Time{}, uc.lastRefreshTime)
require.Equal(suite.T(), 0, uc.deltaQueryCount) assert.NotZero(t, uc.deltaQueryCount)
require.Equal(suite.T(), 0, len(uc.idToProps)) assert.Equal(t, 0, len(uc.idToProps))
}, },
}, },
{ {
name: "folder item", name: "folder item",
pagerItems: map[string][]models.DriveItemable{ pages: []api.NextPage[models.DriveItemable]{
driveID: { {Items: []models.DriveItemable{
fileItem("1", "file1", "root", "root", "https://dummy1.com", false), fileItem("1", "file1", "root", "root", "https://dummy1.com", false),
driveItem("2", "folder2", "root", "root", false, true, false), driveItem("2", "folder2", "root", "root", false, true, false),
}, }},
}, },
expectedItemProps: map[string]itemProps{ expectedItemProps: map[string]itemProps{
"2": {}, "2": {},
}, },
expectedErr: require.Error, expectErr: assert.Error,
cacheAssert: func(uc *urlCache, startTime time.Time) { expect: func(t *testing.T, uc *urlCache, startTime time.Time) {
require.Greater(suite.T(), uc.lastRefreshTime, startTime) assert.Greater(t, uc.lastRefreshTime, startTime)
require.Equal(suite.T(), 1, uc.deltaQueryCount) assert.Equal(t, 1, uc.deltaQueryCount)
require.Equal(suite.T(), 1, len(uc.idToProps)) assert.Equal(t, 1, len(uc.idToProps))
}, },
}, },
} }
for _, test := range table { for _, test := range table {
suite.Run(test.name, func() { suite.Run(test.name, func() {
t := suite.T() for _, numConcurrentReqs := range []int{1, 2, 32} {
ctx, flush := tester.NewContext(t) crTestName := fmt.Sprintf("%d_concurrent_reqs", numConcurrentReqs)
defer flush() suite.Run(crTestName, func() {
t := suite.T()
medi := mock.EnumeratesDriveItemsDelta{ ctx, flush := tester.NewContext(t)
Items: test.pagerItems, defer flush()
Err: test.pagerErr,
DeltaUpdate: map[string]api.DeltaUpdate{driveID: {URL: deltaString}},
}
cache, err := newURLCache( medi := mock.EnumeratesDriveItemsDelta[models.DriveItemable]{
driveID, Pages: map[string][]api.NextPage[models.DriveItemable]{
"", driveID: test.pages,
1*time.Hour, },
&medi, Err: map[string]error{
fault.New(true)) driveID: test.pagerErr,
},
require.NoError(suite.T(), err, clues.ToCore(err)) DeltaUpdate: map[string]api.DeltaUpdate{
driveID: {URL: deltaString},
numConcurrentReq := 100 },
var wg sync.WaitGroup
wg.Add(numConcurrentReq)
startTime := time.Now()
for i := 0; i < numConcurrentReq; i++ {
go func() {
defer wg.Done()
for id, expected := range test.expectedItemProps {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
props, err := cache.getItemProperties(ctx, id)
test.expectedErr(suite.T(), err, clues.ToCore(err))
require.Equal(suite.T(), expected, props)
} }
}()
cache, err := newURLCache(
driveID,
"",
1*time.Hour,
&medi,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
var wg sync.WaitGroup
wg.Add(numConcurrentReqs)
startTime := time.Now()
for i := 0; i < numConcurrentReqs; i++ {
go func(ti int) {
defer wg.Done()
for id, expected := range test.expectedItemProps {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
props, err := cache.getItemProperties(ctx, id)
test.expectErr(t, err, clues.ToCore(err))
assert.Equal(t, expected, props)
}
}(i)
}
wg.Wait()
test.expect(t, cache, startTime)
})
} }
wg.Wait()
test.cacheAssert(cache, startTime)
}) })
} }
} }
@ -432,7 +612,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
driveID, driveID,
"", "",
refreshInterval, refreshInterval,
&mock.EnumeratesDriveItemsDelta{}, &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
fault.New(true)) fault.New(true))
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
@ -456,44 +636,44 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
func (suite *URLCacheUnitSuite) TestNewURLCache() { func (suite *URLCacheUnitSuite) TestNewURLCache() {
table := []struct { table := []struct {
name string name string
driveID string driveID string
refreshInt time.Duration refreshInt time.Duration
itemPager EnumerateDriveItemsDeltaer itemPager EnumerateDriveItemsDeltaer
errors *fault.Bus errors *fault.Bus
expectedErr require.ErrorAssertionFunc expectErr require.ErrorAssertionFunc
}{ }{
{ {
name: "invalid driveID", name: "invalid driveID",
driveID: "", driveID: "",
refreshInt: 1 * time.Hour, refreshInt: 1 * time.Hour,
itemPager: &mock.EnumeratesDriveItemsDelta{}, itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
errors: fault.New(true), errors: fault.New(true),
expectedErr: require.Error, expectErr: require.Error,
}, },
{ {
name: "invalid refresh interval", name: "invalid refresh interval",
driveID: "drive1", driveID: "drive1",
refreshInt: 100 * time.Millisecond, refreshInt: 100 * time.Millisecond,
itemPager: &mock.EnumeratesDriveItemsDelta{}, itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
errors: fault.New(true), errors: fault.New(true),
expectedErr: require.Error, expectErr: require.Error,
}, },
{ {
name: "invalid item enumerator", name: "invalid item enumerator",
driveID: "drive1", driveID: "drive1",
refreshInt: 1 * time.Hour, refreshInt: 1 * time.Hour,
itemPager: nil, itemPager: nil,
errors: fault.New(true), errors: fault.New(true),
expectedErr: require.Error, expectErr: require.Error,
}, },
{ {
name: "valid", name: "valid",
driveID: "drive1", driveID: "drive1",
refreshInt: 1 * time.Hour, refreshInt: 1 * time.Hour,
itemPager: &mock.EnumeratesDriveItemsDelta{}, itemPager: &mock.EnumeratesDriveItemsDelta[models.DriveItemable]{},
errors: fault.New(true), errors: fault.New(true),
expectedErr: require.NoError, expectErr: require.NoError,
}, },
} }
@ -507,7 +687,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
test.itemPager, test.itemPager,
test.errors) test.errors)
test.expectedErr(t, err, clues.ToCore(err)) test.expectErr(t, err, clues.ToCore(err))
}) })
} }
} }

View File

@ -14,18 +14,21 @@ import (
"github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/path" "github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/services/m365/api" "github.com/alcionai/corso/src/pkg/services/m365/api"
apiMock "github.com/alcionai/corso/src/pkg/services/m365/api/mock"
) )
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Backup Handler // Backup Handler
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type BackupHandler struct { type BackupHandler[T any] struct {
ItemInfo details.ItemInfo ItemInfo details.ItemInfo
// FIXME: this is a hacky solution. Better to use an interface
// and plug in the selector scope there.
Sel selectors.Selector
DriveItemEnumeration EnumeratesDriveItemsDelta DriveItemEnumeration EnumeratesDriveItemsDelta[T]
GI GetsItem GI GetsItem
GIP GetsItemPermission GIP GetsItemPermission
@ -54,13 +57,17 @@ type BackupHandler struct {
GetErrs []error GetErrs []error
} }
func DefaultOneDriveBH(resourceOwner string) *BackupHandler { func DefaultOneDriveBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
return &BackupHandler{ sel := selectors.NewOneDriveBackup([]string{resourceOwner})
sel.Include(sel.AllData())
return &BackupHandler[models.DriveItemable]{
ItemInfo: details.ItemInfo{ ItemInfo: details.ItemInfo{
OneDrive: &details.OneDriveInfo{}, OneDrive: &details.OneDriveInfo{},
Extension: &details.ExtensionData{}, Extension: &details.ExtensionData{},
}, },
DriveItemEnumeration: EnumeratesDriveItemsDelta{}, Sel: sel.Selector,
DriveItemEnumeration: EnumeratesDriveItemsDelta[models.DriveItemable]{},
GI: GetsItem{Err: clues.New("not defined")}, GI: GetsItem{Err: clues.New("not defined")},
GIP: GetsItemPermission{Err: clues.New("not defined")}, GIP: GetsItemPermission{Err: clues.New("not defined")},
PathPrefixFn: defaultOneDrivePathPrefixer, PathPrefixFn: defaultOneDrivePathPrefixer,
@ -75,12 +82,16 @@ func DefaultOneDriveBH(resourceOwner string) *BackupHandler {
} }
} }
func DefaultSharePointBH(resourceOwner string) *BackupHandler { func DefaultSharePointBH(resourceOwner string) *BackupHandler[models.DriveItemable] {
return &BackupHandler{ sel := selectors.NewOneDriveBackup([]string{resourceOwner})
sel.Include(sel.AllData())
return &BackupHandler[models.DriveItemable]{
ItemInfo: details.ItemInfo{ ItemInfo: details.ItemInfo{
SharePoint: &details.SharePointInfo{}, SharePoint: &details.SharePointInfo{},
Extension: &details.ExtensionData{}, Extension: &details.ExtensionData{},
}, },
Sel: sel.Selector,
GI: GetsItem{Err: clues.New("not defined")}, GI: GetsItem{Err: clues.New("not defined")},
GIP: GetsItemPermission{Err: clues.New("not defined")}, GIP: GetsItemPermission{Err: clues.New("not defined")},
PathPrefixFn: defaultSharePointPathPrefixer, PathPrefixFn: defaultSharePointPathPrefixer,
@ -95,8 +106,8 @@ func DefaultSharePointBH(resourceOwner string) *BackupHandler {
} }
} }
func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) { func (h BackupHandler[T]) PathPrefix(tID, driveID string) (path.Path, error) {
pp, err := h.PathPrefixFn(tID, h.ProtectedResource.ID(), driveID) pp, err := h.PathPrefixFn(tID, h.ResourceOwner, driveID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -104,8 +115,8 @@ func (h BackupHandler) PathPrefix(tID, driveID string) (path.Path, error) {
return pp, h.PathPrefixErr return pp, h.PathPrefixErr
} }
func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) { func (h BackupHandler[T]) MetadataPathPrefix(tID string) (path.Path, error) {
pp, err := h.MetadataPathPrefixFn(tID, h.ProtectedResource.ID()) pp, err := h.MetadataPathPrefixFn(tID, h.ResourceOwner)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,8 +124,8 @@ func (h BackupHandler) MetadataPathPrefix(tID string) (path.Path, error) {
return pp, h.PathPrefixErr return pp, h.PathPrefixErr
} }
func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) { func (h BackupHandler[T]) CanonicalPath(pb *path.Builder, tID string) (path.Path, error) {
cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource.ID()) cp, err := h.CanonPathFn(pb, tID, h.ProtectedResource)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -122,19 +133,19 @@ func (h BackupHandler) CanonicalPath(pb *path.Builder, tID string) (path.Path, e
return cp, h.CanonPathErr return cp, h.CanonPathErr
} }
func (h BackupHandler) ServiceCat() (path.ServiceType, path.CategoryType) { func (h BackupHandler[T]) ServiceCat() (path.ServiceType, path.CategoryType) {
return h.Service, h.Category return h.Service, h.Category
} }
func (h BackupHandler) NewDrivePager(string, []string) api.Pager[models.Driveable] { func (h BackupHandler[T]) NewDrivePager(string, []string) api.Pager[models.Driveable] {
return h.DrivePagerV return h.DrivePagerV
} }
func (h BackupHandler) FormatDisplayPath(_ string, pb *path.Builder) string { func (h BackupHandler[T]) FormatDisplayPath(_ string, pb *path.Builder) string {
return "/" + pb.String() return "/" + pb.String()
} }
func (h BackupHandler) NewLocationIDer(driveID string, elems ...string) details.LocationIDer { func (h BackupHandler[T]) NewLocationIDer(driveID string, elems ...string) details.LocationIDer {
return h.LocationIDFn(driveID, elems...) return h.LocationIDFn(driveID, elems...)
} }
@ -148,7 +159,7 @@ func (h BackupHandler) AugmentItemInfo(
return h.ItemInfo return h.ItemInfo
} }
func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.Response, error) { func (h *BackupHandler[T]) Get(context.Context, string, map[string]string) (*http.Response, error) {
c := h.getCall c := h.getCall
h.getCall++ h.getCall++
@ -160,8 +171,9 @@ func (h *BackupHandler) Get(context.Context, string, map[string]string) (*http.R
return h.GetResps[c], h.GetErrs[c] return h.GetResps[c], h.GetErrs[c]
} }
func (h BackupHandler) EnumerateDriveItemsDelta( func (h BackupHandler[T]) EnumerateDriveItemsDelta(
ctx context.Context, ctx context.Context,
ch chan<- api.NextPage[T],
driveID, prevDeltaLink string, driveID, prevDeltaLink string,
selectProps []string, selectProps []string,
) ([]models.DriveItemable, api.DeltaUpdate, error) { ) ([]models.DriveItemable, api.DeltaUpdate, error) {
@ -172,11 +184,11 @@ func (h BackupHandler) EnumerateDriveItemsDelta(
selectProps) selectProps)
} }
func (h BackupHandler) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) { func (h BackupHandler[T]) GetItem(ctx context.Context, _, _ string) (models.DriveItemable, error) {
return h.GI.GetItem(ctx, "", "") return h.GI.GetItem(ctx, "", "")
} }
func (h BackupHandler) GetItemPermission( func (h BackupHandler[T]) GetItemPermission(
ctx context.Context, ctx context.Context,
_, _ string, _, _ string,
) (models.PermissionCollectionResponseable, error) { ) (models.PermissionCollectionResponseable, error) {
@ -250,12 +262,16 @@ var defaultSharePointLocationIDer = func(driveID string, elems ...string) detail
return details.NewSharePointLocationIDer(driveID, elems...) return details.NewSharePointLocationIDer(driveID, elems...)
} }
func (h BackupHandler) IsAllPass() bool { func (h BackupHandler[T]) IsAllPass() bool {
return true scope := h.Sel.Includes[0]
return selectors.IsAnyTarget(selectors.SharePointScope(scope), selectors.SharePointLibraryFolder) ||
selectors.IsAnyTarget(selectors.OneDriveScope(scope), selectors.OneDriveFolder)
} }
func (h BackupHandler) IncludesDir(string) bool { func (h BackupHandler[T]) IncludesDir(dir string) bool {
return true scope := h.Sel.Includes[0]
return selectors.SharePointScope(scope).Matches(selectors.SharePointLibraryFolder, dir) ||
selectors.OneDriveScope(scope).Matches(selectors.OneDriveFolder, dir)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -278,56 +294,26 @@ func (m GetsItem) GetItem(
// Enumerates Drive Items // Enumerates Drive Items
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
type EnumeratesDriveItemsDelta struct { type EnumeratesDriveItemsDelta[T any] struct {
Items map[string][]models.DriveItemable Pages map[string][]api.NextPage[T]
DeltaUpdate map[string]api.DeltaUpdate DeltaUpdate map[string]api.DeltaUpdate
Err map[string]error Err map[string]error
} }
func (edi EnumeratesDriveItemsDelta) EnumerateDriveItemsDelta( func (edi EnumeratesDriveItemsDelta[T]) EnumerateDriveItemsDelta(
_ context.Context, _ context.Context,
ch chan<- api.NextPage[T],
driveID, _ string, driveID, _ string,
_ []string, _ []string,
) api.NextPageResulter[models.DriveItemable] { ) api.NextPageResulter[models.DriveItemable] {
return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID] return edi.Items[driveID], edi.DeltaUpdate[driveID], edi.Err[driveID]
} }
func PagerResultToEDID( for _, page := range edi.Pages[driveID] {
m map[string][]apiMock.PagerResult[models.DriveItemable], ch <- page
) EnumeratesDriveItemsDelta {
edi := EnumeratesDriveItemsDelta{
Items: map[string][]models.DriveItemable{},
DeltaUpdate: map[string]api.DeltaUpdate{},
Err: map[string]error{},
} }
for driveID, results := range m { return edi.DeltaUpdate[driveID], edi.Err[driveID]
var (
err error
items = []models.DriveItemable{}
deltaUpdate api.DeltaUpdate
)
for _, pr := range results {
items = append(items, pr.Values...)
if pr.DeltaLink != nil {
deltaUpdate = api.DeltaUpdate{URL: ptr.Val(pr.DeltaLink)}
}
if pr.Err != nil {
err = pr.Err
}
deltaUpdate.Reset = deltaUpdate.Reset || pr.ResetDelta
}
edi.Items[driveID] = items
edi.Err[driveID] = err
edi.DeltaUpdate[driveID] = deltaUpdate
}
return edi
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------