Compare commits

...

12 Commits

Author           SHA1        Message                                                                        Date
Abhishek Pandey  ed6d1066ed  Add reset                                                                      2023-06-20 14:25:20 -07:00
Abhishek Pandey  1d1603e0ad  Add prev delta                                                                 2023-06-20 14:25:01 -07:00
Abhishek Pandey  671fd5150d  Cherry pick cache integration changes with dummy url cache                    2023-06-20 14:00:52 -07:00
Abhishek Pandey  b68336a466  Address review feedback                                                        2023-06-20 13:53:15 -07:00
Abhishek Pandey  099651491d  Fix rebase issues                                                              2023-06-20 13:53:15 -07:00
Abhishek Pandey  397a0a6aa3  Remove dummy url cache                                                         2023-06-20 13:53:15 -07:00
Abhishek Pandey  34e1f393cb  Add some more tests                                                            2023-06-20 13:52:57 -07:00
Abhishek Pandey  30e9439287  Utilize mock backup handler                                                    2023-06-20 13:52:40 -07:00
Abhishek Pandey  c13f70ecc7  Add more tests                                                                 2023-06-20 13:52:40 -07:00
Abhishek Pandey  01bb562bb5  Misc changes - add urlCacher interface, add some tests for cache integration  2023-06-20 13:52:40 -07:00
Abhishek Pandey  c62bcc6d95  Remove drive ID to url cache map from Collections. Unused                     2023-06-20 13:52:03 -07:00
Abhishek Pandey  9bd1b5588a  Cherry pick cache integration changes with dummy url cache                    2023-06-20 13:52:03 -07:00
7 changed files with 325 additions and 21 deletions

View File

@ -84,6 +84,8 @@ type Collection struct {
// should only be true if the old delta token expired
doNotMergeItems bool
urlCache getItemPropertyer
}
func pathToLocation(p path.Path) (*path.Builder, error) {
@ -109,6 +111,7 @@ func NewCollection(
ctrlOpts control.Options,
colScope collectionScope,
doNotMergeItems bool,
urlCache getItemPropertyer,
) (*Collection, error) {
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
// to be changed as we won't be able to extract path information from the
@ -132,7 +135,8 @@ func NewCollection(
statusUpdater,
ctrlOpts,
colScope,
doNotMergeItems)
doNotMergeItems,
urlCache)
c.locPath = locPath
c.prevLocPath = prevLocPath
@ -149,6 +153,7 @@ func newColl(
ctrlOpts control.Options,
colScope collectionScope,
doNotMergeItems bool,
urlCache getItemPropertyer,
) *Collection {
c := &Collection{
handler: handler,
@ -162,6 +167,7 @@ func newColl(
state: data.StateOf(prevPath, currPath),
scope: colScope,
doNotMergeItems: doNotMergeItems,
urlCache: urlCache,
}
return c
@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent(
el = errs.Local()
)
itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID)
itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID)
if err != nil {
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
@ -317,12 +323,16 @@ type itemAndAPIGetter interface {
// downloadContent attempts to fetch the item content. If the content url
// is expired (ie, returns a 401), it re-fetches the item to get a new download
// url and tries again.
func downloadContent(
ctx context.Context,
iaag itemAndAPIGetter,
uc getItemPropertyer,
item models.DriveItemable,
driveID string,
) (io.ReadCloser, error) {
itemID := ptr.Val(item.GetId())
ctx = clues.Add(ctx, "item_id", itemID)
content, err := downloadItem(ctx, iaag, item)
if err == nil {
return content, nil
@ -332,8 +342,20 @@ func downloadContent(
// Assume unauthorized requests are a sign of an expired jwt
// token, and that we've overrun the available window to
// download the actual file. Re-downloading the item will
// refresh that download url.
// download the file. Get a fresh url from the cache and attempt to
// download again.
content, err = readItemContents(ctx, iaag, uc, itemID)
if err == nil {
logger.Ctx(ctx).Debug("found item in url cache")
return content, nil
}
// Consider cache errors (including deleted items) as cache misses. This is
// to preserve existing behavior. Fall back to refetching the item using the
// API.
logger.CtxErr(ctx, err).Info("url cache miss: refetching from API")
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
if err != nil {
return nil, clues.Wrap(err, "retrieving expired item")
@ -347,6 +369,42 @@ func downloadContent(
return content, nil
}
// readItemContents fetches latest download URL from the cache and attempts to
// download the file using the new URL.
func readItemContents(
ctx context.Context,
iaag itemAndAPIGetter,
uc getItemPropertyer,
itemID string,
) (io.ReadCloser, error) {
if uc == nil {
return nil, clues.New("nil url cache")
}
props, err := uc.getItemProperties(ctx, itemID)
if err != nil {
return nil, err
}
// Handle newly deleted items
if props.isDeleted {
logger.Ctx(ctx).Info("item deleted in cache")
return nil, graph.ErrDeletedInFlight
}
rc, err := downloadFile(ctx, iaag, props.downloadURL)
if graph.IsErrUnauthorized(err) {
logger.CtxErr(ctx, err).Info("stale item in cache")
}
if err != nil {
return nil, err
}
logger.CtxErr(ctx, err).Info("item served from url cache")
return rc, nil
}
// populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {

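The net effect of the change above is that downloadContent now tries three sources in order: the item's original pre-authenticated download URL, then the URL cache, then a full GetItem re-fetch. The sketch below is a minimal, standalone Go illustration of that ordering; the function parameters are hypothetical stand-ins for downloadItem, readItemContents, and the GetItem path, not code from this branch.

package main

import (
	"context"
	"errors"
	"fmt"
)

var errUnauthorized = errors.New("401: expired download url")

// fetch tries the original url, then the cache, then a fresh API lookup.
// Only a 401 on the first attempt triggers the fallbacks, matching
// downloadContent's handling of expired pre-authenticated urls.
func fetch(
	ctx context.Context,
	direct, fromCache, refetch func(context.Context) (string, error),
) (string, error) {
	body, err := direct(ctx)
	if err == nil {
		return body, nil
	}

	if !errors.Is(err, errUnauthorized) {
		return "", err
	}

	// cache hit with a still-valid url
	if body, err = fromCache(ctx); err == nil {
		return body, nil
	}

	// any cache error counts as a miss; fall back to the API
	return refetch(ctx)
}

func main() {
	got, err := fetch(
		context.Background(),
		func(context.Context) (string, error) { return "", errUnauthorized },
		func(context.Context) (string, error) { return "cached content", nil },
		func(context.Context) (string, error) { return "refetched content", nil },
	)
	fmt.Println(got, err)
}
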
View File

@ -2,6 +2,7 @@ package onedrive
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
@ -204,7 +205,10 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
require.NotNil(t, coll)
assert.Equal(t, folderPath, coll.FullPath())
@ -312,7 +316,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
stubItem := odTD.NewStubDriveItem(
@ -388,7 +393,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry()
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
coll.Add(stubItem)
@ -442,7 +448,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim
suite.testStatusUpdater(&wg, &collStatus),
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true)
true,
nil)
require.NoError(t, err, clues.ToCore(err))
mtime := time.Now().AddDate(0, -1, 0)
@ -600,6 +607,19 @@ func (suite *GetDriveItemUnitTestSuite) TestGetDriveItem_error() {
}
}
var _ getItemPropertyer = &mockURLCache{}
type mockURLCache struct {
Get func(ctx context.Context, itemID string) (itemProps, error)
}
func (muc *mockURLCache) getItemProperties(
ctx context.Context,
itemID string,
) (itemProps, error) {
return muc.Get(ctx, itemID)
}
func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
var (
driveID string
@ -611,6 +631,12 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
itemWID.SetId(ptr.To("brainhooldy"))
m := &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{}, clues.Stack(assert.AnError)
},
}
table := []struct {
name string
mgi mock.GetsItem
@ -619,6 +645,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr []error
expectErr require.ErrorAssertionFunc
expect require.ValueAssertionFunc
muc *mockURLCache
}{
{
name: "good",
@ -627,6 +654,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: m,
},
{
name: "expired url redownloads",
@ -636,6 +664,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{errUnauth, nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: m,
},
{
name: "immediate error",
@ -643,6 +672,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{assert.AnError},
expectErr: require.Error,
expect: require.Nil,
muc: m,
},
{
name: "re-fetching the item fails",
@ -651,6 +681,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
mgi: mock.GetsItem{Item: nil, Err: assert.AnError},
expectErr: require.Error,
expect: require.Nil,
muc: m,
},
{
name: "expired url fails redownload",
@ -660,6 +691,57 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
getErr: []error{errUnauth, assert.AnError},
expectErr: require.Error,
expect: require.Nil,
muc: m,
},
{
name: "url refreshed from cache",
mgi: mock.GetsItem{Item: itemWID, Err: nil},
itemInfo: details.ItemInfo{},
respBody: []io.ReadCloser{nil, iorc},
getErr: []error{errUnauth, nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{
downloadURL: "http://example.com",
isDeleted: false,
}, nil
},
},
},
{
name: "url refreshed from cache but item deleted",
mgi: mock.GetsItem{Item: itemWID, Err: graph.ErrDeletedInFlight},
itemInfo: details.ItemInfo{},
respBody: []io.ReadCloser{nil, nil, nil},
getErr: []error{errUnauth, graph.ErrDeletedInFlight, graph.ErrDeletedInFlight},
expectErr: require.Error,
expect: require.Nil,
muc: &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{
downloadURL: "http://example.com",
isDeleted: true,
}, nil
},
},
},
{
name: "fallback to item fetch on any cache error",
mgi: mock.GetsItem{Item: itemWID, Err: nil},
itemInfo: details.ItemInfo{},
respBody: []io.ReadCloser{nil, iorc},
getErr: []error{errUnauth, nil},
expectErr: require.NoError,
expect: require.NotNil,
muc: &mockURLCache{
Get: func(ctx context.Context, itemID string) (itemProps, error) {
return itemProps{}, assert.AnError
},
},
},
}
for _, test := range table {
@ -685,7 +767,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
mbh.GetResps = resps
mbh.GetErrs = test.getErr
r, err := downloadContent(ctx, mbh, item, driveID)
r, err := downloadContent(ctx, mbh, test.muc, item, driveID)
test.expect(t, r)
test.expectErr(t, err, clues.ToCore(err))
})

View File

@ -63,6 +63,9 @@ type Collections struct {
NumItems int
NumFiles int
NumContainers int
// drive ID -> url cache instance
driveURLCache map[string]*urlCache
}
func NewCollections(
@ -79,6 +82,7 @@ func NewCollections(
CollectionMap: map[string]map[string]*Collection{},
statusUpdater: statusUpdater,
ctrl: ctrlOpts,
driveURLCache: map[string]*urlCache{},
}
}
@ -255,7 +259,8 @@ func (c *Collections) Get(
// Drive ID -> delta URL for drive
deltaURLs = map[string]string{}
// Drive ID -> folder ID -> folder path
folderPaths = map[string]map[string]string{}
folderPaths = map[string]map[string]string{}
numPrevItems = 0
)
for _, d := range drives {
@ -370,13 +393,31 @@ func (c *Collections) Get(
c.statusUpdater,
c.ctrl,
CollectionScopeUnknown,
true)
true,
nil)
if err != nil {
return nil, false, clues.Wrap(err, "making collection").WithClues(ictx)
}
c.CollectionMap[driveID][fldID] = col
}
numDriveItems := c.NumItems - numPrevItems
numPrevItems = c.NumItems
// Only create a drive url cache if there are fewer than 300k items in the drive.
if numDriveItems < urlCacheDriveItemThreshold {
logger.Ctx(ictx).Info("adding url cache for drive")
err = c.addURLCacheToDriveCollections(
ictx,
driveID,
prevDelta,
errs)
if err != nil {
return nil, false, err
}
}
}
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
@ -405,7 +446,8 @@ func (c *Collections) Get(
c.statusUpdater,
c.ctrl,
CollectionScopeUnknown,
true)
true,
nil)
if err != nil {
return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx)
}
@ -438,6 +480,33 @@ func (c *Collections) Get(
return collections, canUsePreviousBackup, nil
}
// addURLCacheToDriveCollections adds a URL cache to all collections belonging to
// a drive.
func (c *Collections) addURLCacheToDriveCollections(
ctx context.Context,
driveID, prevDelta string,
errs *fault.Bus,
) error {
uc, err := newURLCache(
driveID,
prevDelta,
urlCacheRefreshInterval,
c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()),
errs)
if err != nil {
return err
}
// Set the URL cache for all collections in this drive
for _, driveColls := range c.CollectionMap {
for _, coll := range driveColls {
coll.urlCache = uc
}
}
return nil
}
func updateCollectionPaths(
driveID, itemID string,
cmap map[string]map[string]*Collection,
@ -557,7 +626,8 @@ func (c *Collections) handleDelete(
c.ctrl,
CollectionScopeUnknown,
// DoNotMerge is not checked for deleted items.
false)
false,
nil)
if err != nil {
return clues.Wrap(err, "making collection").With(
"drive_id", driveID,
@ -740,7 +810,8 @@ func (c *Collections) UpdateCollections(
c.statusUpdater,
c.ctrl,
colScope,
invalidPrevDelta)
invalidPrevDelta,
nil)
if err != nil {
return clues.Stack(err).WithClues(ictx)
}

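Design note: addURLCacheToDriveCollections shares one *urlCache instance across every collection in a drive, so a single delta-query refresh serves all of them, and the item-count guard skips drives too large to refresh within the window that download URLs stay valid. Below is a toy sketch of that wiring with stand-in types; the threshold value is assumed from the "300k" comment above, since the constant's definition is not part of this diff.

package main

import "fmt"

type cache struct{ driveID string }

type collection struct{ urlCache *cache }

// assumed value of urlCacheDriveItemThreshold, per the comment in the diff
const itemThreshold = 300_000

// attach mirrors addURLCacheToDriveCollections: one cache per drive,
// pointed to by every collection in that drive.
func attach(colls map[string]*collection, driveID string, numItems int) {
	if numItems >= itemThreshold {
		// large drives skip the cache; downloads fall back to the API
		return
	}

	uc := &cache{driveID: driveID}
	for _, c := range colls {
		c.urlCache = uc
	}
}

func main() {
	colls := map[string]*collection{"folder-a": {}, "folder-b": {}}
	attach(colls, "drive-1", 1200)

	// both collections point at the same cache instance
	fmt.Println(colls["folder-a"].urlCache == colls["folder-b"].urlCache)
}
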
View File

@ -2,6 +2,7 @@ package onedrive
import (
"context"
"strconv"
"testing"
"github.com/alcionai/clues"
@ -2678,3 +2679,83 @@ func (suite *OneDriveCollectionsUnitSuite) TestCollectItems() {
})
}
}
func (suite *OneDriveCollectionsUnitSuite) TestURLCacheIntegration() {
driveID := "test-drive"
collCount := 3
table := []struct {
name string
items []deltaPagerResult
deltaURL string
prevDeltaSuccess bool
prevDelta string
err error
}{
{
name: "cache is attached",
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
mbh := mock.DefaultOneDriveBH()
mbh.ItemPagerV = map[string]api.DriveItemEnumerator{}
c := NewCollections(
mbh,
"test-tenant",
"test-user",
testFolderMatcher{(&selectors.OneDriveBackup{}).Folders(selectors.Any())[0]},
nil,
control.Options{ToggleFeatures: control.Toggles{}})
if _, ok := c.CollectionMap[driveID]; !ok {
c.CollectionMap[driveID] = map[string]*Collection{}
}
// Add a few collections
for i := 0; i < collCount; i++ {
coll, err := NewCollection(
&itemBackupHandler{api.Drives{}},
nil,
nil,
driveID,
nil,
control.Options{ToggleFeatures: control.Toggles{}},
CollectionScopeFolder,
true,
nil)
require.NoError(t, err, clues.ToCore(err))
c.CollectionMap[driveID][strconv.Itoa(i)] = coll
require.Nil(t, coll.urlCache, "cache not nil")
}
err := c.addURLCacheToDriveCollections(
ctx,
driveID,
"",
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
// Check that all collections have the same cache instance attached
// to them
var uc *urlCache
for _, driveColls := range c.CollectionMap {
for _, coll := range driveColls {
require.NotNil(t, coll.urlCache, "cache is nil")
if uc == nil {
uc = coll.urlCache.(*urlCache)
} else {
require.Equal(t, uc, coll.urlCache, "cache not equal")
}
}
}
})
}
}

View File

@ -77,7 +77,7 @@ func downloadFile(
return nil, clues.New("malware detected").Label(graph.LabelsMalware)
}
if (resp.StatusCode / 100) != 2 {
if resp != nil && (resp.StatusCode/100) != 2 {
// upstream error checks can compare the status with
// clues.HasLabel(err, graph.LabelStatus(http.KnownStatusCode))
return nil, clues.

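The nil guard added above matters because an HTTP client can return a nil *http.Response together with a non-nil error, and dereferencing StatusCode on a nil response panics. A standalone illustration of the guarded check (not code from this branch):

package main

import (
	"fmt"
	"net/http"
)

// statusOK reports whether resp carries a 2xx status, tolerating a nil
// response such as when the transport fails before any response arrives.
func statusOK(resp *http.Response) bool {
	return resp != nil && (resp.StatusCode/100) == 2
}

func main() {
	fmt.Println(statusOK(nil))                             // false, no panic
	fmt.Println(statusOK(&http.Response{StatusCode: 200})) // true
}
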
View File

@ -15,14 +15,24 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
type getItemPropertyer interface {
getItemProperties(
ctx context.Context,
itemID string,
) (itemProps, error)
}
type itemProps struct {
downloadURL string
isDeleted bool
}
var _ getItemPropertyer = &urlCache{}
// urlCache caches download URLs for drive items
type urlCache struct {
driveID string
prevDelta string
idToProps map[string]itemProps
lastRefreshTime time.Time
refreshInterval time.Duration
@ -39,7 +49,7 @@ type urlCache struct {
// newURLCache creates a new URL cache for the specified drive ID
func newURLCache(
driveID string,
driveID, prevDelta string,
refreshInterval time.Duration,
itemPager api.DriveItemEnumerator,
errs *fault.Bus,
@ -56,6 +66,7 @@ func newURLCache(
idToProps: make(map[string]itemProps),
lastRefreshTime: time.Time{},
driveID: driveID,
prevDelta: prevDelta,
refreshInterval: refreshInterval,
itemPager: itemPager,
errs: errs,
@ -166,6 +177,9 @@ func (uc *urlCache) deltaQuery(
) error {
logger.Ctx(ctx).Debug("starting delta query")
// Reset item pager to remove any previous state
uc.itemPager.Reset()
_, _, _, err := collectItems(
ctx,
uc.itemPager,
@ -173,7 +187,7 @@ func (uc *urlCache) deltaQuery(
"",
uc.updateCache,
map[string]string{},
"",
uc.prevDelta,
uc.errs)
if err != nil {
return clues.Wrap(err, "delta query")

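The body of getItemProperties is not shown in this diff. A plausible read-through shape, consistent with the idToProps, lastRefreshTime, and refreshInterval fields above, is sketched below as a standalone program with stand-in types; this is hypothetical, and locking plus the real delta query are elided.

package main

import (
	"errors"
	"fmt"
	"time"
)

type itemProps struct {
	downloadURL string
	isDeleted   bool
}

// miniCache stands in for the diff's urlCache, keeping only what the
// read path needs.
type miniCache struct {
	idToProps       map[string]itemProps
	lastRefreshTime time.Time
	refreshInterval time.Duration
	refresh         func() (map[string]itemProps, error) // stands in for the delta query
}

// getItemProperties lazily refreshes the cache once its urls may have gone
// stale, then answers from the in-memory map.
func (uc *miniCache) getItemProperties(itemID string) (itemProps, error) {
	if time.Since(uc.lastRefreshTime) > uc.refreshInterval {
		m, err := uc.refresh()
		if err != nil {
			return itemProps{}, err
		}

		uc.idToProps = m
		uc.lastRefreshTime = time.Now()
	}

	props, ok := uc.idToProps[itemID]
	if !ok {
		return itemProps{}, errors.New("item not found in cache")
	}

	return props, nil
}

func main() {
	uc := &miniCache{
		refreshInterval: time.Hour,
		refresh: func() (map[string]itemProps, error) {
			return map[string]itemProps{"id-1": {downloadURL: "http://example.com"}}, nil
		},
	}

	fmt.Println(uc.getItemProperties("id-1"))
}
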
View File

@ -112,15 +112,13 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
// Create a new URL cache with a long refresh interval
cache, err := newURLCache(
suite.driveID,
"",
1*time.Hour,
driveItemPager,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
err = cache.refreshCache(ctx)
require.NoError(t, err, clues.ToCore(err))
// Launch parallel requests to the cache, one per item
var wg sync.WaitGroup
for i := 0; i < len(items); i++ {