diff --git a/src/internal/m365/onedrive/collection.go b/src/internal/m365/onedrive/collection.go index afeb0bcb0..719c1dbb5 100644 --- a/src/internal/m365/onedrive/collection.go +++ b/src/internal/m365/onedrive/collection.go @@ -84,6 +84,8 @@ type Collection struct { // should only be true if the old delta token expired doNotMergeItems bool + + cache *urlCache } func pathToLocation(p path.Path) (*path.Builder, error) { @@ -109,6 +111,7 @@ func NewCollection( ctrlOpts control.Options, colScope collectionScope, doNotMergeItems bool, + cache *urlCache, ) (*Collection, error) { // TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need // to be changed as we won't be able to extract path information from the @@ -132,7 +135,8 @@ func NewCollection( statusUpdater, ctrlOpts, colScope, - doNotMergeItems) + doNotMergeItems, + cache) c.locPath = locPath c.prevLocPath = prevLocPath @@ -149,6 +153,7 @@ func newColl( ctrlOpts control.Options, colScope collectionScope, doNotMergeItems bool, + cache *urlCache, ) *Collection { c := &Collection{ handler: handler, @@ -162,6 +167,7 @@ func newColl( state: data.StateOf(prevPath, currPath), scope: colScope, doNotMergeItems: doNotMergeItems, + cache: cache, } return c @@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent( el = errs.Local() ) - itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID) + itemData, err := oc.downloadContent(ctx, oc.handler, item, oc.driveID) if err != nil { if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) { logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware") @@ -317,12 +323,15 @@ type itemAndAPIGetter interface { // downloadContent attempts to fetch the item content. If the content url // is expired (ie, returns a 401), it re-fetches the item to get a new download // url and tries again. -func downloadContent( +func (oc *Collection) downloadContent( ctx context.Context, iaag itemAndAPIGetter, item models.DriveItemable, driveID string, ) (io.ReadCloser, error) { + itemID := ptr.Val(item.GetId()) + ctx = clues.Add(ctx, "item_id", itemID) + content, err := downloadItem(ctx, iaag, item) if err == nil { return content, nil @@ -332,8 +341,20 @@ func downloadContent( // Assume unauthorized requests are a sign of an expired jwt // token, and that we've overrun the available window to - // download the actual file. Re-downloading the item will - // refresh that download url. + // download the file. Get a fresh url from the cache and attempt to + // download again. + content, err = oc.readFromCache(ctx, iaag, itemID) + if err == nil { + logger.Ctx(ctx).Debug("found item in cache", itemID) + + return content, nil + } + + // Consider cache errors(including deleted items) as cache misses. This is + // to preserve existing behavior. Fallback to refetching the item using the + // API. + logger.CtxErr(ctx, err).Info("cache miss. refetching from API") + di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId())) if err != nil { return nil, clues.Wrap(err, "retrieving expired item") @@ -347,6 +368,41 @@ func downloadContent( return content, nil } +// readFromCache fetches latest download URL from the cache and attempts to +// download the file using the new URL. +func (oc *Collection) readFromCache( + ctx context.Context, + iaag itemAndAPIGetter, + itemID string, +) (io.ReadCloser, error) { + if oc.cache == nil { + return nil, clues.New("nil url cache") + } + + props, err := oc.cache.getItemProperties(ctx, itemID) + if err != nil { + return nil, err + } + + // Handle newly deleted items + if props.isDeleted { + logger.Ctx(ctx).Info("item deleted in cache") + + return nil, graph.ErrDeletedInFlight + } + + rc, err := downloadFile(ctx, iaag, props.downloadURL) + if graph.IsErrUnauthorized(err) { + logger.CtxErr(ctx, err).Info("stale item in cache") + } + + if err != nil { + return nil, err + } + + return rc, nil +} + // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { diff --git a/src/internal/m365/onedrive/collection_test.go b/src/internal/m365/onedrive/collection_test.go index 2cfb65cae..dd8d799ff 100644 --- a/src/internal/m365/onedrive/collection_test.go +++ b/src/internal/m365/onedrive/collection_test.go @@ -204,7 +204,8 @@ func (suite *CollectionUnitTestSuite) TestCollection() { suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) require.NotNil(t, coll) assert.Equal(t, folderPath, coll.FullPath()) @@ -312,7 +313,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) stubItem := odTD.NewStubDriveItem( @@ -388,7 +390,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry() suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) coll.Add(stubItem) @@ -442,7 +445,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim suite.testStatusUpdater(&wg, &collStatus), control.Options{ToggleFeatures: control.Toggles{}}, CollectionScopeFolder, - true) + true, + nil) require.NoError(t, err, clues.ToCore(err)) mtime := time.Now().AddDate(0, -1, 0) @@ -685,7 +689,19 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() { mbh.GetResps = resps mbh.GetErrs = test.getErr - r, err := downloadContent(ctx, mbh, item, driveID) + coll, err := NewCollection( + mbh, + nil, + nil, + "drive-id", + nil, + control.Options{ToggleFeatures: control.Toggles{}}, + CollectionScopeFolder, + true, + nil) + require.NoError(t, err, clues.ToCore(err)) + + r, err := coll.downloadContent(ctx, mbh, item, driveID) test.expect(t, r) test.expectErr(t, err, clues.ToCore(err)) }) diff --git a/src/internal/m365/onedrive/collections.go b/src/internal/m365/onedrive/collections.go index 7122a2361..cfc0e2ccf 100644 --- a/src/internal/m365/onedrive/collections.go +++ b/src/internal/m365/onedrive/collections.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "strings" + "time" "github.com/alcionai/clues" "github.com/microsoftgraph/msgraph-sdk-go/models" @@ -63,6 +64,9 @@ type Collections struct { NumItems int NumFiles int NumContainers int + + // drive ID -> url cache instance + driveURLCache map[string]*urlCache } func NewCollections( @@ -79,6 +83,7 @@ func NewCollections( CollectionMap: map[string]map[string]*Collection{}, statusUpdater: statusUpdater, ctrl: ctrlOpts, + driveURLCache: map[string]*urlCache{}, } } @@ -255,7 +260,8 @@ func (c *Collections) Get( // Drive ID -> delta URL for drive deltaURLs = map[string]string{} // Drive ID -> folder ID -> folder path - folderPaths = map[string]map[string]string{} + folderPaths = map[string]map[string]string{} + numPrevItems = 0 ) for _, d := range drives { @@ -370,13 +376,28 @@ func (c *Collections) Get( c.statusUpdater, c.ctrl, CollectionScopeUnknown, - true) + true, + nil) if err != nil { return nil, false, clues.Wrap(err, "making collection").WithClues(ictx) } c.CollectionMap[driveID][fldID] = col } + + numDriveItems := c.NumItems - numPrevItems + numPrevItems += numDriveItems + + // Only create a drive cache if there are less than 300k items in the drive. + // TODO: Tune this number. Delta query for 300k items takes ~20 mins. + if numDriveItems < 300*1000 { + logger.Ctx(ictx).Info("adding url cache for drive ", driveID) + + err = c.addURLCacheToDriveCollections(ictx, driveID, errs) + if err != nil { + return nil, err + } + } } observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems)) @@ -405,7 +426,8 @@ func (c *Collections) Get( c.statusUpdater, c.ctrl, CollectionScopeUnknown, - true) + true, + nil) if err != nil { return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx) } @@ -438,6 +460,32 @@ func (c *Collections) Get( return collections, canUsePreviousBackup, nil } +func (c *Collections) addURLCacheToDriveCollections( + ctx context.Context, + driveID string, + errs *fault.Bus, +) error { + uc, err := newURLCache( + driveID, + 1*time.Hour, // TODO: Add const + errs, + c.handler.ItemPager(driveID, "", api.DriveItemSelectDefault())) + if err != nil { + return err + } + + c.driveURLCache[driveID] = uc + + // Set the URL cache for all collections in this drive + for _, driveColls := range c.CollectionMap { + for _, coll := range driveColls { + coll.cache = uc + } + } + + return nil +} + func updateCollectionPaths( driveID, itemID string, cmap map[string]map[string]*Collection, @@ -557,7 +605,8 @@ func (c *Collections) handleDelete( c.ctrl, CollectionScopeUnknown, // DoNotMerge is not checked for deleted items. - false) + false, + nil) if err != nil { return clues.Wrap(err, "making collection").With( "drive_id", driveID, @@ -740,7 +789,8 @@ func (c *Collections) UpdateCollections( c.statusUpdater, c.ctrl, colScope, - invalidPrevDelta) + invalidPrevDelta, + nil) if err != nil { return clues.Stack(err).WithClues(ictx) }