Add download URL cache for onedrive items (#3459)

This PR introduces a URL cache for OneDrive items. Wiring the cache into the existing code and additional unit/integration tests will be added in follow-up PRs to keep this PR short.

Why do we need a URL cache?
* This is a perf improvement for long-running backups.
* If more than an hour passes between fetching an item's download URL and downloading its content, the download fails with a `401 Unauthorized` error. This is because the JWT tokens attached to download URLs are short-lived (1 hour).
* Currently we refresh a download URL after a `401` with a per-item `GET`. This is worth optimizing because it costs an extra round trip for every remaining item (one request to hit the `401`, another to renew the URL).

How does the URL cache help?
* The URL cache does a full delta re-enumeration of the drive when we hit a `401`. This is cheaper (in terms of Graph requests/tokens) than calling `GetItem` for every remaining item.
* It relies on lazy refresh: URLs are refreshed only once we hit a `401` and the client explicitly asks the cache for the download URL. Subsequent `401`s are served from the cache (see the usage sketch below).
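Since the wiring into the download path lands in a follow-up PR, here is a minimal sketch of how a caller might consume the cache. `downloadContent` and `isUnauthorized` are hypothetical stand-ins for the existing download path and its `401` detection, not part of this change:

```go
// Sketch only: downloadContent and isUnauthorized are hypothetical
// placeholders for the existing item download path and its 401 check.
func downloadWithCache(
	ctx context.Context,
	uc *urlCache,
	itemID, url string,
) (io.ReadCloser, error) {
	rc, err := downloadContent(ctx, url)
	if err == nil || !isUnauthorized(err) {
		return rc, err
	}

	// The URL has likely expired. The first caller to land here triggers
	// a single delta query; concurrent callers block on that refresh and
	// are then served from the in-memory cache.
	props, err := uc.getItemProperties(ctx, itemID)
	if err != nil {
		// Not in the cache; the caller falls back to a per-item GET.
		return nil, err
	}

	if props.isDeleted {
		return nil, clues.New("item deleted between enumeration and download")
	}

	return downloadContent(ctx, props.downloadURL)
}
```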

**Cache flow**

```mermaid
flowchart TD
    A[Download content] -->|401 error| B(Fetch from URL cache)
    B --> |cache.getItemProperties| C{Cache needs refresh?}
    C -->|Yes| D{Refresh in progress?}
    D -->|Yes, Block| D
    C -->|No| E[Read from cache]
    D -->|No| M{Cache needs refresh? 2nd check}
    M -->|Yes|F[Delta Query]
    M -->|No| E
    F --> |success|H[Update cache]
    E --> |not found| I[Return error]
    E --> |item deleted| J[Return Deleted]
    F --> |failure|I
    I --> |fallback|L[GET item]
    H --> E 

```
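Note the second "needs refresh" check after the refresh mutex is acquired: a goroutine that was blocked behind an in-flight refresh re-checks the staleness condition before issuing its own delta query, so a burst of concurrent `401`s triggers at most one delta query per refresh interval. If an item still can't be served from the cache (for example, it was created after the last enumeration), the caller falls back to the existing per-item `GET`, so correctness never depends on the cache.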
---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [x] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* https://github.com/alcionai/corso/issues/3069

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [ ]  Unit test
- [x] 💚 E2E

```diff
@@ -96,10 +96,7 @@ type Collections struct {
 	resourceOwner string,
 	fields []string,
 ) (api.DrivePager, error)
-	itemPagerFunc func(
-		servicer graph.Servicer,
-		driveID, link string,
-	) itemPager
+	itemPagerFunc driveItemPagerFunc
 	servicePathPfxFunc pathPrefixerFunc
 	// Track stats from drive enumeration. Represents the items backed up.
```


```diff
@@ -88,6 +88,11 @@ type itemCollector func(
 	errs *fault.Bus,
 ) error
 
+type driveItemPagerFunc func(
+	servicer graph.Servicer,
+	driveID, link string,
+) itemPager
+
 type itemPager interface {
 	GetPage(context.Context) (api.DeltaPageLinker, error)
 	SetNext(nextLink string)
```


@@ -0,0 +1,263 @@

```go
package onedrive
import (
"context"
"sync"
"time"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
)
type itemProps struct {
downloadURL string
isDeleted bool
}
// urlCache caches download URLs for drive items
type urlCache struct {
driveID string
idToProps map[string]itemProps
lastRefreshTime time.Time
refreshInterval time.Duration
// cacheMu protects idToProps and lastRefreshTime
cacheMu sync.RWMutex
// refreshMu serializes cache refresh attempts by potential writers
refreshMu sync.Mutex
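// deltaQueryCount tracks how many delta queries the cache has issued;
// tests use it to verify that concurrent refreshes are deduplicated.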
deltaQueryCount int
svc graph.Servicer
itemPagerFunc driveItemPagerFunc
errors *fault.Bus
}
// newURLCache creates a new URL cache for the specified drive ID
func newURLCache(
driveID string,
refreshInterval time.Duration,
svc graph.Servicer,
errors *fault.Bus,
itemPagerFunc driveItemPagerFunc,
) (*urlCache, error) {
err := validateCacheParams(
driveID,
refreshInterval,
svc,
itemPagerFunc)
if err != nil {
return nil, clues.Wrap(err, "cache params")
}
return &urlCache{
idToProps: make(map[string]itemProps),
lastRefreshTime: time.Time{},
driveID: driveID,
refreshInterval: refreshInterval,
svc: svc,
itemPagerFunc: itemPagerFunc,
errors: errors,
},
nil
}
// validateCacheParams validates input params
func validateCacheParams(
driveID string,
refreshInterval time.Duration,
svc graph.Servicer,
itemPagerFunc driveItemPagerFunc,
) error {
if len(driveID) == 0 {
return clues.New("drive id is empty")
}
if refreshInterval <= 1*time.Second {
return clues.New("invalid refresh interval")
}
if svc == nil {
return clues.New("nil graph servicer")
}
if itemPagerFunc == nil {
return clues.New("nil item pager")
}
return nil
}
// getItemProperties returns the item properties for the specified drive item ID
func (uc *urlCache) getItemProperties(
ctx context.Context,
itemID string,
) (itemProps, error) {
if len(itemID) == 0 {
return itemProps{}, clues.New("item id is empty")
}
ctx = clues.Add(ctx, "drive_id", uc.driveID)
// Lazy refresh
if uc.needsRefresh() {
err := uc.refreshCache(ctx)
if err != nil {
return itemProps{}, err
}
}
props, err := uc.readCache(ctx, itemID)
if err != nil {
return itemProps{}, err
}
return props, nil
}
// needsRefresh returns true if the cache is empty or if the refresh
// interval has elapsed
func (uc *urlCache) needsRefresh() bool {
uc.cacheMu.RLock()
defer uc.cacheMu.RUnlock()
return len(uc.idToProps) == 0 ||
time.Since(uc.lastRefreshTime) > uc.refreshInterval
}
// refreshCache refreshes the URL cache by performing a delta query.
func (uc *urlCache) refreshCache(
ctx context.Context,
) error {
// Acquire mutex to prevent multiple threads from refreshing the
// cache at the same time
uc.refreshMu.Lock()
defer uc.refreshMu.Unlock()
// If the cache was refreshed by another thread while we were waiting
// to acquire mutex, return
if !uc.needsRefresh() {
return nil
}
// Hold cache lock in write mode for the entire duration of the refresh.
// This is to prevent other threads from reading the cache while it is
// being updated page by page
uc.cacheMu.Lock()
defer uc.cacheMu.Unlock()
// Issue a delta query to graph
logger.Ctx(ctx).Info("refreshing url cache")
err := uc.deltaQuery(ctx)
if err != nil {
return err
}
logger.Ctx(ctx).Info("url cache refreshed")
// Update last refresh time
uc.lastRefreshTime = time.Now()
return nil
}
// deltaQuery performs a delta query on the drive and updates the cache
func (uc *urlCache) deltaQuery(
ctx context.Context,
) error {
logger.Ctx(ctx).Debug("starting delta query")
_, _, _, err := collectItems(
ctx,
uc.itemPagerFunc(uc.svc, uc.driveID, ""),
uc.driveID,
"",
uc.updateCache,
map[string]string{},
"",
uc.errors)
if err != nil {
return clues.Wrap(err, "delta query")
}
uc.deltaQueryCount++
return nil
}
// readCache returns the item properties for the specified item
func (uc *urlCache) readCache(
ctx context.Context,
itemID string,
) (itemProps, error) {
uc.cacheMu.RLock()
defer uc.cacheMu.RUnlock()
ctx = clues.Add(ctx, "item_id", itemID)
props, ok := uc.idToProps[itemID]
if !ok {
return itemProps{}, clues.New("item not found in cache").WithClues(ctx)
}
return props, nil
}
// updateCache consumes a slice of drive items and updates the URL cache.
// It assumes that cacheMu is held by the caller in write mode
func (uc *urlCache) updateCache(
ctx context.Context,
_, _ string,
items []models.DriveItemable,
_ map[string]string,
_ map[string]string,
_ map[string]struct{},
_ map[string]map[string]string,
_ bool,
errs *fault.Bus,
) error {
el := errs.Local()
for _, item := range items {
if el.Failure() != nil {
break
}
// Skip if not a file
if item.GetFile() == nil {
continue
}
var url string
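// downloadURLKeys is defined elsewhere in this package; it lists the
// additionalData keys under which Graph returns an item's short-lived,
// pre-authenticated download URL.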
for _, key := range downloadURLKeys {
tmp, ok := item.GetAdditionalData()[key].(*string)
if ok {
url = ptr.Val(tmp)
break
}
}
itemID := ptr.Val(item.GetId())
uc.idToProps[itemID] = itemProps{
downloadURL: url,
isDeleted: false,
}
// Mark deleted items in cache
if item.GetDeleted() != nil {
uc.idToProps[itemID] = itemProps{
downloadURL: "",
isDeleted: true,
}
}
}
return el.Failure()
}
```


@@ -0,0 +1,162 @@

```go
package onedrive
import (
"net/http"
"sync"
"testing"
"time"
"github.com/alcionai/clues"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type URLCacheIntegrationSuite struct {
tester.Suite
service graph.Servicer
user string
driveID string
}
func TestURLCacheIntegrationSuite(t *testing.T) {
suite.Run(t, &URLCacheIntegrationSuite{
Suite: tester.NewIntegrationSuite(
t,
[][]string{tester.M365AcctCredEnvs}),
})
}
func (suite *URLCacheIntegrationSuite) SetupSuite() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
suite.service = loadTestService(t)
suite.user = tester.SecondaryM365UserID(t)
pager, err := PagerForSource(OneDriveSource, suite.service, suite.user, nil)
require.NoError(t, err, clues.ToCore(err))
odDrives, err := api.GetAllDrives(ctx, pager, true, maxDrivesRetries)
require.NoError(t, err, clues.ToCore(err))
require.Greaterf(t, len(odDrives), 0, "user %s does not have a drive", suite.user)
suite.driveID = ptr.Val(odDrives[0].GetId())
}
// Basic test for urlCache. Create some files in onedrive, then access them via
// url cache
func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
svc := suite.service
driveID := suite.driveID
// Create a new test folder
root, err := svc.Client().Drives().ByDriveId(driveID).Root().Get(ctx, nil)
require.NoError(t, err, clues.ToCore(err))
newFolderName := tester.DefaultTestRestoreDestination("folder").ContainerName
newFolder, err := CreateItem(
ctx,
svc,
driveID,
ptr.Val(root.GetId()),
newItem(newFolderName, true))
require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, newFolder.GetId())
// Delete folder on exit
defer func() {
ictx := clues.Add(ctx, "folder_id", ptr.Val(newFolder.GetId()))
err := api.DeleteDriveItem(
ictx,
loadTestService(t),
driveID,
ptr.Val(newFolder.GetId()))
if err != nil {
logger.CtxErr(ictx, err).Errorw("deleting folder")
}
}()
// Create a bunch of files in the new folder
var items []models.DriveItemable
for i := 0; i < 10; i++ {
newItemName := "testItem_" + dttm.FormatNow(dttm.SafeForTesting)
item, err := CreateItem(
ctx,
svc,
driveID,
ptr.Val(newFolder.GetId()),
newItem(newItemName, false))
if err != nil {
// Something bad happened, skip this item
continue
}
items = append(items, item)
}
// Create a new URL cache with a long TTL
cache, err := newURLCache(
suite.driveID,
1*time.Hour,
svc,
fault.New(true),
defaultItemPager)
require.NoError(t, err, clues.ToCore(err))
// Launch parallel requests to the cache, one per item
var wg sync.WaitGroup
for i := 0; i < len(items); i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// Read item from URL cache
props, err := cache.getItemProperties(
ctx,
ptr.Val(items[i].GetId()))
require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, props)
require.NotEmpty(t, props.downloadURL)
require.Equal(t, false, props.isDeleted)
// Validate download URL
c := graph.NewNoTimeoutHTTPWrapper()
resp, err := c.Request(
ctx,
http.MethodGet,
props.downloadURL,
nil,
nil)
require.NoError(t, err, clues.ToCore(err))
require.Equal(t, http.StatusOK, resp.StatusCode)
}(i)
}
wg.Wait()
// Validate that at most one delta query was made
require.LessOrEqual(t, cache.deltaQueryCount, 1)
}
```