Compare commits
12 Commits
main
...
url_cache_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ed6d1066ed | ||
|
|
1d1603e0ad | ||
|
|
671fd5150d | ||
|
|
b68336a466 | ||
|
|
099651491d | ||
|
|
397a0a6aa3 | ||
|
|
34e1f393cb | ||
|
|
30e9439287 | ||
|
|
c13f70ecc7 | ||
|
|
01bb562bb5 | ||
|
|
c62bcc6d95 | ||
|
|
9bd1b5588a |
@ -84,6 +84,8 @@ type Collection struct {
|
|||||||
|
|
||||||
// should only be true if the old delta token expired
|
// should only be true if the old delta token expired
|
||||||
doNotMergeItems bool
|
doNotMergeItems bool
|
||||||
|
|
||||||
|
urlCache getItemPropertyer
|
||||||
}
|
}
|
||||||
|
|
||||||
func pathToLocation(p path.Path) (*path.Builder, error) {
|
func pathToLocation(p path.Path) (*path.Builder, error) {
|
||||||
@ -109,6 +111,7 @@ func NewCollection(
|
|||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
colScope collectionScope,
|
colScope collectionScope,
|
||||||
doNotMergeItems bool,
|
doNotMergeItems bool,
|
||||||
|
urlCache getItemPropertyer,
|
||||||
) (*Collection, error) {
|
) (*Collection, error) {
|
||||||
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
|
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
|
||||||
// to be changed as we won't be able to extract path information from the
|
// to be changed as we won't be able to extract path information from the
|
||||||
@ -132,7 +135,8 @@ func NewCollection(
|
|||||||
statusUpdater,
|
statusUpdater,
|
||||||
ctrlOpts,
|
ctrlOpts,
|
||||||
colScope,
|
colScope,
|
||||||
doNotMergeItems)
|
doNotMergeItems,
|
||||||
|
urlCache)
|
||||||
|
|
||||||
c.locPath = locPath
|
c.locPath = locPath
|
||||||
c.prevLocPath = prevLocPath
|
c.prevLocPath = prevLocPath
|
||||||
@ -149,6 +153,7 @@ func newColl(
|
|||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
colScope collectionScope,
|
colScope collectionScope,
|
||||||
doNotMergeItems bool,
|
doNotMergeItems bool,
|
||||||
|
urlCache getItemPropertyer,
|
||||||
) *Collection {
|
) *Collection {
|
||||||
c := &Collection{
|
c := &Collection{
|
||||||
handler: handler,
|
handler: handler,
|
||||||
@ -162,6 +167,7 @@ func newColl(
|
|||||||
state: data.StateOf(prevPath, currPath),
|
state: data.StateOf(prevPath, currPath),
|
||||||
scope: colScope,
|
scope: colScope,
|
||||||
doNotMergeItems: doNotMergeItems,
|
doNotMergeItems: doNotMergeItems,
|
||||||
|
urlCache: urlCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c
|
return c
|
||||||
@ -267,7 +273,7 @@ func (oc *Collection) getDriveItemContent(
|
|||||||
el = errs.Local()
|
el = errs.Local()
|
||||||
)
|
)
|
||||||
|
|
||||||
itemData, err := downloadContent(ctx, oc.handler, item, oc.driveID)
|
itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
|
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
|
||||||
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
|
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
|
||||||
@ -317,12 +323,16 @@ type itemAndAPIGetter interface {
|
|||||||
// downloadContent attempts to fetch the item content. If the content url
|
// downloadContent attempts to fetch the item content. If the content url
|
||||||
// is expired (ie, returns a 401), it re-fetches the item to get a new download
|
// is expired (ie, returns a 401), it re-fetches the item to get a new download
|
||||||
// url and tries again.
|
// url and tries again.
|
||||||
func downloadContent(
|
func (oc *Collection) downloadContent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
iaag itemAndAPIGetter,
|
iaag itemAndAPIGetter,
|
||||||
|
uc getItemPropertyer,
|
||||||
item models.DriveItemable,
|
item models.DriveItemable,
|
||||||
driveID string,
|
driveID string,
|
||||||
) (io.ReadCloser, error) {
|
) (io.ReadCloser, error) {
|
||||||
|
itemID := ptr.Val(item.GetId())
|
||||||
|
ctx = clues.Add(ctx, "item_id", itemID)
|
||||||
|
|
||||||
content, err := downloadItem(ctx, iaag, item)
|
content, err := downloadItem(ctx, iaag, item)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return content, nil
|
return content, nil
|
||||||
@ -332,8 +342,20 @@ func downloadContent(
|
|||||||
|
|
||||||
// Assume unauthorized requests are a sign of an expired jwt
|
// Assume unauthorized requests are a sign of an expired jwt
|
||||||
// token, and that we've overrun the available window to
|
// token, and that we've overrun the available window to
|
||||||
// download the actual file. Re-downloading the item will
|
// download the file. Get a fresh url from the cache and attempt to
|
||||||
// refresh that download url.
|
// download again.
|
||||||
|
content, err = readItemContents(ctx, iaag, uc, itemID)
|
||||||
|
if err == nil {
|
||||||
|
logger.Ctx(ctx).Debug("found item in url cache")
|
||||||
|
|
||||||
|
return content, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Consider cache errors(including deleted items) as cache misses. This is
|
||||||
|
// to preserve existing behavior. Fallback to refetching the item using the
|
||||||
|
// API.
|
||||||
|
logger.CtxErr(ctx, err).Info("url cache miss: refetching from API")
|
||||||
|
|
||||||
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, clues.Wrap(err, "retrieving expired item")
|
return nil, clues.Wrap(err, "retrieving expired item")
|
||||||
@ -347,6 +369,42 @@ func downloadContent(
|
|||||||
return content, nil
|
return content, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readItemContents fetches latest download URL from the cache and attempts to
|
||||||
|
// download the file using the new URL.
|
||||||
|
func readItemContents(
|
||||||
|
ctx context.Context,
|
||||||
|
iaag itemAndAPIGetter,
|
||||||
|
uc getItemPropertyer,
|
||||||
|
itemID string,
|
||||||
|
) (io.ReadCloser, error) {
|
||||||
|
if uc == nil {
|
||||||
|
return nil, clues.New("nil url cache")
|
||||||
|
}
|
||||||
|
|
||||||
|
props, err := uc.getItemProperties(ctx, itemID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle newly deleted items
|
||||||
|
if props.isDeleted {
|
||||||
|
logger.Ctx(ctx).Info("item deleted in cache")
|
||||||
|
return nil, graph.ErrDeletedInFlight
|
||||||
|
}
|
||||||
|
|
||||||
|
rc, err := downloadFile(ctx, iaag, props.downloadURL)
|
||||||
|
if graph.IsErrUnauthorized(err) {
|
||||||
|
logger.CtxErr(ctx, err).Info("stale item in cache")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.CtxErr(ctx, err).Info("item served from url cache")
|
||||||
|
return rc, nil
|
||||||
|
}
|
||||||
|
|
||||||
// populateItems iterates through items added to the collection
|
// populateItems iterates through items added to the collection
|
||||||
// and uses the collection `itemReader` to read the item
|
// and uses the collection `itemReader` to read the item
|
||||||
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package onedrive
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -204,7 +205,10 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
|
|||||||
suite.testStatusUpdater(&wg, &collStatus),
|
suite.testStatusUpdater(&wg, &collStatus),
|
||||||
control.Options{ToggleFeatures: control.Toggles{}},
|
control.Options{ToggleFeatures: control.Toggles{}},
|
||||||
CollectionScopeFolder,
|
CollectionScopeFolder,
|
||||||
true)
|
true,
|
||||||
|
nil)
|
||||||
|
true,
|
||||||
|
nil)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
require.NotNil(t, coll)
|
require.NotNil(t, coll)
|
||||||
assert.Equal(t, folderPath, coll.FullPath())
|
assert.Equal(t, folderPath, coll.FullPath())
|
||||||
@ -312,7 +316,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
|
|||||||
suite.testStatusUpdater(&wg, &collStatus),
|
suite.testStatusUpdater(&wg, &collStatus),
|
||||||
control.Options{ToggleFeatures: control.Toggles{}},
|
control.Options{ToggleFeatures: control.Toggles{}},
|
||||||
CollectionScopeFolder,
|
CollectionScopeFolder,
|
||||||
true)
|
true,
|
||||||
|
nil)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
stubItem := odTD.NewStubDriveItem(
|
stubItem := odTD.NewStubDriveItem(
|
||||||
@ -388,7 +393,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry()
|
|||||||
suite.testStatusUpdater(&wg, &collStatus),
|
suite.testStatusUpdater(&wg, &collStatus),
|
||||||
control.Options{ToggleFeatures: control.Toggles{}},
|
control.Options{ToggleFeatures: control.Toggles{}},
|
||||||
CollectionScopeFolder,
|
CollectionScopeFolder,
|
||||||
true)
|
true,
|
||||||
|
nil)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
coll.Add(stubItem)
|
coll.Add(stubItem)
|
||||||
@ -442,7 +448,8 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim
|
|||||||
suite.testStatusUpdater(&wg, &collStatus),
|
suite.testStatusUpdater(&wg, &collStatus),
|
||||||
control.Options{ToggleFeatures: control.Toggles{}},
|
control.Options{ToggleFeatures: control.Toggles{}},
|
||||||
CollectionScopeFolder,
|
CollectionScopeFolder,
|
||||||
true)
|
true,
|
||||||
|
nil)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
mtime := time.Now().AddDate(0, -1, 0)
|
mtime := time.Now().AddDate(0, -1, 0)
|
||||||
@ -600,6 +607,19 @@ func (suite *GetDriveItemUnitTestSuite) TestGetDriveItem_error() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ getItemPropertyer = &mockURLCache{}
|
||||||
|
|
||||||
|
type mockURLCache struct {
|
||||||
|
Get func(ctx context.Context, itemID string) (itemProps, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (muc *mockURLCache) getItemProperties(
|
||||||
|
ctx context.Context,
|
||||||
|
itemID string,
|
||||||
|
) (itemProps, error) {
|
||||||
|
return muc.Get(ctx, itemID)
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
||||||
var (
|
var (
|
||||||
driveID string
|
driveID string
|
||||||
@ -611,6 +631,12 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
|
|
||||||
itemWID.SetId(ptr.To("brainhooldy"))
|
itemWID.SetId(ptr.To("brainhooldy"))
|
||||||
|
|
||||||
|
m := &mockURLCache{
|
||||||
|
Get: func(ctx context.Context, itemID string) (itemProps, error) {
|
||||||
|
return itemProps{}, clues.Stack(assert.AnError)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
mgi mock.GetsItem
|
mgi mock.GetsItem
|
||||||
@ -619,6 +645,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
getErr []error
|
getErr []error
|
||||||
expectErr require.ErrorAssertionFunc
|
expectErr require.ErrorAssertionFunc
|
||||||
expect require.ValueAssertionFunc
|
expect require.ValueAssertionFunc
|
||||||
|
muc *mockURLCache
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "good",
|
name: "good",
|
||||||
@ -627,6 +654,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
getErr: []error{nil},
|
getErr: []error{nil},
|
||||||
expectErr: require.NoError,
|
expectErr: require.NoError,
|
||||||
expect: require.NotNil,
|
expect: require.NotNil,
|
||||||
|
muc: m,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "expired url redownloads",
|
name: "expired url redownloads",
|
||||||
@ -636,6 +664,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
getErr: []error{errUnauth, nil},
|
getErr: []error{errUnauth, nil},
|
||||||
expectErr: require.NoError,
|
expectErr: require.NoError,
|
||||||
expect: require.NotNil,
|
expect: require.NotNil,
|
||||||
|
muc: m,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "immediate error",
|
name: "immediate error",
|
||||||
@ -643,6 +672,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
getErr: []error{assert.AnError},
|
getErr: []error{assert.AnError},
|
||||||
expectErr: require.Error,
|
expectErr: require.Error,
|
||||||
expect: require.Nil,
|
expect: require.Nil,
|
||||||
|
muc: m,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "re-fetching the item fails",
|
name: "re-fetching the item fails",
|
||||||
@ -651,6 +681,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
mgi: mock.GetsItem{Item: nil, Err: assert.AnError},
|
mgi: mock.GetsItem{Item: nil, Err: assert.AnError},
|
||||||
expectErr: require.Error,
|
expectErr: require.Error,
|
||||||
expect: require.Nil,
|
expect: require.Nil,
|
||||||
|
muc: m,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "expired url fails redownload",
|
name: "expired url fails redownload",
|
||||||
@ -660,6 +691,57 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
getErr: []error{errUnauth, assert.AnError},
|
getErr: []error{errUnauth, assert.AnError},
|
||||||
expectErr: require.Error,
|
expectErr: require.Error,
|
||||||
expect: require.Nil,
|
expect: require.Nil,
|
||||||
|
muc: m,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "url refreshed from cache",
|
||||||
|
mgi: mock.GetsItem{Item: itemWID, Err: nil},
|
||||||
|
itemInfo: details.ItemInfo{},
|
||||||
|
respBody: []io.ReadCloser{nil, iorc},
|
||||||
|
getErr: []error{errUnauth, nil},
|
||||||
|
expectErr: require.NoError,
|
||||||
|
expect: require.NotNil,
|
||||||
|
muc: &mockURLCache{
|
||||||
|
Get: func(ctx context.Context, itemID string) (itemProps, error) {
|
||||||
|
return itemProps{
|
||||||
|
downloadURL: "http://example.com",
|
||||||
|
isDeleted: false,
|
||||||
|
},
|
||||||
|
nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "url refreshed from cache but item deleted",
|
||||||
|
mgi: mock.GetsItem{Item: itemWID, Err: graph.ErrDeletedInFlight},
|
||||||
|
itemInfo: details.ItemInfo{},
|
||||||
|
respBody: []io.ReadCloser{nil, nil, nil},
|
||||||
|
getErr: []error{errUnauth, graph.ErrDeletedInFlight, graph.ErrDeletedInFlight},
|
||||||
|
expectErr: require.Error,
|
||||||
|
expect: require.Nil,
|
||||||
|
muc: &mockURLCache{
|
||||||
|
Get: func(ctx context.Context, itemID string) (itemProps, error) {
|
||||||
|
return itemProps{
|
||||||
|
downloadURL: "http://example.com",
|
||||||
|
isDeleted: true,
|
||||||
|
},
|
||||||
|
nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "fallback to item fetch on any cache error",
|
||||||
|
mgi: mock.GetsItem{Item: itemWID, Err: nil},
|
||||||
|
itemInfo: details.ItemInfo{},
|
||||||
|
respBody: []io.ReadCloser{nil, iorc},
|
||||||
|
getErr: []error{errUnauth, nil},
|
||||||
|
expectErr: require.NoError,
|
||||||
|
expect: require.NotNil,
|
||||||
|
muc: &mockURLCache{
|
||||||
|
Get: func(ctx context.Context, itemID string) (itemProps, error) {
|
||||||
|
return itemProps{}, assert.AnError
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
@ -685,7 +767,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
mbh.GetResps = resps
|
mbh.GetResps = resps
|
||||||
mbh.GetErrs = test.getErr
|
mbh.GetErrs = test.getErr
|
||||||
|
|
||||||
r, err := downloadContent(ctx, mbh, item, driveID)
|
r, err := downloadContent(ctx, mbh, test.muc, item, driveID)
|
||||||
test.expect(t, r)
|
test.expect(t, r)
|
||||||
test.expectErr(t, err, clues.ToCore(err))
|
test.expectErr(t, err, clues.ToCore(err))
|
||||||
})
|
})
|
||||||
|
|||||||
@ -63,6 +63,9 @@ type Collections struct {
|
|||||||
NumItems int
|
NumItems int
|
||||||
NumFiles int
|
NumFiles int
|
||||||
NumContainers int
|
NumContainers int
|
||||||
|
|
||||||
|
// drive ID -> url cache instance
|
||||||
|
driveURLCache map[string]*urlCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCollections(
|
func NewCollections(
|
||||||
@ -79,6 +82,7 @@ func NewCollections(
|
|||||||
CollectionMap: map[string]map[string]*Collection{},
|
CollectionMap: map[string]map[string]*Collection{},
|
||||||
statusUpdater: statusUpdater,
|
statusUpdater: statusUpdater,
|
||||||
ctrl: ctrlOpts,
|
ctrl: ctrlOpts,
|
||||||
|
driveURLCache: map[string]*urlCache{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -256,6 +260,7 @@ func (c *Collections) Get(
|
|||||||
deltaURLs = map[string]string{}
|
deltaURLs = map[string]string{}
|
||||||
// Drive ID -> folder ID -> folder path
|
// Drive ID -> folder ID -> folder path
|
||||||
folderPaths = map[string]map[string]string{}
|
folderPaths = map[string]map[string]string{}
|
||||||
|
numPrevItems = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, d := range drives {
|
for _, d := range drives {
|
||||||
@ -322,6 +327,24 @@ func (c *Collections) Get(
|
|||||||
"num_deltas_entries", numDeltas,
|
"num_deltas_entries", numDeltas,
|
||||||
"delta_reset", delta.Reset)
|
"delta_reset", delta.Reset)
|
||||||
|
|
||||||
|
numDriveItems := c.NumItems - numPrevItems
|
||||||
|
numPrevItems = c.NumItems
|
||||||
|
|
||||||
|
// Only create a drive cache if there are less than 300k items in the drive.
|
||||||
|
if numDriveItems < urlCacheDriveItemThreshold {
|
||||||
|
logger.Ctx(ictx).Info("adding url cache for drive ", driveID)
|
||||||
|
logger.Ctx(ictx).Info("url cache: new items in drive ", numDriveItems)
|
||||||
|
|
||||||
|
err = c.addURLCacheToDriveCollections(
|
||||||
|
ictx,
|
||||||
|
driveID,
|
||||||
|
prevDelta,
|
||||||
|
errs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// For both cases we don't need to do set difference on folder map if the
|
// For both cases we don't need to do set difference on folder map if the
|
||||||
// delta token was valid because we should see all the changes.
|
// delta token was valid because we should see all the changes.
|
||||||
if !delta.Reset {
|
if !delta.Reset {
|
||||||
@ -370,13 +393,31 @@ func (c *Collections) Get(
|
|||||||
c.statusUpdater,
|
c.statusUpdater,
|
||||||
c.ctrl,
|
c.ctrl,
|
||||||
CollectionScopeUnknown,
|
CollectionScopeUnknown,
|
||||||
true)
|
true,
|
||||||
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, clues.Wrap(err, "making collection").WithClues(ictx)
|
return nil, false, clues.Wrap(err, "making collection").WithClues(ictx)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.CollectionMap[driveID][fldID] = col
|
c.CollectionMap[driveID][fldID] = col
|
||||||
}
|
}
|
||||||
|
|
||||||
|
numDriveItems := c.NumItems - numPrevItems
|
||||||
|
numPrevItems = c.NumItems
|
||||||
|
|
||||||
|
// Only create a drive cache if there are less than 300k items in the drive.
|
||||||
|
if numDriveItems < urlCacheDriveItemThreshold {
|
||||||
|
logger.Ctx(ictx).Info("adding url cache for drive")
|
||||||
|
|
||||||
|
err = c.addURLCacheToDriveCollections(
|
||||||
|
ictx,
|
||||||
|
driveID,
|
||||||
|
prevDelta,
|
||||||
|
errs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
|
observe.Message(ctx, fmt.Sprintf("Discovered %d items to backup", c.NumItems))
|
||||||
@ -405,7 +446,8 @@ func (c *Collections) Get(
|
|||||||
c.statusUpdater,
|
c.statusUpdater,
|
||||||
c.ctrl,
|
c.ctrl,
|
||||||
CollectionScopeUnknown,
|
CollectionScopeUnknown,
|
||||||
true)
|
true,
|
||||||
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx)
|
return nil, false, clues.Wrap(err, "making drive tombstone").WithClues(ctx)
|
||||||
}
|
}
|
||||||
@ -438,6 +480,33 @@ func (c *Collections) Get(
|
|||||||
return collections, canUsePreviousBackup, nil
|
return collections, canUsePreviousBackup, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// addURLCacheToDriveCollections adds an URL cache to all collections belonging to
|
||||||
|
// a drive.
|
||||||
|
func (c *Collections) addURLCacheToDriveCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
driveID, prevDelta string,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) error {
|
||||||
|
uc, err := newURLCache(
|
||||||
|
driveID,
|
||||||
|
prevDelta,
|
||||||
|
urlCacheRefreshInterval,
|
||||||
|
c.handler.NewItemPager(driveID, "", api.DriveItemSelectDefault()),
|
||||||
|
errs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the URL cache for all collections in this drive
|
||||||
|
for _, driveColls := range c.CollectionMap {
|
||||||
|
for _, coll := range driveColls {
|
||||||
|
coll.urlCache = uc
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func updateCollectionPaths(
|
func updateCollectionPaths(
|
||||||
driveID, itemID string,
|
driveID, itemID string,
|
||||||
cmap map[string]map[string]*Collection,
|
cmap map[string]map[string]*Collection,
|
||||||
@ -557,7 +626,8 @@ func (c *Collections) handleDelete(
|
|||||||
c.ctrl,
|
c.ctrl,
|
||||||
CollectionScopeUnknown,
|
CollectionScopeUnknown,
|
||||||
// DoNotMerge is not checked for deleted items.
|
// DoNotMerge is not checked for deleted items.
|
||||||
false)
|
false,
|
||||||
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return clues.Wrap(err, "making collection").With(
|
return clues.Wrap(err, "making collection").With(
|
||||||
"drive_id", driveID,
|
"drive_id", driveID,
|
||||||
@ -740,7 +810,8 @@ func (c *Collections) UpdateCollections(
|
|||||||
c.statusUpdater,
|
c.statusUpdater,
|
||||||
c.ctrl,
|
c.ctrl,
|
||||||
colScope,
|
colScope,
|
||||||
invalidPrevDelta)
|
invalidPrevDelta,
|
||||||
|
nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return clues.Stack(err).WithClues(ictx)
|
return clues.Stack(err).WithClues(ictx)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,6 +2,7 @@ package onedrive
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
@ -2678,3 +2679,83 @@ func (suite *OneDriveCollectionsUnitSuite) TestCollectItems() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *OneDriveCollectionsUnitSuite) TestURLCacheIntegration() {
|
||||||
|
driveID := "test-drive"
|
||||||
|
collCount := 3
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
items []deltaPagerResult
|
||||||
|
deltaURL string
|
||||||
|
prevDeltaSuccess bool
|
||||||
|
prevDelta string
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "cache is attached",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, test := range table {
|
||||||
|
suite.Run(test.name, func() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
ctx, flush := tester.NewContext(t)
|
||||||
|
defer flush()
|
||||||
|
|
||||||
|
mbh := mock.DefaultOneDriveBH()
|
||||||
|
mbh.ItemPagerV = map[string]api.DriveItemEnumerator{}
|
||||||
|
|
||||||
|
c := NewCollections(
|
||||||
|
mbh,
|
||||||
|
"test-tenant",
|
||||||
|
"test-user",
|
||||||
|
testFolderMatcher{(&selectors.OneDriveBackup{}).Folders(selectors.Any())[0]},
|
||||||
|
nil,
|
||||||
|
control.Options{ToggleFeatures: control.Toggles{}})
|
||||||
|
|
||||||
|
if _, ok := c.CollectionMap[driveID]; !ok {
|
||||||
|
c.CollectionMap[driveID] = map[string]*Collection{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a few collections
|
||||||
|
for i := 0; i < collCount; i++ {
|
||||||
|
coll, err := NewCollection(
|
||||||
|
&itemBackupHandler{api.Drives{}},
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
|
driveID,
|
||||||
|
nil,
|
||||||
|
control.Options{ToggleFeatures: control.Toggles{}},
|
||||||
|
CollectionScopeFolder,
|
||||||
|
true,
|
||||||
|
nil)
|
||||||
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
c.CollectionMap[driveID][strconv.Itoa(i)] = coll
|
||||||
|
require.Equal(t, nil, coll.urlCache, "cache not nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := c.addURLCacheToDriveCollections(
|
||||||
|
ctx,
|
||||||
|
driveID,
|
||||||
|
"",
|
||||||
|
fault.New(true))
|
||||||
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
|
// Check that all collections have the same cache instance attached
|
||||||
|
// to them
|
||||||
|
var uc *urlCache
|
||||||
|
for _, driveColls := range c.CollectionMap {
|
||||||
|
for _, coll := range driveColls {
|
||||||
|
require.NotNil(t, coll.urlCache, "cache is nil")
|
||||||
|
if uc == nil {
|
||||||
|
uc = coll.urlCache.(*urlCache)
|
||||||
|
} else {
|
||||||
|
require.Equal(t, uc, coll.urlCache, "cache not equal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -77,7 +77,7 @@ func downloadFile(
|
|||||||
return nil, clues.New("malware detected").Label(graph.LabelsMalware)
|
return nil, clues.New("malware detected").Label(graph.LabelsMalware)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (resp.StatusCode / 100) != 2 {
|
if resp != nil && (resp.StatusCode/100) != 2 {
|
||||||
// upstream error checks can compare the status with
|
// upstream error checks can compare the status with
|
||||||
// clues.HasLabel(err, graph.LabelStatus(http.KnownStatusCode))
|
// clues.HasLabel(err, graph.LabelStatus(http.KnownStatusCode))
|
||||||
return nil, clues.
|
return nil, clues.
|
||||||
|
|||||||
@ -15,14 +15,24 @@ import (
|
|||||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type getItemPropertyer interface {
|
||||||
|
getItemProperties(
|
||||||
|
ctx context.Context,
|
||||||
|
itemID string,
|
||||||
|
) (itemProps, error)
|
||||||
|
}
|
||||||
|
|
||||||
type itemProps struct {
|
type itemProps struct {
|
||||||
downloadURL string
|
downloadURL string
|
||||||
isDeleted bool
|
isDeleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ getItemPropertyer = &urlCache{}
|
||||||
|
|
||||||
// urlCache caches download URLs for drive items
|
// urlCache caches download URLs for drive items
|
||||||
type urlCache struct {
|
type urlCache struct {
|
||||||
driveID string
|
driveID string
|
||||||
|
prevDelta string
|
||||||
idToProps map[string]itemProps
|
idToProps map[string]itemProps
|
||||||
lastRefreshTime time.Time
|
lastRefreshTime time.Time
|
||||||
refreshInterval time.Duration
|
refreshInterval time.Duration
|
||||||
@ -39,7 +49,7 @@ type urlCache struct {
|
|||||||
|
|
||||||
// newURLCache creates a new URL cache for the specified drive ID
|
// newURLCache creates a new URL cache for the specified drive ID
|
||||||
func newURLCache(
|
func newURLCache(
|
||||||
driveID string,
|
driveID, prevDelta string,
|
||||||
refreshInterval time.Duration,
|
refreshInterval time.Duration,
|
||||||
itemPager api.DriveItemEnumerator,
|
itemPager api.DriveItemEnumerator,
|
||||||
errs *fault.Bus,
|
errs *fault.Bus,
|
||||||
@ -56,6 +66,7 @@ func newURLCache(
|
|||||||
idToProps: make(map[string]itemProps),
|
idToProps: make(map[string]itemProps),
|
||||||
lastRefreshTime: time.Time{},
|
lastRefreshTime: time.Time{},
|
||||||
driveID: driveID,
|
driveID: driveID,
|
||||||
|
prevDelta: prevDelta,
|
||||||
refreshInterval: refreshInterval,
|
refreshInterval: refreshInterval,
|
||||||
itemPager: itemPager,
|
itemPager: itemPager,
|
||||||
errs: errs,
|
errs: errs,
|
||||||
@ -166,6 +177,9 @@ func (uc *urlCache) deltaQuery(
|
|||||||
) error {
|
) error {
|
||||||
logger.Ctx(ctx).Debug("starting delta query")
|
logger.Ctx(ctx).Debug("starting delta query")
|
||||||
|
|
||||||
|
// Reset item pager to remove any previous state
|
||||||
|
uc.itemPager.Reset()
|
||||||
|
|
||||||
_, _, _, err := collectItems(
|
_, _, _, err := collectItems(
|
||||||
ctx,
|
ctx,
|
||||||
uc.itemPager,
|
uc.itemPager,
|
||||||
@ -173,7 +187,7 @@ func (uc *urlCache) deltaQuery(
|
|||||||
"",
|
"",
|
||||||
uc.updateCache,
|
uc.updateCache,
|
||||||
map[string]string{},
|
map[string]string{},
|
||||||
"",
|
uc.prevDelta,
|
||||||
uc.errs)
|
uc.errs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return clues.Wrap(err, "delta query")
|
return clues.Wrap(err, "delta query")
|
||||||
|
|||||||
@ -112,15 +112,13 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
|||||||
// Create a new URL cache with a long TTL
|
// Create a new URL cache with a long TTL
|
||||||
cache, err := newURLCache(
|
cache, err := newURLCache(
|
||||||
suite.driveID,
|
suite.driveID,
|
||||||
|
"",
|
||||||
1*time.Hour,
|
1*time.Hour,
|
||||||
driveItemPager,
|
driveItemPager,
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
err = cache.refreshCache(ctx)
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
|
||||||
|
|
||||||
// Launch parallel requests to the cache, one per item
|
// Launch parallel requests to the cache, one per item
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < len(items); i++ {
|
for i := 0; i < len(items); i++ {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user