Refresh URL cache using previous delta token (#3649)
<!-- PR description--> Currently we do a full delta enumeration during cache refresh events. This has several disadvantages, as listed in https://github.com/alcionai/corso/issues/3482. This PR allows us to instantiate the URL cache with a delta token. This delta token is identical to the one used during OneDrive item enumeration. The URL cache might run multiple delta queries (once every hour) using the same delta token. --- #### Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [x] 🕐 Yes, but in a later PR - [ ] ⛔ No #### Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Supportability/Tests - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup #### Issue(s) * https://github.com/alcionai/corso/issues/3482 #### Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [ ] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
3bcc405327
commit
c8ae50cb2e
@ -323,6 +323,23 @@ func (c *Collections) Get(
|
|||||||
"num_deltas_entries", numDeltas,
|
"num_deltas_entries", numDeltas,
|
||||||
"delta_reset", delta.Reset)
|
"delta_reset", delta.Reset)
|
||||||
|
|
||||||
|
numDriveItems := c.NumItems - numPrevItems
|
||||||
|
numPrevItems = c.NumItems
|
||||||
|
|
||||||
|
// Attach an url cache
|
||||||
|
if numDriveItems < urlCacheDriveItemThreshold {
|
||||||
|
logger.Ctx(ictx).Info("adding url cache for drive")
|
||||||
|
|
||||||
|
err = c.addURLCacheToDriveCollections(
|
||||||
|
ictx,
|
||||||
|
driveID,
|
||||||
|
prevDelta,
|
||||||
|
errs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// For both cases we don't need to do set difference on folder map if the
|
// For both cases we don't need to do set difference on folder map if the
|
||||||
// delta token was valid because we should see all the changes.
|
// delta token was valid because we should see all the changes.
|
||||||
if !delta.Reset {
|
if !delta.Reset {
|
||||||
@ -379,22 +396,6 @@ func (c *Collections) Get(
|
|||||||
|
|
||||||
c.CollectionMap[driveID][fldID] = col
|
c.CollectionMap[driveID][fldID] = col
|
||||||
}
|
}
|
||||||
|
|
||||||
numDriveItems := c.NumItems - numPrevItems
|
|
||||||
numPrevItems = c.NumItems
|
|
||||||
|
|
||||||
// Only create a drive cache if there are less than 300k items in the drive.
|
|
||||||
if numDriveItems < urlCacheDriveItemThreshold {
|
|
||||||
logger.Ctx(ictx).Info("adding url cache for drive")
|
|
||||||
|
|
||||||
err = c.addURLCacheToDriveCollections(
|
|
||||||
ictx,
|
|
||||||
driveID,
|
|
||||||
errs)
|
|
||||||
if err != nil {
|
|
||||||
return nil, false, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
|
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
|
||||||
@ -461,11 +462,12 @@ func (c *Collections) Get(
|
|||||||
// a drive.
|
// a drive.
|
||||||
func (c *Collections) addURLCacheToDriveCollections(
|
func (c *Collections) addURLCacheToDriveCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
driveID string,
|
driveID, prevDelta string,
|
||||||
errs *fault.Bus,
|
errs *fault.Bus,
|
||||||
) error {
|
) error {
|
||||||
uc, err := newURLCache(
|
uc, err := newURLCache(
|
||||||
driveID,
|
driveID,
|
||||||
|
prevDelta,
|
||||||
urlCacheRefreshInterval,
|
urlCacheRefreshInterval,
|
||||||
c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()),
|
c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()),
|
||||||
errs)
|
errs)
|
||||||
|
|||||||
@ -2742,6 +2742,7 @@ func (suite *OneDriveCollectionsUnitSuite) TestAddURLCacheToDriveCollections() {
|
|||||||
err := c.addURLCacheToDriveCollections(
|
err := c.addURLCacheToDriveCollections(
|
||||||
ctx,
|
ctx,
|
||||||
driveID,
|
driveID,
|
||||||
|
"",
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
|||||||
@ -37,6 +37,7 @@ var _ getItemPropertyer = &urlCache{}
|
|||||||
// urlCache caches download URLs for drive items
|
// urlCache caches download URLs for drive items
|
||||||
type urlCache struct {
|
type urlCache struct {
|
||||||
driveID string
|
driveID string
|
||||||
|
prevDelta string
|
||||||
idToProps map[string]itemProps
|
idToProps map[string]itemProps
|
||||||
lastRefreshTime time.Time
|
lastRefreshTime time.Time
|
||||||
refreshInterval time.Duration
|
refreshInterval time.Duration
|
||||||
@ -53,7 +54,7 @@ type urlCache struct {
|
|||||||
|
|
||||||
// newURLache creates a new URL cache for the specified drive ID
|
// newURLache creates a new URL cache for the specified drive ID
|
||||||
func newURLCache(
|
func newURLCache(
|
||||||
driveID string,
|
driveID, prevDelta string,
|
||||||
refreshInterval time.Duration,
|
refreshInterval time.Duration,
|
||||||
itemPager api.DriveItemEnumerator,
|
itemPager api.DriveItemEnumerator,
|
||||||
errs *fault.Bus,
|
errs *fault.Bus,
|
||||||
@ -70,6 +71,7 @@ func newURLCache(
|
|||||||
idToProps: make(map[string]itemProps),
|
idToProps: make(map[string]itemProps),
|
||||||
lastRefreshTime: time.Time{},
|
lastRefreshTime: time.Time{},
|
||||||
driveID: driveID,
|
driveID: driveID,
|
||||||
|
prevDelta: prevDelta,
|
||||||
refreshInterval: refreshInterval,
|
refreshInterval: refreshInterval,
|
||||||
itemPager: itemPager,
|
itemPager: itemPager,
|
||||||
errs: errs,
|
errs: errs,
|
||||||
@ -179,6 +181,8 @@ func (uc *urlCache) deltaQuery(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
) error {
|
) error {
|
||||||
logger.Ctx(ctx).Debug("starting delta query")
|
logger.Ctx(ctx).Debug("starting delta query")
|
||||||
|
// Reset item pager to remove any previous state
|
||||||
|
uc.itemPager.Reset()
|
||||||
|
|
||||||
_, _, _, err := collectItems(
|
_, _, _, err := collectItems(
|
||||||
ctx,
|
ctx,
|
||||||
@ -187,7 +191,7 @@ func (uc *urlCache) deltaQuery(
|
|||||||
"",
|
"",
|
||||||
uc.updateCache,
|
uc.updateCache,
|
||||||
map[string]string{},
|
map[string]string{},
|
||||||
"",
|
uc.prevDelta,
|
||||||
uc.errs)
|
uc.errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return clues.Wrap(err, "delta query")
|
return clues.Wrap(err, "delta query")
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package onedrive
|
package onedrive
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -89,10 +90,38 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
|||||||
|
|
||||||
nfid := ptr.Val(newFolder.GetId())
|
nfid := ptr.Val(newFolder.GetId())
|
||||||
|
|
||||||
|
collectorFunc := func(
|
||||||
|
context.Context,
|
||||||
|
string,
|
||||||
|
string,
|
||||||
|
[]models.DriveItemable,
|
||||||
|
map[string]string,
|
||||||
|
map[string]string,
|
||||||
|
map[string]struct{},
|
||||||
|
map[string]map[string]string,
|
||||||
|
bool,
|
||||||
|
*fault.Bus,
|
||||||
|
) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the previous delta to feed into url cache
|
||||||
|
prevDelta, _, _, err := collectItems(
|
||||||
|
ctx,
|
||||||
|
suite.ac.Drives().NewItemPager(driveID, "", api.DriveItemSelectDefault()),
|
||||||
|
suite.driveID,
|
||||||
|
"drive-name",
|
||||||
|
collectorFunc,
|
||||||
|
map[string]string{},
|
||||||
|
"",
|
||||||
|
fault.New(true))
|
||||||
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
require.NotNil(t, prevDelta.URL)
|
||||||
|
|
||||||
// Create a bunch of files in the new folder
|
// Create a bunch of files in the new folder
|
||||||
var items []models.DriveItemable
|
var items []models.DriveItemable
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
newItemName := "test_url_cache_basic_" + dttm.FormatNow(dttm.SafeForTesting)
|
newItemName := "test_url_cache_basic_" + dttm.FormatNow(dttm.SafeForTesting)
|
||||||
|
|
||||||
item, err := ac.Drives().PostItemInContainer(
|
item, err := ac.Drives().PostItemInContainer(
|
||||||
@ -110,12 +139,12 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a new URL cache with a long TTL
|
// Create a new URL cache with a long TTL
|
||||||
cache, err := newURLCache(
|
uc, err := newURLCache(
|
||||||
suite.driveID,
|
suite.driveID,
|
||||||
|
prevDelta.URL,
|
||||||
1*time.Hour,
|
1*time.Hour,
|
||||||
driveItemPager,
|
driveItemPager,
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
// Launch parallel requests to the cache, one per item
|
// Launch parallel requests to the cache, one per item
|
||||||
@ -127,11 +156,11 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
// Read item from URL cache
|
// Read item from URL cache
|
||||||
props, err := cache.getItemProperties(
|
props, err := uc.getItemProperties(
|
||||||
ctx,
|
ctx,
|
||||||
ptr.Val(items[i].GetId()))
|
ptr.Val(items[i].GetId()))
|
||||||
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
require.NotNil(t, props)
|
require.NotNil(t, props)
|
||||||
require.NotEmpty(t, props.downloadURL)
|
require.NotEmpty(t, props.downloadURL)
|
||||||
require.Equal(t, false, props.isDeleted)
|
require.Equal(t, false, props.isDeleted)
|
||||||
@ -145,15 +174,14 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
|||||||
props.downloadURL,
|
props.downloadURL,
|
||||||
nil,
|
nil,
|
||||||
nil)
|
nil)
|
||||||
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
// Validate that <= 1 delta queries were made
|
// Validate that <= 1 delta queries were made by url cache
|
||||||
require.LessOrEqual(t, cache.deltaQueryCount, 1)
|
require.LessOrEqual(t, uc.deltaQueryCount, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
type URLCacheUnitSuite struct {
|
type URLCacheUnitSuite struct {
|
||||||
@ -404,6 +432,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() {
|
|||||||
|
|
||||||
cache, err := newURLCache(
|
cache, err := newURLCache(
|
||||||
driveID,
|
driveID,
|
||||||
|
"",
|
||||||
1*time.Hour,
|
1*time.Hour,
|
||||||
itemPager,
|
itemPager,
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
@ -446,6 +475,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() {
|
|||||||
|
|
||||||
cache, err := newURLCache(
|
cache, err := newURLCache(
|
||||||
driveID,
|
driveID,
|
||||||
|
"",
|
||||||
refreshInterval,
|
refreshInterval,
|
||||||
&mockItemPager{},
|
&mockItemPager{},
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
@ -519,6 +549,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() {
|
|||||||
t := suite.T()
|
t := suite.T()
|
||||||
_, err := newURLCache(
|
_, err := newURLCache(
|
||||||
test.driveID,
|
test.driveID,
|
||||||
|
"",
|
||||||
test.refreshInt,
|
test.refreshInt,
|
||||||
test.itemPager,
|
test.itemPager,
|
||||||
test.errors)
|
test.errors)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user