diff --git a/src/internal/m365/onedrive/collections.go b/src/internal/m365/onedrive/collections.go index 2f270ad0a..0e4a2549c 100644 --- a/src/internal/m365/onedrive/collections.go +++ b/src/internal/m365/onedrive/collections.go @@ -323,6 +323,23 @@ func (c *Collections) Get( "num_deltas_entries", numDeltas, "delta_reset", delta.Reset) + numDriveItems := c.NumItems - numPrevItems + numPrevItems = c.NumItems + + // Attach an url cache + if numDriveItems < urlCacheDriveItemThreshold { + logger.Ctx(ictx).Info("adding url cache for drive") + + err = c.addURLCacheToDriveCollections( + ictx, + driveID, + prevDelta, + errs) + if err != nil { + return nil, false, err + } + } + // For both cases we don't need to do set difference on folder map if the // delta token was valid because we should see all the changes. if !delta.Reset { @@ -379,22 +396,6 @@ func (c *Collections) Get( c.CollectionMap[driveID][fldID] = col } - - numDriveItems := c.NumItems - numPrevItems - numPrevItems = c.NumItems - - // Only create a drive cache if there are less than 300k items in the drive. - if numDriveItems < urlCacheDriveItemThreshold { - logger.Ctx(ictx).Info("adding url cache for drive") - - err = c.addURLCacheToDriveCollections( - ictx, - driveID, - errs) - if err != nil { - return nil, false, err - } - } } observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems)) @@ -461,11 +462,12 @@ func (c *Collections) Get( // a drive. func (c *Collections) addURLCacheToDriveCollections( ctx context.Context, - driveID string, + driveID, prevDelta string, errs *fault.Bus, ) error { uc, err := newURLCache( driveID, + prevDelta, urlCacheRefreshInterval, c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()), errs) diff --git a/src/internal/m365/onedrive/collections_test.go b/src/internal/m365/onedrive/collections_test.go index b3f410a87..93f76e147 100644 --- a/src/internal/m365/onedrive/collections_test.go +++ b/src/internal/m365/onedrive/collections_test.go @@ -2742,6 +2742,7 @@ func (suite *OneDriveCollectionsUnitSuite) TestAddURLCacheToDriveCollections() { err := c.addURLCacheToDriveCollections( ctx, driveID, + "", fault.New(true)) require.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/m365/onedrive/url_cache.go b/src/internal/m365/onedrive/url_cache.go index 2f585c223..1c7f2d93c 100644 --- a/src/internal/m365/onedrive/url_cache.go +++ b/src/internal/m365/onedrive/url_cache.go @@ -37,6 +37,7 @@ var _ getItemPropertyer = &urlCache{} // urlCache caches download URLs for drive items type urlCache struct { driveID string + prevDelta string idToProps map[string]itemProps lastRefreshTime time.Time refreshInterval time.Duration @@ -53,7 +54,7 @@ type urlCache struct { // newURLache creates a new URL cache for the specified drive ID func newURLCache( - driveID string, + driveID, prevDelta string, refreshInterval time.Duration, itemPager api.DriveItemEnumerator, errs *fault.Bus, @@ -70,6 +71,7 @@ func newURLCache( idToProps: make(map[string]itemProps), lastRefreshTime: time.Time{}, driveID: driveID, + prevDelta: prevDelta, refreshInterval: refreshInterval, itemPager: itemPager, errs: errs, @@ -179,6 +181,8 @@ func (uc *urlCache) deltaQuery( ctx context.Context, ) error { logger.Ctx(ctx).Debug("starting delta query") + // Reset item pager to remove any previous state + uc.itemPager.Reset() _, _, _, err := collectItems( ctx, @@ -187,7 +191,7 @@ func (uc *urlCache) deltaQuery( "", uc.updateCache, map[string]string{}, - "", + uc.prevDelta, uc.errs) if err != nil { return clues.Wrap(err, "delta query") diff --git a/src/internal/m365/onedrive/url_cache_test.go b/src/internal/m365/onedrive/url_cache_test.go index 1632c165a..4b2e4e96a 100644 --- a/src/internal/m365/onedrive/url_cache_test.go +++ b/src/internal/m365/onedrive/url_cache_test.go @@ -1,6 +1,7 @@ package onedrive import ( + "context" "errors" "math/rand" "net/http" @@ -89,10 +90,38 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { nfid := ptr.Val(newFolder.GetId()) + collectorFunc := func( + context.Context, + string, + string, + []models.DriveItemable, + map[string]string, + map[string]string, + map[string]struct{}, + map[string]map[string]string, + bool, + *fault.Bus, + ) error { + return nil + } + + // Get the previous delta to feed into url cache + prevDelta, _, _, err := collectItems( + ctx, + suite.ac.Drives().NewItemPager(driveID, "", api.DriveItemSelectDefault()), + suite.driveID, + "drive-name", + collectorFunc, + map[string]string{}, + "", + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + require.NotNil(t, prevDelta.URL) + // Create a bunch of files in the new folder var items []models.DriveItemable - for i := 0; i < 10; i++ { + for i := 0; i < 5; i++ { newItemName := "test_url_cache_basic_" + dttm.FormatNow(dttm.SafeForTesting) item, err := ac.Drives().PostItemInContainer( @@ -110,12 +139,12 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { } // Create a new URL cache with a long TTL - cache, err := newURLCache( + uc, err := newURLCache( suite.driveID, + prevDelta.URL, 1*time.Hour, driveItemPager, fault.New(true)) - require.NoError(t, err, clues.ToCore(err)) // Launch parallel requests to the cache, one per item @@ -127,11 +156,11 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { defer wg.Done() // Read item from URL cache - props, err := cache.getItemProperties( + props, err := uc.getItemProperties( ctx, ptr.Val(items[i].GetId())) - require.NoError(t, err, clues.ToCore(err)) + require.NotNil(t, props) require.NotEmpty(t, props.downloadURL) require.Equal(t, false, props.isDeleted) @@ -145,15 +174,14 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() { props.downloadURL, nil, nil) - require.NoError(t, err, clues.ToCore(err)) require.Equal(t, http.StatusOK, resp.StatusCode) }(i) } wg.Wait() - // Validate that <= 1 delta queries were made - require.LessOrEqual(t, cache.deltaQueryCount, 1) + // Validate that <= 1 delta queries were made by url cache + require.LessOrEqual(t, uc.deltaQueryCount, 1) } type URLCacheUnitSuite struct { @@ -404,6 +432,7 @@ func (suite *URLCacheUnitSuite) TestGetItemProperties() { cache, err := newURLCache( driveID, + "", 1*time.Hour, itemPager, fault.New(true)) @@ -446,6 +475,7 @@ func (suite *URLCacheUnitSuite) TestNeedsRefresh() { cache, err := newURLCache( driveID, + "", refreshInterval, &mockItemPager{}, fault.New(true)) @@ -519,6 +549,7 @@ func (suite *URLCacheUnitSuite) TestNewURLCache() { t := suite.T() _, err := newURLCache( test.driveID, + "", test.refreshInt, test.itemPager, test.errors)