Integrate URL cache into onedrive backup flows (#3542)

Changes:

* Attach the URL cache to a collection only if its parent drive has fewer than 300k items. Higher item counts reduce the ROI of the cache and can even degrade performance in severe cases. For those drives we stick to per-item GETs until we land a longer-term solution (see the sketch below).
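
For reference, a condensed sketch of the gating logic assembled from the hunks below (not the verbatim diff; `ictx` and `errs` come from the surrounding `Collections.Get` loop):

```go
// After a drive's collections are built, derive this drive's item count
// from the cumulative counter.
numDriveItems := c.NumItems - numPrevItems
numPrevItems = c.NumItems

// urlCacheDriveItemThreshold = 300 * 1000 (see the const block below).
if numDriveItems < urlCacheDriveItemThreshold {
	logger.Ctx(ictx).Info("adding url cache for drive")

	if err := c.addURLCacheToDriveCollections(ictx, driveID, errs); err != nil {
		return nil, false, err
	}
}
```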
---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* https://github.com/alcionai/corso/issues/3069

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] Unit test
- [x] 💚 E2E

---

Commit 479f114514 (parent 45a70fe1c4) by Abhishek Pandey, 2023-06-20 16:18:01 -07:00, committed by GitHub.
7 changed files with 295 additions and 18 deletions

---

@@ -84,6 +84,8 @@ type Collection struct {
// should only be true if the old delta token expired
doNotMergeItems bool
urlCache getItemPropertyer
}
func pathToLocation(p path.Path) (*path.Builder, error) {
@@ -109,6 +111,7 @@ func NewCollection(
ctrlOpts control.Options,
colScope collectionScope,
doNotMergeItems bool,
urlCache getItemPropertyer,
) (*Collection, error) {
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
// to be changed as we won't be able to extract path information from the
@@ -132,7 +135,8 @@ func NewCollection(
statusUpdater,
ctrlOpts,
colScope,
doNotMergeItems)
doNotMergeItems,
urlCache)
c.locPath = locPath
c.prevLocPath = prevLocPath
@@ -149,6 +153,7 @@ func newColl(
ctrlOpts control.Options,
colScope collectionScope,
doNotMergeItems bool,
urlCache getItemPropertyer,
) *Collection {
c := &Collection{
handler: handler,
@@ -162,6 +167,7 @@ func newColl(
state: data.StateOf(prevPath, currPath),
scope: colScope,
doNotMergeItems: doNotMergeItems,
urlCache: urlCache,
}
return c
@@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent(
el = errs.Local()
)
itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID)
itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID)
if err != nil {
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
@@ -320,9 +326,13 @@ type itemAndAPIGetter interface {
func downloadContent(
ctx context.Context,
iaag itemAndAPIGetter,
uc getItemPropertyer,
item models.DriveItemable,
driveID string,
) (io.ReadCloser, error) {
itemID := ptr.Val(item.GetId())
ctx = clues.Add(ctx, "item_id", itemID)
content, err := downloadItem(ctx, iaag, item)
if err == nil {
return content, nil
@@ -332,8 +342,19 @@ func downloadContent(
// Assume unauthorized requests are a sign of an expired jwt
// token, and that we've overrun the available window to
// download the actual file. Re-downloading the item will
// refresh that download url.
// download the file. Get a fresh url from the cache and attempt to
// download again.
content, err = readItemContents(ctx, iaag, uc, itemID)
if err == nil {
logger.Ctx(ctx).Debug("found item in url cache")
return content, nil
}
// Consider cache errors (including deleted items) as cache misses. This is
// to preserve existing behavior. Fall back to refetching the item using the
// API.
logger.CtxErr(ctx, err).Info("url cache miss: refetching from API")
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
return nil, clues.Wrap(err, "retrieving expired item")
@@ -347,6 +368,41 @@ func downloadContent(
return content, nil
}
// readItemContents fetches the latest download URL from the cache and attempts
// to download the file using that URL.
func readItemContents(
ctx context.Context,
iaag itemAndAPIGetter,
uc getItemPropertyer,
itemID string,
) (io.ReadCloser, error) {
if uc == nil {
return nil, clues.New("nil url cache")
}
props, err := uc.getItemProperties(ctx, itemID)
if err != nil {
return nil, err
}
// Handle newly deleted items
if props.isDeleted {
logger.Ctx(ctx).Info("item deleted in cache")
return nil, graph.ErrDeletedInFlight
}
rc, err := downloadFile(ctx, iaag, props.downloadURL)
if graph.IsErrUnauthorized(err) {
logger.CtxErr(ctx, err).Info("stale item in cache")
}
if err != nil {
return nil, err
}
return rc, nil
}
// populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {

---

@@ -2,6 +2,7 @@ package onedrive
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
@@ -204,7 +205,8 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, coll)
assert.Equal(t, folderPath, coll.FullPath())
@@ -312,7 +314,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
stubItem := odTD.NewStubDriveItem(
@@ -388,7 +391,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry()
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
coll.Add(stubItem)
@@ -442,7 +446,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
mtime := time.Now().AddDate(0, -1, 0)
@@ -600,6 +605,19 @@ func (suite *GetDriveItemUnitTestSuite) TestGetDriveItem_error() {
}
}
var _ getItemPropertyer = &mockURLCache{}
type mockURLCache struct {
Get func(ctx context.Context, itemID string) (itemProps, error)
}
func (muc *mockURLCache) getItemProperties(
ctx context.Context,
itemID string,
) (itemProps, error) {
return muc.Get(ctx, itemID)
}
func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
var (
driveID string
@@ -611,6 +629,12 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
itemWID.SetId(ptr.To("brainhooldy"))
m := &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{}, clues.Stack(assert.AnError)
},
}
table := []struct {
name string
mgi mock.GetsItem
@@ -619,6 +643,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr []error
expectErr require.ErrorAssertionFunc
expect require.ValueAssertionFunc
muc *mockURLCache
}{
{
name: "good",
@@ -627,6 +652,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: m,
},
{
name: "expired url redownloads",
@@ -636,6 +662,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{errUnauth, nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: m,
},
{
name: "immediate error",
@@ -643,6 +670,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{assert.AnError},
expectErr: require.Error,
expect: require.Nil,
muc: m,
},
{
name: "re-fetching the item fails",
@@ -651,6 +679,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
mgi: mock.GetsItem{Item: nil, Err: assert.AnError},
expectErr: require.Error,
expect: require.Nil,
muc: m,
},
{
name: "expired url fails redownload",
@@ -660,6 +689,57 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{errUnauth, assert.AnError},
expectErr: require.Error,
expect: require.Nil,
muc: m,
},
{
name: "url refreshed from cache",
mgi: mock.GetsItem{Item: itemWID, Err: nil},
itemInfo: details.ItemInfo{},
respBody: []io.ReadCloser{nil, iorc},
getErr: []error{errUnauth, nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{
downloadURL: "http://example.com",
isDeleted: false,
},
nil
},
},
},
{
name: "url refreshed from cache but item deleted",
mgi: mock.GetsItem{Item: itemWID, Err: graph.ErrDeletedInFlight},
itemInfo: details.ItemInfo{},
respBody: []io.ReadCloser{nil, nil, nil},
getErr: []error{errUnauth, graph.ErrDeletedInFlight, graph.ErrDeletedInFlight},
expectErr: require.Error,
expect: require.Nil,
muc: &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{
downloadURL: "http://example.com",
isDeleted: true,
},
nil
},
},
},
{
name: "fallback to item fetch on any cache error",
mgi: mock.GetsItem{Item: itemWID, Err: nil},
itemInfo: details.ItemInfo{},
respBody: []io.ReadCloser{nil, iorc},
getErr: []error{errUnauth, nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{}, assert.AnError
},
},
},
}
for _, test := range table {
@@ -685,7 +765,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
mbh.GetResps = resps
mbh.GetErrs = test.getErr
r, err := downloadContent(ctx, mbh, item, driveID)
r, err := downloadContent(ctx, mbh, test.muc, item, driveID)
test.expect(t, r)
test.expectErr(t, err, clues.ToCore(err))
})

---

@@ -256,6 +256,7 @@ func (c *Collections) Get(
deltaURLs = map[string]string{}
// Drive ID -> folder ID -> folder path
folderPaths = map[string]map[string]string{}
numPrevItems = 0
)
for _, d := range drives {
@@ -370,13 +371,30 @@ func (c *Collections) Get(
c.statusUpdater,
c.ctrl,
CollectionScopeUnknown,
true)
true,
nil)
if err != nil {
return nil, false, clues.Wrap(err, "making collection").WithClues(ictx)
}
c.CollectionMap[driveID][fldID] = col
}
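// c.NumItems accumulates across every drive processed so far; the delta
// against the previous total gives the item count for this drive alone.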
numDriveItems := c.NumItems - numPrevItems
numPrevItems = c.NumItems
// Only create a URL cache if there are fewer than 300k items in the drive.
if numDriveItems < urlCacheDriveItemThreshold {
logger.Ctx(ictx).Info("adding url cache for drive")
err = c.addURLCacheToDriveCollections(
ictx,
driveID,
errs)
if err != nil {
return nil, false, err
}
}
}
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
@@ -405,7 +423,8 @@ func (c *Collections) Get(
c.statusUpdater,
c.ctrl,
CollectionScopeUnknown,
true)
true,
nil)
if err != nil {
return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx)
}
@@ -438,6 +457,32 @@ func (c *Collections) Get(
return collections, canUsePreviousBackup, nil
}
// addURLCacheToDriveCollections adds a URL cache to all collections belonging
// to a drive.
func (c *Collections) addURLCacheToDriveCollections(
ctx context.Context,
driveID string,
errs *fault.Bus,
) error {
uc, err := newURLCache(
driveID,
urlCacheRefreshInterval,
c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()),
errs)
if err != nil {
return err
}
// Set the URL cache for all collections in this drive
for _, driveColls := range c.CollectionMap {
for _, coll := range driveColls {
coll.urlCache = uc
}
}
return nil
}
func updateCollectionPaths(
driveID, itemID string,
cmap map[string]map[string]*Collection,
@@ -557,7 +602,8 @@ func (c *Collections) handleDelete(
c.ctrl,
CollectionScopeUnknown,
// DoNotMerge is not checked for deleted items.
false)
false,
nil)
if err != nil {
return clues.Wrap(err, "making collection").With(
"drive_id", driveID,
@@ -740,7 +786,8 @@ func (c *Collections) UpdateCollections(
c.statusUpdater,
c.ctrl,
colScope,
invalidPrevDelta)
invalidPrevDelta,
nil)
if err != nil {
return clues.Stack(err).WithClues(ictx)
}

---

@@ -2,6 +2,7 @@ package onedrive
import (
"context"
"strconv"
"testing"
"github.com/alcionai/clues"
@@ -2678,3 +2679,85 @@ func (suite *OneDriveCollectionsUnitSuite) TestCollectItems() {
})
}
}
func (suite *OneDriveCollectionsUnitSuite) TestAddURLCacheToDriveCollections() {
driveID := "test-drive"
collCount := 3
anyFolder := (&selectors.OneDriveBackup{}).Folders(selectors.Any())[0]
table := []struct {
name string
items []deltaPagerResult
deltaURL string
prevDeltaSuccess bool
prevDelta string
err error
}{
{
name: "cache is attached",
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
itemPagers := map[string]api.DriveItemEnumerator{}
itemPagers[driveID] = &mockItemPager{}
mbh := mock.DefaultOneDriveBH()
mbh.ItemPagerV = itemPagers
c := NewCollections(
mbh,
"test-tenant",
"test-user",
nil,
control.Options{ToggleFeatures: control.Toggles{}})
if _, ok := c.CollectionMap[driveID]; !ok {
c.CollectionMap[driveID] = map[string]*Collection{}
}
// Add a few collections
for i := 0; i < collCount; i++ {
coll, err := NewCollection(
&itemBackupHandler{api.Drives{}, anyFolder},
nil,
nil,
driveID,
nil,
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true,
nil)
require.NoError(t, err, clues.ToCore(err))
c.CollectionMap[driveID][strconv.Itoa(i)] = coll
require.Equal(t, nil, coll.urlCache, "cache not nil")
}
err := c.addURLCacheToDriveCollections(
ctx,
driveID,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
// Check that all collections have the same cache instance attached
// to them
var uc *urlCache
for _, driveColls := range c.CollectionMap {
for _, coll := range driveColls {
require.NotNil(t, coll.urlCache, "cache is nil")
if uc == nil {
uc = coll.urlCache.(*urlCache)
} else {
require.Equal(t, uc, coll.urlCache, "cache not equal")
}
}
}
})
}
}

---

@@ -77,7 +77,7 @@ func downloadFile(
return nil, clues.New("malware detected").Label(graph.LabelsMalware)
}
if (resp.StatusCode / 100) != 2 {
if resp != nil && (resp.StatusCode/100) != 2 {
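// resp may be nil when the request itself errored, so guard the status
// check to avoid a nil pointer dereference.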
// upstream error checks can compare the status with
// clues.HasLabel(err, graph.LabelStatus(http.KnownStatusCode))
return nil, clues.

---

@@ -15,11 +15,25 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
const (
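// Drives with at least this many items skip the URL cache entirely and
// fall back to per-item GETs (see the PR description above).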
urlCacheDriveItemThreshold = 300 * 1000
urlCacheRefreshInterval = 1 * time.Hour
)
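// getItemPropertyer abstracts URL lookups so collections don't depend on
// the concrete urlCache; tests substitute a mockURLCache.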
type getItemPropertyer interface {
getItemProperties(
ctx context.Context,
itemID string,
) (itemProps, error)
}
type itemProps struct {
downloadURL string
isDeleted bool
}
var _ getItemPropertyer = &urlCache{}
// urlCache caches download URLs for drive items
type urlCache struct {
driveID string

---

@@ -118,9 +118,6 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
require.NoError(t, err, clues.ToCore(err))
err = cache.refreshCache(ctx)
require.NoError(t, err, clues.ToCore(err))
// Launch parallel requests to the cache, one per item
var wg sync.WaitGroup
for i := 0; i < len(items); i++ {