Address review feedback
This commit is contained in:
parent
099651491d
commit
b68336a466
@ -85,7 +85,7 @@ type Collection struct {
|
|||||||
// should only be true if the old delta token expired
|
// should only be true if the old delta token expired
|
||||||
doNotMergeItems bool
|
doNotMergeItems bool
|
||||||
|
|
||||||
cache urlCacher
|
urlCache getItemPropertyer
|
||||||
}
|
}
|
||||||
|
|
||||||
func pathToLocation(p path.Path) (*path.Builder, error) {
|
func pathToLocation(p path.Path) (*path.Builder, error) {
|
||||||
@ -111,7 +111,7 @@ func NewCollection(
|
|||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
colScope collectionScope,
|
colScope collectionScope,
|
||||||
doNotMergeItems bool,
|
doNotMergeItems bool,
|
||||||
cache urlCacher,
|
urlCache getItemPropertyer,
|
||||||
) (*Collection, error) {
|
) (*Collection, error) {
|
||||||
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
|
// TODO(ashmrtn): If OneDrive switches to using folder IDs then this will need
|
||||||
// to be changed as we won't be able to extract path information from the
|
// to be changed as we won't be able to extract path information from the
|
||||||
@ -136,7 +136,7 @@ func NewCollection(
|
|||||||
ctrlOpts,
|
ctrlOpts,
|
||||||
colScope,
|
colScope,
|
||||||
doNotMergeItems,
|
doNotMergeItems,
|
||||||
cache)
|
urlCache)
|
||||||
|
|
||||||
c.locPath = locPath
|
c.locPath = locPath
|
||||||
c.prevLocPath = prevLocPath
|
c.prevLocPath = prevLocPath
|
||||||
@ -153,7 +153,7 @@ func newColl(
|
|||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
colScope collectionScope,
|
colScope collectionScope,
|
||||||
doNotMergeItems bool,
|
doNotMergeItems bool,
|
||||||
cache urlCacher,
|
urlCache getItemPropertyer,
|
||||||
) *Collection {
|
) *Collection {
|
||||||
c := &Collection{
|
c := &Collection{
|
||||||
handler: handler,
|
handler: handler,
|
||||||
@ -167,7 +167,7 @@ func newColl(
|
|||||||
state: data.StateOf(prevPath, currPath),
|
state: data.StateOf(prevPath, currPath),
|
||||||
scope: colScope,
|
scope: colScope,
|
||||||
doNotMergeItems: doNotMergeItems,
|
doNotMergeItems: doNotMergeItems,
|
||||||
cache: cache,
|
urlCache: urlCache,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c
|
return c
|
||||||
@ -273,7 +273,7 @@ func (oc *Collection) getDriveItemContent(
|
|||||||
el = errs.Local()
|
el = errs.Local()
|
||||||
)
|
)
|
||||||
|
|
||||||
itemData, err := oc.downloadContent(ctx, oc.handler, item, oc.driveID)
|
itemData, err := downloadContent(ctx, oc.handler, oc.urlCache, item, oc.driveID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
|
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
|
||||||
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
|
logger.CtxErr(ctx, err).With("skipped_reason", fault.SkipMalware).Info("item flagged as malware")
|
||||||
@ -323,9 +323,10 @@ type itemAndAPIGetter interface {
|
|||||||
// downloadContent attempts to fetch the item content. If the content url
|
// downloadContent attempts to fetch the item content. If the content url
|
||||||
// is expired (ie, returns a 401), it re-fetches the item to get a new download
|
// is expired (ie, returns a 401), it re-fetches the item to get a new download
|
||||||
// url and tries again.
|
// url and tries again.
|
||||||
func (oc *Collection) downloadContent(
|
func downloadContent(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
iaag itemAndAPIGetter,
|
iaag itemAndAPIGetter,
|
||||||
|
uc getItemPropertyer,
|
||||||
item models.DriveItemable,
|
item models.DriveItemable,
|
||||||
driveID string,
|
driveID string,
|
||||||
) (io.ReadCloser, error) {
|
) (io.ReadCloser, error) {
|
||||||
@ -343,9 +344,9 @@ func (oc *Collection) downloadContent(
|
|||||||
// token, and that we've overrun the available window to
|
// token, and that we've overrun the available window to
|
||||||
// download the file. Get a fresh url from the cache and attempt to
|
// download the file. Get a fresh url from the cache and attempt to
|
||||||
// download again.
|
// download again.
|
||||||
content, err = oc.readFromCache(ctx, iaag, itemID)
|
content, err = readItemContents(ctx, iaag, uc, itemID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
logger.Ctx(ctx).Debug("found item in cache", itemID)
|
logger.Ctx(ctx).Debug("found item in url cache")
|
||||||
|
|
||||||
return content, nil
|
return content, nil
|
||||||
}
|
}
|
||||||
@ -353,7 +354,7 @@ func (oc *Collection) downloadContent(
|
|||||||
// Consider cache errors(including deleted items) as cache misses. This is
|
// Consider cache errors(including deleted items) as cache misses. This is
|
||||||
// to preserve existing behavior. Fallback to refetching the item using the
|
// to preserve existing behavior. Fallback to refetching the item using the
|
||||||
// API.
|
// API.
|
||||||
logger.CtxErr(ctx, err).Info("cache miss. refetching from API")
|
logger.CtxErr(ctx, err).Info("url cache miss: refetching from API")
|
||||||
|
|
||||||
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
di, err := iaag.GetItem(ctx, driveID, ptr.Val(item.GetId()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -368,18 +369,19 @@ func (oc *Collection) downloadContent(
|
|||||||
return content, nil
|
return content, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// readFromCache fetches latest download URL from the cache and attempts to
|
// readItemContents fetches latest download URL from the cache and attempts to
|
||||||
// download the file using the new URL.
|
// download the file using the new URL.
|
||||||
func (oc *Collection) readFromCache(
|
func readItemContents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
iaag itemAndAPIGetter,
|
iaag itemAndAPIGetter,
|
||||||
|
uc getItemPropertyer,
|
||||||
itemID string,
|
itemID string,
|
||||||
) (io.ReadCloser, error) {
|
) (io.ReadCloser, error) {
|
||||||
if oc.cache == nil {
|
if uc == nil {
|
||||||
return nil, clues.New("nil cache")
|
return nil, clues.New("nil url cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
props, err := oc.cache.getItemProperties(ctx, itemID)
|
props, err := uc.getItemProperties(ctx, itemID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -387,7 +389,6 @@ func (oc *Collection) readFromCache(
|
|||||||
// Handle newly deleted items
|
// Handle newly deleted items
|
||||||
if props.isDeleted {
|
if props.isDeleted {
|
||||||
logger.Ctx(ctx).Info("item deleted in cache")
|
logger.Ctx(ctx).Info("item deleted in cache")
|
||||||
|
|
||||||
return nil, graph.ErrDeletedInFlight
|
return nil, graph.ErrDeletedInFlight
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -605,7 +605,7 @@ func (suite *GetDriveItemUnitTestSuite) TestGetDriveItem_error() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ urlCacher = &mockURLCache{}
|
var _ getItemPropertyer = &mockURLCache{}
|
||||||
|
|
||||||
type mockURLCache struct {
|
type mockURLCache struct {
|
||||||
Get func(ctx context.Context, itemID string) (itemProps, error)
|
Get func(ctx context.Context, itemID string) (itemProps, error)
|
||||||
@ -765,19 +765,7 @@ func (suite *GetDriveItemUnitTestSuite) TestDownloadContent() {
|
|||||||
mbh.GetResps = resps
|
mbh.GetResps = resps
|
||||||
mbh.GetErrs = test.getErr
|
mbh.GetErrs = test.getErr
|
||||||
|
|
||||||
coll, err := NewCollection(
|
r, err := downloadContent(ctx, mbh, test.muc, item, driveID)
|
||||||
mbh,
|
|
||||||
nil,
|
|
||||||
nil,
|
|
||||||
driveID,
|
|
||||||
nil,
|
|
||||||
control.Options{ToggleFeatures: control.Toggles{}},
|
|
||||||
CollectionScopeFolder,
|
|
||||||
true,
|
|
||||||
test.muc)
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
|
||||||
|
|
||||||
r, err := coll.downloadContent(ctx, mbh, item, driveID)
|
|
||||||
test.expect(t, r)
|
test.expect(t, r)
|
||||||
test.expectErr(t, err, clues.ToCore(err))
|
test.expectErr(t, err, clues.ToCore(err))
|
||||||
})
|
})
|
||||||
|
|||||||
@ -385,7 +385,7 @@ func (c *Collections) Get(
|
|||||||
|
|
||||||
// Only create a drive cache if there are less than 300k items in the drive.
|
// Only create a drive cache if there are less than 300k items in the drive.
|
||||||
if numDriveItems < urlCacheDriveItemThreshold {
|
if numDriveItems < urlCacheDriveItemThreshold {
|
||||||
logger.Ctx(ictx).Info("adding url cache for drive ", driveID)
|
logger.Ctx(ictx).Info("adding url cache for drive")
|
||||||
|
|
||||||
err = c.addURLCacheToDriveCollections(
|
err = c.addURLCacheToDriveCollections(
|
||||||
ictx,
|
ictx,
|
||||||
@ -457,7 +457,7 @@ func (c *Collections) Get(
|
|||||||
return collections, canUsePreviousBackup, nil
|
return collections, canUsePreviousBackup, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addURLCacheToDriveCollections adds a URL cache to all collections belonging to
|
// addURLCacheToDriveCollections adds an URL cache to all collections belonging to
|
||||||
// a drive.
|
// a drive.
|
||||||
func (c *Collections) addURLCacheToDriveCollections(
|
func (c *Collections) addURLCacheToDriveCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -476,7 +476,7 @@ func (c *Collections) addURLCacheToDriveCollections(
|
|||||||
// Set the URL cache for all collections in this drive
|
// Set the URL cache for all collections in this drive
|
||||||
for _, driveColls := range c.CollectionMap {
|
for _, driveColls := range c.CollectionMap {
|
||||||
for _, coll := range driveColls {
|
for _, coll := range driveColls {
|
||||||
coll.cache = uc
|
coll.urlCache = uc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2733,14 +2733,13 @@ func (suite *OneDriveCollectionsUnitSuite) TestURLCacheIntegration() {
|
|||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
c.CollectionMap[driveID][strconv.Itoa(i)] = coll
|
c.CollectionMap[driveID][strconv.Itoa(i)] = coll
|
||||||
require.Equal(t, nil, coll.cache, "cache not nil")
|
require.Equal(t, nil, coll.urlCache, "cache not nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.addURLCacheToDriveCollections(
|
err := c.addURLCacheToDriveCollections(
|
||||||
ctx,
|
ctx,
|
||||||
driveID,
|
driveID,
|
||||||
fault.New(true))
|
fault.New(true))
|
||||||
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
// Check that all collections have the same cache instance attached
|
// Check that all collections have the same cache instance attached
|
||||||
@ -2748,11 +2747,11 @@ func (suite *OneDriveCollectionsUnitSuite) TestURLCacheIntegration() {
|
|||||||
var uc *urlCache
|
var uc *urlCache
|
||||||
for _, driveColls := range c.CollectionMap {
|
for _, driveColls := range c.CollectionMap {
|
||||||
for _, coll := range driveColls {
|
for _, coll := range driveColls {
|
||||||
require.NotNil(t, coll.cache, "cache is nil")
|
require.NotNil(t, coll.urlCache, "cache is nil")
|
||||||
if uc == nil {
|
if uc == nil {
|
||||||
uc = coll.cache.(*urlCache)
|
uc = coll.urlCache.(*urlCache)
|
||||||
} else {
|
} else {
|
||||||
require.Equal(t, uc, coll.cache, "cache not equal")
|
require.Equal(t, uc, coll.urlCache, "cache not equal")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,7 @@ import (
|
|||||||
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
"github.com/alcionai/corso/src/pkg/services/m365/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
type urlCacher interface {
|
type getItemPropertyer interface {
|
||||||
getItemProperties(
|
getItemProperties(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
itemID string,
|
itemID string,
|
||||||
@ -27,7 +27,7 @@ type itemProps struct {
|
|||||||
isDeleted bool
|
isDeleted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ urlCacher = &urlCache{}
|
var _ getItemPropertyer = &urlCache{}
|
||||||
|
|
||||||
// urlCache caches download URLs for drive items
|
// urlCache caches download URLs for drive items
|
||||||
type urlCache struct {
|
type urlCache struct {
|
||||||
|
|||||||
@ -118,9 +118,6 @@ func (suite *URLCacheIntegrationSuite) TestURLCacheBasic() {
|
|||||||
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
err = cache.refreshCache(ctx)
|
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
|
||||||
|
|
||||||
// Launch parallel requests to the cache, one per item
|
// Launch parallel requests to the cache, one per item
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < len(items); i++ {
|
for i := 0; i < len(items); i++ {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user