Cherry pick cache integration changes with dummy url cache
This commit is contained in:
parent
45a70fe1c4
commit
9bd1b5588a
@ -84,6 +84,8 @@ type Collection struct {
|
||||
|
||||
// should only be true if the old delta token expired
|
||||
doNotMergeItems bool
|
||||
|
||||
cache *urlCache
|
||||
}
|
||||
|
||||
func pathToLocation(p path.Path) (*path.Builder, error) {
|
||||
@ -109,6 +111,7 @@ func NewCollection(
|
||||
ctrlOpts control.Options,
|
||||
colScope collectionScope,
|
||||
doNotMergeItems bool,
|
||||
cache *urlCache,
|
||||
) (*Collection, error) {
|
||||
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
|
||||
// to be changed as we won't be able to extract path information from the
|
||||
@ -132,7 +135,8 @@ func NewCollection(
|
||||
statusUpdater,
|
||||
ctrlOpts,
|
||||
colScope,
|
||||
doNotMergeItems)
|
||||
doNotMergeItems,
|
||||
cache)
|
||||
|
||||
c.locPath = locPath
|
||||
c.prevLocPath = prevLocPath
|
||||
@ -149,6 +153,7 @@ func newColl(
|
||||
ctrlOpts control.Options,
|
||||
colScope collectionScope,
|
||||
doNotMergeItems bool,
|
||||
cache *urlCache,
|
||||
) *Collection {
|
||||
c := &Collection{
|
||||
handler: handler,
|
||||
@ -162,6 +167,7 @@ func newColl(
|
||||
state: data.StateOf(prevPath, currPath),
|
||||
scope: colScope,
|
||||
doNotMergeItems: doNotMergeItems,
|
||||
cache: cache,
|
||||
}
|
||||
|
||||
return c
|
||||
@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent(
|
||||
el = errs.Local()
|
||||
)
|
||||
|
||||
itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID)
|
||||
itemData, err := oc.downloadContent(ctx, oc.handler, item, oc.driveID)
|
||||
if err != nil {
|
||||
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
|
||||
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
|
||||
@ -317,12 +323,15 @@ type itemAndAPIGetter interface {
|
||||
// downloadContent attempts to fetch the item content. If the content url
|
||||
// is expired (ie, returns a 401), it re-fetches the item to get a new download
|
||||
// url and tries again.
|
||||
func downloadContent(
|
||||
func (oc *Collection) downloadContent(
|
||||
ctx context.Context,
|
||||
iaag itemAndAPIGetter,
|
||||
item models.DriveItemable,
|
||||
driveID string,
|
||||
) (io.ReadCloser, error) {
|
||||
itemID := ptr.Val(item.GetId())
|
||||
ctx = clues.Add(ctx, "item_id", itemID)
|
||||
|
||||
content, err := downloadItem(ctx, iaag, item)
|
||||
if err == nil {
|
||||
return content, nil
|
||||
@ -332,8 +341,20 @@ func downloadContent(
|
||||
|
||||
// Assume unauthorized requests are a sign of an expired jwt
|
||||
// token, and that we've overrun the available window to
|
||||
// download the actual file. Re-downloading the item will
|
||||
// refresh that download url.
|
||||
// download the file. Get a fresh url from the cache and attempt to
|
||||
// download again.
|
||||
content, err = oc.readFromCache(ctx, iaag, itemID)
|
||||
if err == nil {
|
||||
logger.Ctx(ctx).Debug("found item in cache", itemID)
|
||||
|
||||
return content, nil
|
||||
}
|
||||
|
||||
// Consider cache errors(including deleted items) as cache misses. This is
|
||||
// to preserve existing behavior. Fallback to refetching the item using the
|
||||
// API.
|
||||
logger.CtxErr(ctx, err).Info("cache miss. refetching from API")
|
||||
|
||||
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "retrieving expired item")
|
||||
@ -347,6 +368,41 @@ func downloadContent(
|
||||
return content, nil
|
||||
}
|
||||
|
||||
// readFromCache fetches latest download URL from the cache and attempts to
|
||||
// download the file using the new URL.
|
||||
func (oc *Collection) readFromCache(
|
||||
ctx context.Context,
|
||||
iaag itemAndAPIGetter,
|
||||
itemID string,
|
||||
) (io.ReadCloser, error) {
|
||||
if oc.cache == nil {
|
||||
return nil, clues.New("nil url cache")
|
||||
}
|
||||
|
||||
props, err := oc.cache.getItemProperties(ctx, itemID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Handle newly deleted items
|
||||
if props.isDeleted {
|
||||
logger.Ctx(ctx).Info("item deleted in cache")
|
||||
|
||||
return nil, graph.ErrDeletedInFlight
|
||||
}
|
||||
|
||||
rc, err := downloadFile(ctx, iaag, props.downloadURL)
|
||||
if graph.IsErrUnauthorized(err) {
|
||||
logger.CtxErr(ctx, err).Info("stale item in cache")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
// populateItems iterates through items added to the collection
|
||||
// and uses the collection `itemReader` to read the item
|
||||
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||
|
||||
@ -204,7 +204,8 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
|
||||
suite.testStatusUpdater(&wg, &collStatus),
|
||||
control.Options{ToggleFeatures: control.Toggles{}},
|
||||
CollectionScopeFolder,
|
||||
true)
|
||||
true,
|
||||
nil)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
require.NotNil(t, coll)
|
||||
assert.Equal(t, folderPath, coll.FullPath())
|
||||
@ -312,7 +313,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
|
||||
suite.testStatusUpdater(&wg, &collStatus),
|
||||
control.Options{ToggleFeatures: control.Toggles{}},
|
||||
CollectionScopeFolder,
|
||||
true)
|
||||
true,
|
||||
nil)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
stubItem := odTD.NewStubDriveItem(
|
||||
@ -388,7 +390,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry()
|
||||
suite.testStatusUpdater(&wg, &collStatus),
|
||||
control.Options{ToggleFeatures: control.Toggles{}},
|
||||
CollectionScopeFolder,
|
||||
true)
|
||||
true,
|
||||
nil)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
coll.Add(stubItem)
|
||||
@ -442,7 +445,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim
|
||||
suite.testStatusUpdater(&wg, &collStatus),
|
||||
control.Options{ToggleFeatures: control.Toggles{}},
|
||||
CollectionScopeFolder,
|
||||
true)
|
||||
true,
|
||||
nil)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
mtime := time.Now().AddDate(0, -1, 0)
|
||||
@ -685,7 +689,19 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
||||
mbh.GetResps = resps
|
||||
mbh.GetErrs = test.getErr
|
||||
|
||||
r, err := downloadContent(ctx, mbh, item, driveID)
|
||||
coll, err := NewCollection(
|
||||
mbh,
|
||||
nil,
|
||||
nil,
|
||||
"drive-id",
|
||||
nil,
|
||||
control.Options{ToggleFeatures: control.Toggles{}},
|
||||
CollectionScopeFolder,
|
||||
true,
|
||||
nil)
|
||||
require.NoError(t, err, clues.ToCore(err))
|
||||
|
||||
r, err := coll.downloadContent(ctx, mbh, item, driveID)
|
||||
test.expect(t, r)
|
||||
test.expectErr(t, err, clues.ToCore(err))
|
||||
})
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/microsoftgraph/msgraph-sdk-go/models"
|
||||
@ -63,6 +64,9 @@ type Collections struct {
|
||||
NumItems int
|
||||
NumFiles int
|
||||
NumContainers int
|
||||
|
||||
// drive ID -> url cache instance
|
||||
driveURLCache map[string]*urlCache
|
||||
}
|
||||
|
||||
func NewCollections(
|
||||
@ -79,6 +83,7 @@ func NewCollections(
|
||||
CollectionMap: map[string]map[string]*Collection{},
|
||||
statusUpdater: statusUpdater,
|
||||
ctrl: ctrlOpts,
|
||||
driveURLCache: map[string]*urlCache{},
|
||||
}
|
||||
}
|
||||
|
||||
@ -256,6 +261,7 @@ func (c *Collections) Get(
|
||||
deltaURLs = map[string]string{}
|
||||
// Drive ID -> folder ID -> folder path
|
||||
folderPaths = map[string]map[string]string{}
|
||||
numPrevItems = 0
|
||||
)
|
||||
|
||||
for _, d := range drives {
|
||||
@ -370,13 +376,28 @@ func (c *Collections) Get(
|
||||
c.statusUpdater,
|
||||
c.ctrl,
|
||||
CollectionScopeUnknown,
|
||||
true)
|
||||
true,
|
||||
nil)
|
||||
if err != nil {
|
||||
return nil, false, clues.Wrap(err, "making collection").WithClues(ictx)
|
||||
}
|
||||
|
||||
c.CollectionMap[driveID][fldID] = col
|
||||
}
|
||||
|
||||
numDriveItems := c.NumItems - numPrevItems
|
||||
numPrevItems += numDriveItems
|
||||
|
||||
// Only create a drive cache if there are less than 300k items in the drive.
|
||||
// TODO: Tune this number. Delta query for 300k items takes ~20 mins.
|
||||
if numDriveItems < 300*1000 {
|
||||
logger.Ctx(ictx).Info("adding url cache for drive ", driveID)
|
||||
|
||||
err = c.addURLCacheToDriveCollections(ictx, driveID, errs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
|
||||
@ -405,7 +426,8 @@ func (c *Collections) Get(
|
||||
c.statusUpdater,
|
||||
c.ctrl,
|
||||
CollectionScopeUnknown,
|
||||
true)
|
||||
true,
|
||||
nil)
|
||||
if err != nil {
|
||||
return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx)
|
||||
}
|
||||
@ -438,6 +460,32 @@ func (c *Collections) Get(
|
||||
return collections, canUsePreviousBackup, nil
|
||||
}
|
||||
|
||||
func (c *Collections) addURLCacheToDriveCollections(
|
||||
ctx context.Context,
|
||||
driveID string,
|
||||
errs *fault.Bus,
|
||||
) error {
|
||||
uc, err := newURLCache(
|
||||
driveID,
|
||||
1*time.Hour, // TODO: Add const
|
||||
errs,
|
||||
c.handler.ItemPager(driveID, "", api.DriveItemSelectDefault()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.driveURLCache[driveID] = uc
|
||||
|
||||
// Set the URL cache for all collections in this drive
|
||||
for _, driveColls := range c.CollectionMap {
|
||||
for _, coll := range driveColls {
|
||||
coll.cache = uc
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateCollectionPaths(
|
||||
driveID, itemID string,
|
||||
cmap map[string]map[string]*Collection,
|
||||
@ -557,7 +605,8 @@ func (c *Collections) handleDelete(
|
||||
c.ctrl,
|
||||
CollectionScopeUnknown,
|
||||
// DoNotMerge is not checked for deleted items.
|
||||
false)
|
||||
false,
|
||||
nil)
|
||||
if err != nil {
|
||||
return clues.Wrap(err, "making collection").With(
|
||||
"drive_id", driveID,
|
||||
@ -740,7 +789,8 @@ func (c *Collections) UpdateCollections(
|
||||
c.statusUpdater,
|
||||
c.ctrl,
|
||||
colScope,
|
||||
invalidPrevDelta)
|
||||
invalidPrevDelta,
|
||||
nil)
|
||||
if err != nil {
|
||||
return clues.Stack(err).WithClues(ictx)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user