diff --git a/CHANGELOG.md b/CHANGELOG.md index ca1dbeecc..8b3fd1209 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Fix repo connect not working without a config file +- Fix item re-download on expired links silently being skipped ## [v0.5.0] (beta) - 2023-03-13 diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 9b262ddd2..662c61245 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -35,6 +35,10 @@ const ( // TODO: Tune this later along with collectionChannelBufferSize urlPrefetchChannelBufferSize = 5 + // maxDownloadRetires specifies the number of times a file download should + // be retried + maxDownloadRetires = 3 + MetaFileSuffix = ".meta" DirMetaFileSuffix = ".dirmeta" DataFileSuffix = ".data" @@ -80,6 +84,7 @@ type Collection struct { source driveSource service graph.Servicer statusUpdater support.StatusUpdater + itemGetter itemGetterFunc itemReader itemReaderFunc itemMetaReader itemMetaReaderFunc ctrl control.Options @@ -96,6 +101,13 @@ type Collection struct { doNotMergeItems bool } +// itemGetterFunc gets an specified item +type itemGetterFunc func( + ctx context.Context, + srv graph.Servicer, + driveID, itemID string, +) (models.DriveItemable, error) + // itemReadFunc returns a reader for the specified item type itemReaderFunc func( ctx context.Context, @@ -143,8 +155,10 @@ func NewCollection( // Allows tests to set a mock populator switch source { case SharePointSource: + c.itemGetter = getDriveItem c.itemReader = sharePointItemReader default: + c.itemGetter = getDriveItem c.itemReader = oneDriveItemReader c.itemMetaReader = oneDriveItemMetaReader } @@ -286,6 +300,60 @@ func (od *metadataItem) ModTime() time.Time { return od.modTime } +// getDriveItemContent fetch drive item's contents with retries +func (oc *Collection) getDriveItemContent( + ctx context.Context, + item models.DriveItemable, + errs *fault.Bus, +) (io.ReadCloser, error) { + var ( + itemID = ptr.Val(item.GetId()) + itemName = ptr.Val(item.GetName()) + el = errs.Local() + + itemData io.ReadCloser + err error + ) + + // Initial try with url from delta + 2 retries + for i := 1; i <= maxDownloadRetires; i++ { + _, itemData, err = oc.itemReader(ctx, oc.itemClient, item) + + if err == nil || !graph.IsErrUnauthorized(err) { + break + } + + // Assume unauthorized requests are a sign of an expired jwt + // token, and that we've overrun the available window to + // download the actual file. Re-downloading the item will + // refresh that download url. + di, diErr := oc.itemGetter(ctx, oc.service, oc.driveID, itemID) + if diErr != nil { + err = errors.Wrap(diErr, "retrieving expired item") + break + } + + item = di + } + + // check for errors following retries + if err != nil { + if clues.HasLabel(err, graph.LabelsMalware) || (item != nil && item.GetMalware() != nil) { + logger.Ctx(ctx).With("error", err.Error(), "malware", true).Error("downloading item") + el.AddSkip(fault.FileSkip(fault.SkipMalware, itemID, itemName, graph.MalwareInfo(item))) + } else { + logger.Ctx(ctx).With("error", err.Error()).Error("downloading item") + el.AddRecoverable(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) + } + + // return err, not el.Err(), because the lazy reader needs to communicate to + // the data consumer that this item is unreadable, regardless of the fault state. + return nil, err + } + + return itemData, nil +} + // populateItems iterates through items added to the collection // and uses the collection `itemReader` to read the item func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { @@ -403,38 +471,8 @@ func (oc *Collection) populateItems(ctx context.Context, errs *fault.Bus) { // attempts to read bytes. Assumption is that kopia will check things // like file modtimes before attempting to read. itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) { - // Read the item - var ( - itemData io.ReadCloser - err error - ) - - _, itemData, err = oc.itemReader(ctx, oc.itemClient, item) - - if err != nil && graph.IsErrUnauthorized(err) { - // assume unauthorized requests are a sign of an expired - // jwt token, and that we've overrun the available window - // to download the actual file. Re-downloading the item - // will refresh that download url. - di, diErr := getDriveItem(ctx, oc.service, oc.driveID, itemID) - if diErr != nil { - err = errors.Wrap(diErr, "retrieving expired item") - } - item = di - } - - // check for errors following retries + itemData, err := oc.getDriveItemContent(ctx, item, errs) if err != nil { - if item.GetMalware() != nil || clues.HasLabel(err, graph.LabelsMalware) { - logger.Ctx(ctx).With("error", err.Error(), "malware", true).Error("downloading item") - el.AddSkip(fault.FileSkip(fault.SkipMalware, itemID, itemName, graph.MalwareInfo(item))) - } else { - logger.Ctx(ctx).With("error", err.Error()).Error("downloading item") - el.AddRecoverable(clues.Stack(err).WithClues(ctx).Label(fault.LabelForceNoBackupCreation)) - } - - // return err, not el.Err(), because the lazy reader needs to communicate to - // the data consumer that this item is unreadable, regardless of the fault state. return nil, err } diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index 3be7f7951..aba15d77d 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -388,6 +388,111 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { } } +func (suite *CollectionUnitTestSuite) TestCollectionReadUnauthorizedErrorRetry() { + var ( + name = "name" + size int64 = 42 + now = time.Now() + ) + + table := []struct { + name string + source driveSource + }{ + { + name: "oneDrive", + source: OneDriveSource, + }, + { + name: "sharePoint", + source: SharePointSource, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + testItemID = "fakeItemID" + collStatus = support.ConnectorOperationStatus{} + wg = sync.WaitGroup{} + ) + + wg.Add(1) + + folderPath, err := GetCanonicalPath("drive/driveID1/root:/folderPath", "a-tenant", "a-user", test.source) + require.NoError(t, err) + + coll := NewCollection( + graph.HTTPClient(graph.NoTimeout()), + folderPath, + nil, + "fakeDriveID", + suite, + suite.testStatusUpdater(&wg, &collStatus), + test.source, + control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}}, + true) + + mockItem := models.NewDriveItem() + mockItem.SetId(&testItemID) + mockItem.SetFile(models.NewFile()) + mockItem.SetName(&name) + mockItem.SetSize(&size) + mockItem.SetCreatedDateTime(&now) + mockItem.SetLastModifiedDateTime(&now) + coll.Add(mockItem) + + count := 0 + + coll.itemGetter = func( + ctx context.Context, + srv graph.Servicer, + driveID, itemID string, + ) (models.DriveItemable, error) { + return mockItem, nil + } + + coll.itemReader = func( + context.Context, + *http.Client, + models.DriveItemable, + ) (details.ItemInfo, io.ReadCloser, error) { + if count < 2 { + count++ + return details.ItemInfo{}, nil, clues.Stack(assert.AnError). + Label(graph.LabelStatus(http.StatusUnauthorized)) + } + + return details.ItemInfo{}, io.NopCloser(strings.NewReader("test")), nil + } + + coll.itemMetaReader = func(_ context.Context, + _ graph.Servicer, + _ string, + _ models.DriveItemable, + _ bool, + ) (io.ReadCloser, int, error) { + return io.NopCloser(strings.NewReader(`{}`)), 2, nil + } + + collItem, ok := <-coll.Items(ctx, fault.New(true)) + assert.True(t, ok) + + _, err = io.ReadAll(collItem.ToReader()) + assert.NoError(t, err) + + wg.Wait() + + require.Equal(t, 1, collStatus.Metrics.Objects, "only one object should be counted") + require.Equal(t, 1, collStatus.Metrics.Successes, "read object successfully") + require.Equal(t, 2, count, "retry count") + }) + } +} + // TODO(meain): Remove this test once we start always backing up permissions func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTime() { table := []struct {