Fix item re-download on unauthorized requests (#2804)

We had a regression which caused item re-downloads on 401s from
initial delta URL to be silently skipped. This fixes that behavior.<!-- Insert PR description-->

---

#### Does this PR need a docs update or release note?

- [x]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ]  No

#### Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* #<issue>

#### Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
Abin Simon 2023-03-15 13:15:20 +05:30 committed by GitHub
parent f729ed6626
commit efee7c4419
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 31 deletions

View File

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- Fix repo connect not working without a config file - Fix repo connect not working without a config file
- Fix item re-download on expired links silently being skipped
## [v0.5.0] (beta) - 2023-03-13 ## [v0.5.0] (beta) - 2023-03-13

View File

@ -35,6 +35,10 @@ const (
// TODO: Tune this later along with collectionChannelBufferSize // TODO: Tune this later along with collectionChannelBufferSize
urlPrefetchChannelBufferSize = 5 urlPrefetchChannelBufferSize = 5
// maxDownloadRetires specifies the number of times a file download should
// be retried
maxDownloadRetires = 3
MetaFileSuffix = ".meta" MetaFileSuffix = ".meta"
DirMetaFileSuffix = ".dirmeta" DirMetaFileSuffix = ".dirmeta"
DataFileSuffix = ".data" DataFileSuffix = ".data"
@ -80,6 +84,7 @@ type Collection struct {
source driveSource source driveSource
service graph.Servicer service graph.Servicer
statusUpdater support.StatusUpdater statusUpdater support.StatusUpdater
itemGetter itemGetterFunc
itemReader itemReaderFunc itemReader itemReaderFunc
itemMetaReader itemMetaReaderFunc itemMetaReader itemMetaReaderFunc
ctrl control.Options ctrl control.Options
@ -96,6 +101,13 @@ type Collection struct {
doNotMergeItems bool doNotMergeItems bool
} }
// itemGetterFunc gets an specified item
type itemGetterFunc func(
ctx context.Context,
srv graph.Servicer,
driveID, itemID string,
) (models.DriveItemable, error)
// itemReadFunc returns a reader for the specified item // itemReadFunc returns a reader for the specified item
type itemReaderFunc func( type itemReaderFunc func(
ctx context.Context, ctx context.Context,
@ -143,8 +155,10 @@ func NewCollection(
// Allows tests to set a mock populator // Allows tests to set a mock populator
switch source { switch source {
case SharePointSource: case SharePointSource:
c.itemGetter = getDriveItem
c.itemReader = sharePointItemReader c.itemReader = sharePointItemReader
default: default:
c.itemGetter = getDriveItem
c.itemReader = oneDriveItemReader c.itemReader = oneDriveItemReader
c.itemMetaReader = oneDriveItemMetaReader c.itemMetaReader = oneDriveItemMetaReader
} }
@ -286,6 +300,60 @@ func (od *metadataItem) ModTime() time.Time {
return od.modTime return od.modTime
} }
// getDriveItemContent fetch drive item's contents with retries
func (oc *Collection) getDriveItemContent(
ctx context.Context,
item models.DriveItemable,
errs *fault.Bus,
) (io.ReadCloser, error) {
var (
itemID = ptr.Val(item.GetId())
itemName = ptr.Val(item.GetName())
el = errs.Local()
itemData io.ReadCloser
err error
)
// Initial try with url from delta + 2 retries
for i := 1; i <= maxDownloadRetires; i++ {
_, itemData, err = oc.itemReader(ctx, oc.itemClient, item)
if err == nil || !graph.IsErrUnauthorized(err) {
break
}
// Assume unauthorized requests are a sign of an expired jwt
// token, and that we've overrun the available window to
// download the actual file. Re-downloading the item will
// refresh that download url.
di, diErr := oc.itemGetter(ctx, oc.service, oc.driveID, itemID)
if diErr != nil {
err = errors.Wrap(diErr, "retrieving expired item")
break
}
item = di
}
// check for errors following retries
if err != nil {
if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) {
logger.Ctx(ctx).With("error", err.Error(), "malware", true).Error("downloading item")
el.AddSkip(fault.FileSkip(fault.SkipMalware, itemID, itemName, graph.MalwareInfo(item)))
} else {
logger.Ctx(ctx).With("error", err.Error()).Error("downloading item")
el.AddRecoverable(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
}
// return err, not el.Err(), because the lazy reader needs to communicate to
// the data consumer that this item is unreadable, regardless of the fault state.
return nil, err
}
return itemData, nil
}
// populateItems iterates through items added to the collection // populateItems iterates through items added to the collection
// and uses the collection `itemReader` to read the item // and uses the collection `itemReader` to read the item
func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
@ -403,38 +471,8 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) {
// attempts to read bytes. Assumption is that kopia will check things // attempts to read bytes. Assumption is that kopia will check things
// like file modtimes before attempting to read. // like file modtimes before attempting to read.
itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
// Read the item itemData, err := oc.getDriveItemContent(ctx, item, errs)
var (
itemData io.ReadCloser
err error
)
_, itemData, err = oc.itemReader(ctx, oc.itemClient, item)
if err != nil && graph.IsErrUnauthorized(err) {
// assume unauthorized requests are a sign of an expired
// jwt token, and that we've overrun the available window
// to download the actual file. Re-downloading the item
// will refresh that download url.
di, diErr := getDriveItem(ctx, oc.service, oc.driveID, itemID)
if diErr != nil {
err = errors.Wrap(diErr, "retrieving expired item")
}
item = di
}
// check for errors following retries
if err != nil { if err != nil {
if item.GetMalware() != nil || clues.HasLabel(err, graph.LabelsMalware) {
logger.Ctx(ctx).With("error", err.Error(), "malware", true).Error("downloading item")
el.AddSkip(fault.FileSkip(fault.SkipMalware, itemID, itemName, graph.MalwareInfo(item)))
} else {
logger.Ctx(ctx).With("error", err.Error()).Error("downloading item")
el.AddRecoverable(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation))
}
// return err, not el.Err(), because the lazy reader needs to communicate to
// the data consumer that this item is unreadable, regardless of the fault state.
return nil, err return nil, err
} }

View File

@ -388,6 +388,111 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
} }
} }
func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry() {
var (
name = "name"
size int64 = 42
now = time.Now()
)
table := []struct {
name string
source driveSource
}{
{
name: "oneDrive",
source: OneDriveSource,
},
{
name: "sharePoint",
source: SharePointSource,
},
}
for _, test := range table {
suite.Run(test.name, func() {
ctx, flush := tester.NewContext()
defer flush()
var (
t = suite.T()
testItemID = "fakeItemID"
collStatus = support.ConnectorOperationStatus{}
wg = sync.WaitGroup{}
)
wg.Add(1)
folderPath, err := GetCanonicalPath("drive/driveID1/root:/folderPath", "a-tenant", "a-user", test.source)
require.NoError(t, err)
coll := NewCollection(
graph.HTTPClient(graph.NoTimeout()),
folderPath,
nil,
"fakeDriveID",
suite,
suite.testStatusUpdater(&wg, &collStatus),
test.source,
control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}},
true)
mockItem := models.NewDriveItem()
mockItem.SetId(&testItemID)
mockItem.SetFile(models.NewFile())
mockItem.SetName(&name)
mockItem.SetSize(&size)
mockItem.SetCreatedDateTime(&now)
mockItem.SetLastModifiedDateTime(&now)
coll.Add(mockItem)
count := 0
coll.itemGetter = func(
ctx context.Context,
srv graph.Servicer,
driveID, itemID string,
) (models.DriveItemable, error) {
return mockItem, nil
}
coll.itemReader = func(
context.Context,
*http.Client,
models.DriveItemable,
) (details.ItemInfo, io.ReadCloser, error) {
if count < 2 {
count++
return details.ItemInfo{}, nil, clues.Stack(assert.AnError).
Label(graph.LabelStatus(http.StatusUnauthorized))
}
return details.ItemInfo{}, io.NopCloser(strings.NewReader("test")), nil
}
coll.itemMetaReader = func(_ context.Context,
_ graph.Servicer,
_ string,
_ models.DriveItemable,
_ bool,
) (io.ReadCloser, int, error) {
return io.NopCloser(strings.NewReader(`{}`)), 2, nil
}
collItem, ok := <-coll.Items(ctx, fault.New(true))
assert.True(t, ok)
_, err = io.ReadAll(collItem.ToReader())
assert.NoError(t, err)
wg.Wait()
require.Equal(t, 1, collStatus.Metrics.Objects, "only one object should be counted")
require.Equal(t, 1, collStatus.Metrics.Successes, "read object successfully")
require.Equal(t, 2, count, "retry count")
})
}
}
// TODO(meain): Remove this test once we start always backing up permissions // TODO(meain): Remove this test once we start always backing up permissions
func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTime() { func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTime() {
table := []struct { table := []struct {