From 479f114514305a2a2a539f007b40dc4022aaf5b2 Mon Sep 17 00:00:00 2001
From: Abhishek Pandey
Date: Tue, 20 Jun 2023 16:18:01 -0700
Subject: [PATCH] Integrate URL cache into onedrive backup flows (#3542)

Changes:

* Attach the URL cache to a collection only if its parent drive has < 300k
  items. A higher item count reduces the ROI of the cache and can even degrade
  performance in severe cases. We will stick to per-item GETs for those
  scenarios until we figure out a longer-term solution.

---

#### Does this PR need a docs update or release note?

- [ ] :white_check_mark: Yes, it's included
- [ ] :clock1: Yes, but in a later PR
- [x] :no_entry: No

#### Type of change

- [x] :sunflower: Feature
- [ ] :bug: Bugfix
- [ ] :world_map: Documentation
- [ ] :robot: Supportability/Tests
- [ ] :computer: CI/Deployment
- [ ] :broom: Tech Debt/Cleanup

#### Issue(s)

* https://github.com/alcionai/corso/issues/3069

#### Test Plan

- [ ] :muscle: Manual
- [x] :zap: Unit test
- [x] :green_heart: E2E
---
 src/internal/m365/onedrive/collection.go      | 64 ++++++++++++-
 src/internal/m365/onedrive/collection_test.go | 90 +++++++++++++++++--
 src/internal/m365/onedrive/collections.go     | 57 ++++++++++--
 .../m365/onedrive/collections_test.go         | 83 +++++++++++++++++
 src/internal/m365/onedrive/item.go            |  2 +-
 src/internal/m365/onedrive/url_cache.go       | 14 +++
 src/internal/m365/onedrive/url_cache_test.go  |  3 -
 7 files changed, 295 insertions(+), 18 deletions(-)

diff --git a/src/internal/m365/onedrive/collection.go b/src/internal/m365/onedrive/collection.go
index afeb0bcb0..197bee01f 100644
--- a/src/internal/m365/onedrive/collection.go
+++ b/src/internal/m365/onedrive/collection.go
@@ -84,6 +84,8 @@ type Collection struct {
 	// should only be true if the old delta token expired
 	doNotMergeItems bool
+
+	urlCache getItemPropertyer
 }
 
 func pathToLocation(p path.Path) (*path.Builder, error) {
@@ -109,6 +111,7 @@ func NewCollection(
 	ctrlOpts control.Options,
 	colScope collectionScope,
 	doNotMergeItems bool,
+	urlCache getItemPropertyer,
 ) (*Collection, error) {
 	// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
 	// to be changed as we won't be able to extract path information from the
@@ -132,7 +135,8 @@ func NewCollection(
 		statusUpdater,
 		ctrlOpts,
 		colScope,
-		doNotMergeItems)
+		doNotMergeItems,
+		urlCache)
 
 	c.locPath = locPath
 	c.prevLocPath = prevLocPath
@@ -149,6 +153,7 @@ func newColl(
 	ctrlOpts control.Options,
 	colScope collectionScope,
 	doNotMergeItems bool,
+	urlCache getItemPropertyer,
 ) *Collection {
 	c := &Collection{
 		handler:         handler,
@@ -162,6 +167,7 @@ func newColl(
 		state:           data.StateOf(prevPath, currPath),
 		scope:           colScope,
 		doNotMergeItems: doNotMergeItems,
+		urlCache:        urlCache,
 	}
 
 	return c
@@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent(
 		el = errs.Local()
 	)
 
-	itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID)
+	itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID)
 	if err != nil {
 		if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
 			logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
@@ -320,9 +326,13 @@ type itemAndAPIGetter interface {
 func downloadContent(
 	ctx context.Context,
 	iaag itemAndAPIGetter,
+	uc getItemPropertyer,
 	item models.DriveItemable,
 	driveID string,
 ) (io.ReadCloser, error) {
+	itemID := ptr.Val(item.GetId())
+	ctx = clues.Add(ctx, "item_id", itemID)
+
 	content, err := downloadItem(ctx, iaag, item)
 	if err == nil {
 		return content, nil
@@ -332,8 +342,19 @@ 
func downloadContent( // Assume unauthorized requests are a sign of an expired jwt // token, and that we've overrun the available window to - // download the actual file. Re-downloading the item will - // refresh that download url. + // download the file. Get a fresh url from the cache and attempt to + // download again. + content, err = readItemContents(ctx, iaag, uc, itemID) + if err == nil { + logger.Ctx(ctx).Debug("found item in url cache") + return content, nil + } + + // Consider cache errors(including deleted items) as cache misses. This is + // to preserve existing behavior. Fallback to refetching the item using the + // API. + logger.CtxErr(ctx, err).Info("url cache miss: refetching from API") + di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId())) if err != nil { return nil, clues.Wrap(err, "retrieving expired item") @@ -347,6 +368,41 @@ func downloadContent( return content, nil } +// readItemContents fetches latest download URL from the cache and attempts to +// download the file using the new URL. +func readItemContents( + ctx context.Context, + iaag itemAndAPIGetter, + uc getItemPropertyer, + itemID string, +) (io.ReadCloser, error) { + if uc == nil { + return nil, clues.New("nil url cache") + } + + props, err := uc.getItemProperties(ctx, itemID) + if err != nil { + return nil, err + } + + // Handle newly deleted items + if props.isDeleted { + logger.Ctx(ctx).Info("item deleted in cache") + return nil, graph.ErrDeletedInFlight + } + + rc, err := downloadFile(ctx, iaag, props.downloadURL) + if graph.IsErrUnauthorized(err) { + logger.CtxErr(ctx, err).Info("stale item in cache") + } + + if err != nil { + return nil, err + } + + return rc, nil +} + // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { diff --git a/src/internal/m365/onedrive/collection_test.go b/src/internal/m365/onedrive/collection_test.go index 2cfb65cae..4b8cc53b6 100644 --- a/src/internal/m365/onedrive/collection_test.go +++ b/src/internal/m365/onedrive/collection_test.go @@ -2,6 +2,7 @@ package onedrive import ( "bytes" + "context" "encoding/json" "io" "net/http" @@ -204,7 +205,8 @@ func (suite *CollectionUnitTestSuite) TestCollection() { suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) require.NotNil(t, coll) assert.Equal(t, folderPath, coll.FullPath()) @@ -312,7 +314,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) stubItem := odTD.NewStubDriveItem( @@ -388,7 +391,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry() suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) coll.Add(stubItem) @@ -442,7 +446,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) mtime := time.Now().AddDate(0, -1, 0) @@ -600,6 +605,19 @@ func (suite *GetDriveItemUnitTestSuite) 
TestGetDriveItem_error() { } } +var _ getItemPropertyer = &mockURLCache{} + +type mockURLCache struct { + Get func(ctx context.Context, itemID string) (itemProps, error) +} + +func (muc *mockURLCache) getItemProperties( + ctx context.Context, + itemID string, +) (itemProps, error) { + return muc.Get(ctx, itemID) +} + func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { var ( driveID string @@ -611,6 +629,12 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { itemWID.SetId(ptr.To("brainhooldy")) + m := &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{}, clues.Stack(assert.AnError) + }, + } + table := []struct { name string mgi mock.GetsItem @@ -619,6 +643,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr []error expectErr require.ErrorAssertionFunc expect require.ValueAssertionFunc + muc *mockURLCache }{ { name: "good", @@ -627,6 +652,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{nil}, expectErr: require.NoError, expect: require.NotNil, + muc: m, }, { name: "expired url redownloads", @@ -636,6 +662,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{errUnauth, nil}, expectErr: require.NoError, expect: require.NotNil, + muc: m, }, { name: "immediate error", @@ -643,6 +670,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{assert.AnError}, expectErr: require.Error, expect: require.Nil, + muc: m, }, { name: "re-fetching the item fails", @@ -651,6 +679,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { mgi: mock.GetsItem{Item: nil, Err: assert.AnError}, expectErr: require.Error, expect: require.Nil, + muc: m, }, { name: "expired url fails redownload", @@ -660,6 +689,57 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{errUnauth, assert.AnError}, expectErr: require.Error, expect: require.Nil, + muc: m, + }, + { + name: "url refreshed from cache", + mgi: mock.GetsItem{Item: itemWID, Err: nil}, + itemInfo: details.ItemInfo{}, + respBody: []io.ReadCloser{nil, iorc}, + getErr: []error{errUnauth, nil}, + expectErr: require.NoError, + expect: require.NotNil, + muc: &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{ + downloadURL: "http://example.com", + isDeleted: false, + }, + nil + }, + }, + }, + { + name: "url refreshed from cache but item deleted", + mgi: mock.GetsItem{Item: itemWID, Err: graph.ErrDeletedInFlight}, + itemInfo: details.ItemInfo{}, + respBody: []io.ReadCloser{nil, nil, nil}, + getErr: []error{errUnauth, graph.ErrDeletedInFlight, graph.ErrDeletedInFlight}, + expectErr: require.Error, + expect: require.Nil, + muc: &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{ + downloadURL: "http://example.com", + isDeleted: true, + }, + nil + }, + }, + }, + { + name: "fallback to item fetch on any cache error", + mgi: mock.GetsItem{Item: itemWID, Err: nil}, + itemInfo: details.ItemInfo{}, + respBody: []io.ReadCloser{nil, iorc}, + getErr: []error{errUnauth, nil}, + expectErr: require.NoError, + expect: require.NotNil, + muc: &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{}, assert.AnError + }, + }, }, } for _, test := range table { @@ -685,7 +765,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { mbh.GetResps = resps mbh.GetErrs = test.getErr - r, err := 
downloadContent(ctx, mbh, item, driveID) + r, err := downloadContent(ctx, mbh, test.muc, item, driveID) test.expect(t, r) test.expectErr(t, err, clues.ToCore(err)) }) diff --git a/src/internal/m365/onedrive/collections.go b/src/internal/m365/onedrive/collections.go index 7122a2361..2f270ad0a 100644 --- a/src/internal/m365/onedrive/collections.go +++ b/src/internal/m365/onedrive/collections.go @@ -255,7 +255,8 @@ func (c *Collections) Get( // Drive ID -> delta URL for drive deltaURLs = map[string]string{} // Drive ID -> folder ID -> folder path - folderPaths = map[string]map[string]string{} + folderPaths = map[string]map[string]string{} + numPrevItems = 0 ) for _, d := range drives { @@ -370,13 +371,30 @@ func (c *Collections) Get( c.statusUpdater, c.ctrl, CollectionScopeUnknown, - true) + true, + nil) if err != nil { return nil, false, clues.Wrap(err, "making collection").WithClues(ictx) } c.CollectionMap[driveID][fldID] = col } + + numDriveItems := c.NumItems - numPrevItems + numPrevItems = c.NumItems + + // Only create a drive cache if there are less than 300k items in the drive. + if numDriveItems < urlCacheDriveItemThreshold { + logger.Ctx(ictx).Info("adding url cache for drive") + + err = c.addURLCacheToDriveCollections( + ictx, + driveID, + errs) + if err != nil { + return nil, false, err + } + } } observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems)) @@ -405,7 +423,8 @@ func (c *Collections) Get( c.statusUpdater, c.ctrl, CollectionScopeUnknown, - true) + true, + nil) if err != nil { return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx) } @@ -438,6 +457,32 @@ func (c *Collections) Get( return collections, canUsePreviousBackup, nil } +// addURLCacheToDriveCollections adds an URL cache to all collections belonging to +// a drive. +func (c *Collections) addURLCacheToDriveCollections( + ctx context.Context, + driveID string, + errs *fault.Bus, +) error { + uc, err := newURLCache( + driveID, + urlCacheRefreshInterval, + c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()), + errs) + if err != nil { + return err + } + + // Set the URL cache for all collections in this drive + for _, driveColls := range c.CollectionMap { + for _, coll := range driveColls { + coll.urlCache = uc + } + } + + return nil +} + func updateCollectionPaths( driveID, itemID string, cmap map[string]map[string]*Collection, @@ -557,7 +602,8 @@ func (c *Collections) handleDelete( c.ctrl, CollectionScopeUnknown, // DoNotMerge is not checked for deleted items. 
- false) + false, + nil) if err != nil { return clues.Wrap(err, "making collection").With( "drive_id", driveID, @@ -740,7 +786,8 @@ func (c *Collections) UpdateCollections( c.statusUpdater, c.ctrl, colScope, - invalidPrevDelta) + invalidPrevDelta, + nil) if err != nil { return clues.Stack(err).WithClues(ictx) } diff --git a/src/internal/m365/onedrive/collections_test.go b/src/internal/m365/onedrive/collections_test.go index bc64875f4..b3f410a87 100644 --- a/src/internal/m365/onedrive/collections_test.go +++ b/src/internal/m365/onedrive/collections_test.go @@ -2,6 +2,7 @@ package onedrive import ( "context" + "strconv" "testing" "github.com/alcionai/clues" @@ -2678,3 +2679,85 @@ func (suite *OneDriveCollectionsUnitSuite) TestCollectItems() { }) } } + +func (suite *OneDriveCollectionsUnitSuite) TestAddURLCacheToDriveCollections() { + driveID := "test-drive" + collCount := 3 + anyFolder := (&selectors.OneDriveBackup{}).Folders(selectors.Any())[0] + + table := []struct { + name string + items []deltaPagerResult + deltaURL string + prevDeltaSuccess bool + prevDelta string + err error + }{ + { + name: "cache is attached", + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + itemPagers := map[string]api.DriveItemEnumerator{} + itemPagers[driveID] = &mockItemPager{} + + mbh := mock.DefaultOneDriveBH() + mbh.ItemPagerV = itemPagers + + c := NewCollections( + mbh, + "test-tenant", + "test-user", + nil, + control.Options{ToggleFeatures: control.Toggles{}}) + + if _, ok := c.CollectionMap[driveID]; !ok { + c.CollectionMap[driveID] = map[string]*Collection{} + } + + // Add a few collections + for i := 0; i < collCount; i++ { + coll, err := NewCollection( + &itemBackupHandler{api.Drives{}, anyFolder}, + nil, + nil, + driveID, + nil, + control.Options{ToggleFeatures: control.Toggles{}}, + CollectionScopeFolder, + true, + nil) + require.NoError(t, err, clues.ToCore(err)) + + c.CollectionMap[driveID][strconv.Itoa(i)] = coll + require.Equal(t, nil, coll.urlCache, "cache not nil") + } + + err := c.addURLCacheToDriveCollections( + ctx, + driveID, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + + // Check that all collections have the same cache instance attached + // to them + var uc *urlCache + for _, driveColls := range c.CollectionMap { + for _, coll := range driveColls { + require.NotNil(t, coll.urlCache, "cache is nil") + if uc == nil { + uc = coll.urlCache.(*urlCache) + } else { + require.Equal(t, uc, coll.urlCache, "cache not equal") + } + } + } + }) + } +} diff --git a/src/internal/m365/onedrive/item.go b/src/internal/m365/onedrive/item.go index c6215e9ae..bf12e91c4 100644 --- a/src/internal/m365/onedrive/item.go +++ b/src/internal/m365/onedrive/item.go @@ -77,7 +77,7 @@ func downloadFile( return nil, clues.New("malware detected").Label(graph.LabelsMalware) } - if (resp.StatusCode / 100) != 2 { + if resp != nil && (resp.StatusCode/100) != 2 { // upstream error checks can compare the status with // clues.HasLabel(err, graph.LabelStatus(http.KnownStatusCode)) return nil, clues. 
diff --git a/src/internal/m365/onedrive/url_cache.go b/src/internal/m365/onedrive/url_cache.go index bb5e61b94..2f585c223 100644 --- a/src/internal/m365/onedrive/url_cache.go +++ b/src/internal/m365/onedrive/url_cache.go @@ -15,11 +15,25 @@ import ( "github.com/alcionai/corso/src/pkg/services/m365/api" ) +const ( + urlCacheDriveItemThreshold = 300 * 1000 + urlCacheRefreshInterval = 1 * time.Hour +) + +type getItemPropertyer interface { + getItemProperties( + ctx context.Context, + itemID string, + ) (itemProps, error) +} + type itemProps struct { downloadURL string isDeleted bool } +var _ getItemPropertyer = &urlCache{} + // urlCache caches download URLs for drive items type urlCache struct { driveID string diff --git a/src/internal/m365/onedrive/url_cache_test.go b/src/internal/m365/onedrive/url_cache_test.go index 6e5da998c..1632c165a 100644 --- a/src/internal/m365/onedrive/url_cache_test.go +++ b/src/internal/m365/onedrive/url_cache_test.go @@ -118,9 +118,6 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { require.NoError(t, err, clues.ToCore(err)) - err = cache.refreshCache(ctx) - require.NoError(t, err, clues.ToCore(err)) - // Launch parallel requests to the cache, one per item var wg sync.WaitGroup for i := 0; i < len(items); i++ {
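The diff above is the authoritative change; what follows is an editorial illustration, not part of the patch. It distills the download retry ladder that `downloadContent` now follows: try the item's original pre-authenticated URL, fall back to a fresh URL from the URL cache on a 401, and treat any cache failure (including a deleted-item result) as a miss that falls through to a per-item Graph GET. All names below (`fetchByURL`, `urlSource`, `refetchItem`) are hypothetical stand-ins for the corso handlers, not the real API.

```go
// Minimal sketch of the three-tier download fallback introduced by this patch.
// The helpers here are illustrative stand-ins, not corso code.
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

var errUnauthorized = errors.New("401: download url expired")

// fetchByURL stands in for an HTTP GET against a pre-authenticated download URL.
func fetchByURL(url string) (io.ReadCloser, error) {
	if strings.Contains(url, "expired") {
		return nil, errUnauthorized
	}

	return io.NopCloser(strings.NewReader("file contents from " + url)), nil
}

// urlSource stands in for the urlCache: it maps an item ID to a fresh URL.
type urlSource func(ctx context.Context, itemID string) (string, error)

// refetchItem stands in for the per-item Graph API GET fallback, which yields
// a fresh download URL for the item.
func refetchItem(ctx context.Context, itemID string) (string, error) {
	return "https://fresh.example.com/" + itemID, nil
}

// download mirrors the ordering in downloadContent: original URL first, then
// the cache, then an API refetch. Cache errors are treated as cache misses.
func download(ctx context.Context, itemID, originalURL string, cache urlSource) (io.ReadCloser, error) {
	rc, err := fetchByURL(originalURL)
	if err == nil {
		return rc, nil
	}

	if !errors.Is(err, errUnauthorized) {
		return nil, err
	}

	if cache != nil {
		if freshURL, cerr := cache(ctx, itemID); cerr == nil {
			if rc, derr := fetchByURL(freshURL); derr == nil {
				return rc, nil
			}
		}
		// Any cache lookup or redownload failure falls through to the API refetch.
	}

	freshURL, err := refetchItem(ctx, itemID)
	if err != nil {
		return nil, err
	}

	return fetchByURL(freshURL)
}

func main() {
	ctx := context.Background()

	cache := urlSource(func(ctx context.Context, itemID string) (string, error) {
		return "https://cache.example.com/" + itemID, nil
	})

	rc, err := download(ctx, "item-1", "https://expired.example.com/item-1", cache)
	if err != nil {
		panic(err)
	}
	defer rc.Close()

	b, _ := io.ReadAll(rc)
	fmt.Println(string(b)) // served from the cached URL, no per-item GET needed
}
```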
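Likewise, a small sketch of the per-drive gate described in the PR notes: one cache instance is shared by every collection in a drive, but only when the drive holds fewer than `urlCacheDriveItemThreshold` (300k) items; larger drives keep using per-item GETs. The `cache` and `collection` types are simplified assumptions, not the actual `urlCache` and `Collection` structs.

```go
// Sketch of the drive-size gate and shared-cache attachment. Types are
// illustrative stand-ins for the corso structs.
package main

import "fmt"

// Mirrors urlCacheDriveItemThreshold from url_cache.go.
const urlCacheDriveItemThreshold = 300 * 1000

type cache struct{ driveID string }

type collection struct {
	id       string
	urlCache *cache
}

// attachCacheIfSmall sketches the gating in Collections.Get: skip the cache
// for very large drives, otherwise share a single instance across all of the
// drive's collections.
func attachCacheIfSmall(driveID string, numDriveItems int, colls []*collection) {
	if numDriveItems >= urlCacheDriveItemThreshold {
		return // stick to per-item GETs for huge drives
	}

	shared := &cache{driveID: driveID}
	for _, c := range colls {
		c.urlCache = shared
	}
}

func main() {
	colls := []*collection{{id: "folder-a"}, {id: "folder-b"}}
	attachCacheIfSmall("drive-1", 1200, colls)

	// Both collections point at the same cache instance.
	fmt.Println(colls[0].urlCache == colls[1].urlCache)
}
```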