diff --git a/src/internal/m365/onedrive/collection.go b/src/internal/m365/onedrive/collection.go index afeb0bcb0..197bee01f 100644 --- a/src/internal/m365/onedrive/collection.go +++ b/src/internal/m365/onedrive/collection.go @@ -84,6 +84,8 @@ type Collection struct { // should only be true if the old delta token expired doNotMergeItems bool + + urlCache getItemPropertyer } func pathToLocation(p path.Path) (*path.Builder, error) { @@ -109,6 +111,7 @@ func NewCollection( ctrlOpts control.Options, colScope collectionScope, doNotMergeItems bool, + urlCache getItemPropertyer, ) (*Collection, error) { // TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need // to be changed as we won't be able to extract path information from the @@ -132,7 +135,8 @@ func NewCollection( statusUpdater, ctrlOpts, colScope, - doNotMergeItems) + doNotMergeItems, + urlCache) c.locPath = locPath c.prevLocPath = prevLocPath @@ -149,6 +153,7 @@ func newColl( ctrlOpts control.Options, colScope collectionScope, doNotMergeItems bool, + urlCache getItemPropertyer, ) *Collection { c := &Collection{ handler: handler, @@ -162,6 +167,7 @@ func newColl( state: data.StateOf(prevPath, currPath), scope: colScope, doNotMergeItems: doNotMergeItems, + urlCache: urlCache, } return c @@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent( el = errs.Local() ) - itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID) + itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID) if err != nil { if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) { logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware") @@ -320,9 +326,13 @@ type itemAndAPIGetter interface { func downloadContent( ctx context.Context, iaag itemAndAPIGetter, + uc getItemPropertyer, item models.DriveItemable, driveID string, ) (io.ReadCloser, error) { + itemID := ptr.Val(item.GetId()) + ctx = clues.Add(ctx, "item_id", itemID) + content, err := downloadItem(ctx, iaag, item) if err == nil { return content, nil @@ -332,8 +342,19 @@ func downloadContent( // Assume unauthorized requests are a sign of an expired jwt // token, and that we've overrun the available window to - // download the actual file. Re-downloading the item will - // refresh that download url. + // download the file. Get a fresh url from the cache and attempt to + // download again. + content, err = readItemContents(ctx, iaag, uc, itemID) + if err == nil { + logger.Ctx(ctx).Debug("found item in url cache") + return content, nil + } + + // Consider cache errors(including deleted items) as cache misses. This is + // to preserve existing behavior. Fallback to refetching the item using the + // API. + logger.CtxErr(ctx, err).Info("url cache miss: refetching from API") + di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId())) if err != nil { return nil, clues.Wrap(err, "retrieving expired item") @@ -347,6 +368,41 @@ func downloadContent( return content, nil } +// readItemContents fetches latest download URL from the cache and attempts to +// download the file using the new URL. +func readItemContents( + ctx context.Context, + iaag itemAndAPIGetter, + uc getItemPropertyer, + itemID string, +) (io.ReadCloser, error) { + if uc == nil { + return nil, clues.New("nil url cache") + } + + props, err := uc.getItemProperties(ctx, itemID) + if err != nil { + return nil, err + } + + // Handle newly deleted items + if props.isDeleted { + logger.Ctx(ctx).Info("item deleted in cache") + return nil, graph.ErrDeletedInFlight + } + + rc, err := downloadFile(ctx, iaag, props.downloadURL) + if graph.IsErrUnauthorized(err) { + logger.CtxErr(ctx, err).Info("stale item in cache") + } + + if err != nil { + return nil, err + } + + return rc, nil +} + // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { diff --git a/src/internal/m365/onedrive/collection_test.go b/src/internal/m365/onedrive/collection_test.go index 2cfb65cae..4b8cc53b6 100644 --- a/src/internal/m365/onedrive/collection_test.go +++ b/src/internal/m365/onedrive/collection_test.go @@ -2,6 +2,7 @@ package onedrive import ( "bytes" + "context" "encoding/json" "io" "net/http" @@ -204,7 +205,8 @@ func (suite *CollectionUnitTestSuite) TestCollection() { suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) require.NotNil(t, coll) assert.Equal(t, folderPath, coll.FullPath()) @@ -312,7 +314,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) stubItem := odTD.NewStubDriveItem( @@ -388,7 +391,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry() suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) coll.Add(stubItem) @@ -442,7 +446,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) mtime := time.Now().AddDate(0, -1, 0) @@ -600,6 +605,19 @@ func (suite *GetDriveItemUnitTestSuite) TestGetDriveItem_error() { } } +var _ getItemPropertyer = &mockURLCache{} + +type mockURLCache struct { + Get func(ctx context.Context, itemID string) (itemProps, error) +} + +func (muc *mockURLCache) getItemProperties( + ctx context.Context, + itemID string, +) (itemProps, error) { + return muc.Get(ctx, itemID) +} + func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { var ( driveID string @@ -611,6 +629,12 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { itemWID.SetId(ptr.To("brainhooldy")) + m := &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{}, clues.Stack(assert.AnError) + }, + } + table := []struct { name string mgi mock.GetsItem @@ -619,6 +643,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr []error expectErr require.ErrorAssertionFunc expect require.ValueAssertionFunc + muc *mockURLCache }{ { name: "good", @@ -627,6 +652,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{nil}, expectErr: require.NoError, expect: require.NotNil, + muc: m, }, { name: "expired url redownloads", @@ -636,6 +662,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{errUnauth, nil}, expectErr: require.NoError, expect: require.NotNil, + muc: m, }, { name: "immediate error", @@ -643,6 +670,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{assert.AnError}, expectErr: require.Error, expect: require.Nil, + muc: m, }, { name: "re-fetching the item fails", @@ -651,6 +679,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { mgi: mock.GetsItem{Item: nil, Err: assert.AnError}, expectErr: require.Error, expect: require.Nil, + muc: m, }, { name: "expired url fails redownload", @@ -660,6 +689,57 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { getErr: []error{errUnauth, assert.AnError}, expectErr: require.Error, expect: require.Nil, + muc: m, + }, + { + name: "url refreshed from cache", + mgi: mock.GetsItem{Item: itemWID, Err: nil}, + itemInfo: details.ItemInfo{}, + respBody: []io.ReadCloser{nil, iorc}, + getErr: []error{errUnauth, nil}, + expectErr: require.NoError, + expect: require.NotNil, + muc: &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{ + downloadURL: "http://example.com", + isDeleted: false, + }, + nil + }, + }, + }, + { + name: "url refreshed from cache but item deleted", + mgi: mock.GetsItem{Item: itemWID, Err: graph.ErrDeletedInFlight}, + itemInfo: details.ItemInfo{}, + respBody: []io.ReadCloser{nil, nil, nil}, + getErr: []error{errUnauth, graph.ErrDeletedInFlight, graph.ErrDeletedInFlight}, + expectErr: require.Error, + expect: require.Nil, + muc: &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{ + downloadURL: "http://example.com", + isDeleted: true, + }, + nil + }, + }, + }, + { + name: "fallback to item fetch on any cache error", + mgi: mock.GetsItem{Item: itemWID, Err: nil}, + itemInfo: details.ItemInfo{}, + respBody: []io.ReadCloser{nil, iorc}, + getErr: []error{errUnauth, nil}, + expectErr: require.NoError, + expect: require.NotNil, + muc: &mockURLCache{ + Get: func(ctx context.Context, itemID string) (itemProps, error) { + return itemProps{}, assert.AnError + }, + }, }, } for _, test := range table { @@ -685,7 +765,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { mbh.GetResps = resps mbh.GetErrs = test.getErr - r, err := downloadContent(ctx, mbh, item, driveID) + r, err := downloadContent(ctx, mbh, test.muc, item, driveID) test.expect(t, r) test.expectErr(t, err, clues.ToCore(err)) }) diff --git a/src/internal/m365/onedrive/collections.go b/src/internal/m365/onedrive/collections.go index 7122a2361..2f270ad0a 100644 --- a/src/internal/m365/onedrive/collections.go +++ b/src/internal/m365/onedrive/collections.go @@ -255,7 +255,8 @@ func (c *Collections) Get( // Drive ID -> delta URL for drive deltaURLs = map[string]string{} // Drive ID -> folder ID -> folder path - folderPaths = map[string]map[string]string{} + folderPaths = map[string]map[string]string{} + numPrevItems = 0 ) for _, d := range drives { @@ -370,13 +371,30 @@ func (c *Collections) Get( c.statusUpdater, c.ctrl, CollectionScopeUnknown, - true) + true, + nil) if err != nil { return nil, false, clues.Wrap(err, "making collection").WithClues(ictx) } c.CollectionMap[driveID][fldID] = col } + + numDriveItems := c.NumItems - numPrevItems + numPrevItems = c.NumItems + + // Only create a drive cache if there are less than 300k items in the drive. + if numDriveItems < urlCacheDriveItemThreshold { + logger.Ctx(ictx).Info("adding url cache for drive") + + err = c.addURLCacheToDriveCollections( + ictx, + driveID, + errs) + if err != nil { + return nil, false, err + } + } } observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems)) @@ -405,7 +423,8 @@ func (c *Collections) Get( c.statusUpdater, c.ctrl, CollectionScopeUnknown, - true) + true, + nil) if err != nil { return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx) } @@ -438,6 +457,32 @@ func (c *Collections) Get( return collections, canUsePreviousBackup, nil } +// addURLCacheToDriveCollections adds an URL cache to all collections belonging to +// a drive. +func (c *Collections) addURLCacheToDriveCollections( + ctx context.Context, + driveID string, + errs *fault.Bus, +) error { + uc, err := newURLCache( + driveID, + urlCacheRefreshInterval, + c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()), + errs) + if err != nil { + return err + } + + // Set the URL cache for all collections in this drive + for _, driveColls := range c.CollectionMap { + for _, coll := range driveColls { + coll.urlCache = uc + } + } + + return nil +} + func updateCollectionPaths( driveID, itemID string, cmap map[string]map[string]*Collection, @@ -557,7 +602,8 @@ func (c *Collections) handleDelete( c.ctrl, CollectionScopeUnknown, // DoNotMerge is not checked for deleted items. - false) + false, + nil) if err != nil { return clues.Wrap(err, "making collection").With( "drive_id", driveID, @@ -740,7 +786,8 @@ func (c *Collections) UpdateCollections( c.statusUpdater, c.ctrl, colScope, - invalidPrevDelta) + invalidPrevDelta, + nil) if err != nil { return clues.Stack(err).WithClues(ictx) } diff --git a/src/internal/m365/onedrive/collections_test.go b/src/internal/m365/onedrive/collections_test.go index bc64875f4..b3f410a87 100644 --- a/src/internal/m365/onedrive/collections_test.go +++ b/src/internal/m365/onedrive/collections_test.go @@ -2,6 +2,7 @@ package onedrive import ( "context" + "strconv" "testing" "github.com/alcionai/clues" @@ -2678,3 +2679,85 @@ func (suite *OneDriveCollectionsUnitSuite) TestCollectItems() { }) } } + +func (suite *OneDriveCollectionsUnitSuite) TestAddURLCacheToDriveCollections() { + driveID := "test-drive" + collCount := 3 + anyFolder := (&selectors.OneDriveBackup{}).Folders(selectors.Any())[0] + + table := []struct { + name string + items []deltaPagerResult + deltaURL string + prevDeltaSuccess bool + prevDelta string + err error + }{ + { + name: "cache is attached", + }, + } + for _, test := range table { + suite.Run(test.name, func() { + t := suite.T() + + ctx, flush := tester.NewContext(t) + defer flush() + + itemPagers := map[string]api.DriveItemEnumerator{} + itemPagers[driveID] = &mockItemPager{} + + mbh := mock.DefaultOneDriveBH() + mbh.ItemPagerV = itemPagers + + c := NewCollections( + mbh, + "test-tenant", + "test-user", + nil, + control.Options{ToggleFeatures: control.Toggles{}}) + + if _, ok := c.CollectionMap[driveID]; !ok { + c.CollectionMap[driveID] = map[string]*Collection{} + } + + // Add a few collections + for i := 0; i < collCount; i++ { + coll, err := NewCollection( + &itemBackupHandler{api.Drives{}, anyFolder}, + nil, + nil, + driveID, + nil, + control.Options{ToggleFeatures: control.Toggles{}}, + CollectionScopeFolder, + true, + nil) + require.NoError(t, err, clues.ToCore(err)) + + c.CollectionMap[driveID][strconv.Itoa(i)] = coll + require.Equal(t, nil, coll.urlCache, "cache not nil") + } + + err := c.addURLCacheToDriveCollections( + ctx, + driveID, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + + // Check that all collections have the same cache instance attached + // to them + var uc *urlCache + for _, driveColls := range c.CollectionMap { + for _, coll := range driveColls { + require.NotNil(t, coll.urlCache, "cache is nil") + if uc == nil { + uc = coll.urlCache.(*urlCache) + } else { + require.Equal(t, uc, coll.urlCache, "cache not equal") + } + } + } + }) + } +} diff --git a/src/internal/m365/onedrive/item.go b/src/internal/m365/onedrive/item.go index c6215e9ae..bf12e91c4 100644 --- a/src/internal/m365/onedrive/item.go +++ b/src/internal/m365/onedrive/item.go @@ -77,7 +77,7 @@ func downloadFile( return nil, clues.New("malware detected").Label(graph.LabelsMalware) } - if (resp.StatusCode / 100) != 2 { + if resp != nil && (resp.StatusCode/100) != 2 { // upstream error checks can compare the status with // clues.HasLabel(err, graph.LabelStatus(http.KnownStatusCode)) return nil, clues. diff --git a/src/internal/m365/onedrive/url_cache.go b/src/internal/m365/onedrive/url_cache.go index bb5e61b94..2f585c223 100644 --- a/src/internal/m365/onedrive/url_cache.go +++ b/src/internal/m365/onedrive/url_cache.go @@ -15,11 +15,25 @@ import ( "github.com/alcionai/corso/src/pkg/services/m365/api" ) +const ( + urlCacheDriveItemThreshold = 300 * 1000 + urlCacheRefreshInterval = 1 * time.Hour +) + +type getItemPropertyer interface { + getItemProperties( + ctx context.Context, + itemID string, + ) (itemProps, error) +} + type itemProps struct { downloadURL string isDeleted bool } +var _ getItemPropertyer = &urlCache{} + // urlCache caches download URLs for drive items type urlCache struct { driveID string diff --git a/src/internal/m365/onedrive/url_cache_test.go b/src/internal/m365/onedrive/url_cache_test.go index 6e5da998c..1632c165a 100644 --- a/src/internal/m365/onedrive/url_cache_test.go +++ b/src/internal/m365/onedrive/url_cache_test.go @@ -118,9 +118,6 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { require.NoError(t, err, clues.ToCore(err)) - err = cache.refreshCache(ctx) - require.NoError(t, err, clues.ToCore(err)) - // Launch parallel requests to the cache, one per item var wg sync.WaitGroup for i := 0; i < len(items); i++ {