diff --git a/src/internal/connector/graph/service.go b/src/internal/connector/graph/service.go
index fd6142028..705419248 100644
--- a/src/internal/connector/graph/service.go
+++ b/src/internal/connector/graph/service.go
@@ -23,7 +23,7 @@ import (
 const (
 	logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS"
-	numberOfRetries        = 1
+	numberOfRetries        = 3
 )
 
 // AllMetadataFileNames produces the standard set of filenames used to store graph
@@ -313,7 +313,7 @@ func RunWithRetry(run func() error) error {
 		}
 
 		if i < numberOfRetries {
-			time.Sleep(time.Duration(0*(i+2)) * time.Second)
+			time.Sleep(time.Duration(3*(i+2)) * time.Second)
 		}
 	}
 
diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go
index 78849e0be..454a2b375 100644
--- a/src/internal/connector/onedrive/collection.go
+++ b/src/internal/connector/onedrive/collection.go
@@ -11,7 +11,6 @@ import (
 	"time"
 
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
-	"github.com/pkg/errors"
 	"github.com/spatialcurrent/go-lazy/pkg/lazy"
 
 	"github.com/alcionai/corso/src/internal/connector/graph"
@@ -216,7 +215,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
 	// Retrieve the OneDrive folder path to set later in `details.OneDriveInfo`
 	parentPathString, err := path.GetDriveFolderPath(oc.folderPath)
 	if err != nil {
-		oc.reportAsCompleted(ctx, 0, 0, 0, err)
+		oc.reportAsCompleted(ctx, 0, 0, 0, 0, 0, err)
 		return
 	}
 
@@ -237,11 +236,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
 			errs = support.WrapAndAppend(id, err, errs)
 		}
 		countUpdater = func(size, dirs, items, dReads, iReads int64) {
-			atomic.AddInt64(&dirsRead, dReads)
-			atomic.AddInt64(&itemsRead, iReads)
 			atomic.AddInt64(&byteCount, size)
 			atomic.AddInt64(&dirsFound, dirs)
-			atomic.AddInt64(&itemsFound, dirs)
+			atomic.AddInt64(&itemsFound, items)
+			atomic.AddInt64(&dirsRead, dReads)
+			atomic.AddInt64(&itemsRead, iReads)
 		}
 	)
 
@@ -265,6 +264,22 @@ func (oc *Collection) populateItems(ctx context.Context) {
 
 		wg.Add(1)
 
+		// fetch the item's metadata (if it has any; no-ops otherwise),
+		// and stream it into the collection's data channel
+		go oc.streamItemMeta(
+			ctx,
+			&wg,
+			semaphoreCh,
+			folderProgress,
+			errUpdater,
+			countUpdater,
+			item,
+			parentPathString)
+
+		semaphoreCh <- struct{}{}
+
+		wg.Add(1)
+
 		// fetch the item, and stream it into the collection's data channel
 		go oc.streamItem(
 			ctx,
@@ -279,7 +294,109 @@ func (oc *Collection) populateItems(ctx context.Context) {
 
 	wg.Wait()
 
-	oc.reportAsCompleted(ctx, itemsFound, itemsRead, byteCount, errs)
+	oc.reportAsCompleted(ctx, dirsFound, dirsRead, itemsFound, itemsRead, byteCount, errs)
+}
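For context, populateItems bounds its per-item fan-out with a buffered-channel semaphore paired with a WaitGroup: a slot is acquired before each wg.Add/go pair, and each worker frees its slot on exit via defer. A minimal, self-contained sketch of that pattern — the worker cap and loop bounds here are illustrative, not Corso's values:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxWorkers = 4 // illustrative cap; the real code sizes this elsewhere

	var (
		wg          sync.WaitGroup
		semaphoreCh = make(chan struct{}, maxWorkers)
	)

	for i := 0; i < 16; i++ {
		semaphoreCh <- struct{}{} // acquire a slot; blocks while maxWorkers are busy
		wg.Add(1)

		go func(n int) {
			defer wg.Done()
			defer func() { <-semaphoreCh }() // release the slot on exit

			fmt.Println("processing item", n)
		}(i)
	}

	wg.Wait() // all items streamed; safe to close the data channel
}
```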
+
+func (oc *Collection) streamItemMeta(
+	ctx context.Context,
+	wg *sync.WaitGroup,
+	semaphore <-chan struct{},
+	progress chan<- struct{},
+	errUpdater func(string, error),
+	countUpdater func(int64, int64, int64, int64, int64),
+	item models.DriveItemable,
+	parentPath string,
+) {
+	defer wg.Done()
+	defer func() { <-semaphore }()
+
+	var (
+		id     = *item.GetId()
+		name   = *item.GetName()
+		size   = *item.GetSize()
+		isFile = item.GetFile() != nil
+
+		lazyRC   io.ReadCloser
+		info     details.ItemInfo
+		metadata io.ReadCloser
+
+		suffix     string
+		dirsFound  int64
+		dirsRead   int64
+		itemsFound int64
+		itemsRead  int64
+
+		err error
+	)
+
+	// we only handle metadata for onedrive right now
+	if oc.source != OneDriveSource {
+		return
+	}
+
+	if isFile {
+		itemsFound, itemsRead = 1, 1
+		suffix = MetaFileSuffix
+	} else {
+		dirsFound, dirsRead = 1, 1
+		suffix = DirMetaFileSuffix
+	}
+
+	// apply the suffix before the Item below copies the name; the
+	// lazy closure runs too late to affect the item's id.
+	name += suffix
+
+	info.OneDrive = oneDriveItemInfo(item, size)
+	info.OneDrive.ParentPath = parentPath
+
+	// Construct a new lazy readCloser to feed to the collection consumer.
+	// This ensures that downloads won't be attempted unless that consumer
+	// attempts to read bytes. Assumption is that kopia will check things
+	// like file modtimes before attempting to read.
+	lazyRC = lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+		var msize int
+
+		metadata, msize, err = getItemMeta(
+			ctx,
+			oc.service,
+			oc.driveID,
+			item,
+			maxRetries,
+			oc.ctrl.ToggleFeatures.EnablePermissionsBackup,
+			oc.itemMetaReader)
+		if err != nil {
+			errUpdater(id, err)
+			return nil, err
+		}
+
+		size = int64(msize)
+
+		progReader, closer := observe.ItemProgress(
+			ctx,
+			metadata,
+			observe.ItemBackupMsg,
+			observe.PII(name),
+			size)
+		go closer()
+
+		return progReader, nil
+	})
+
+	// Item read successfully, record its addition.
+	//
+	// Note: this can cause inaccurate counts. Right now it counts all
+	// the items we intend to read. Errors within the lazy readCloser
+	// will create a conflict: an item is both successful and erroneous.
+	// But the async control to fix that is more error-prone than helpful.
+	//
+	// TODO: transform this into a stats bus so that async control of stats
+	// aggregation is handled at the backup level, not at the item iteration
+	// level.
+	countUpdater(size, dirsFound, itemsFound, dirsRead, itemsRead)
+
+	// stream the item to the data consumer.
+	oc.data <- &Item{
+		id:   name,
+		data: lazyRC,
+		info: info,
+	}
+
+	progress <- struct{}{}
+}
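streamItemMeta hands the consumer a lazy.NewLazyReadCloser so the metadata fetch only happens if that consumer actually reads bytes. The sketch below illustrates the semantics with a simplified, hand-rolled equivalent — this is not the go-lazy library's implementation, just the deferral contract the code above relies on:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// lazyReadCloser defers opening the underlying reader until the first
// Read call. If the consumer never reads (e.g. kopia skips an unchanged
// file), the expensive fetch never happens.
type lazyReadCloser struct {
	open func() (io.ReadCloser, error) // the deferred fetch
	rc   io.ReadCloser                 // nil until first Read
}

func (l *lazyReadCloser) Read(p []byte) (int, error) {
	if l.rc == nil {
		rc, err := l.open()
		if err != nil {
			return 0, err
		}
		l.rc = rc
	}

	return l.rc.Read(p)
}

func (l *lazyReadCloser) Close() error {
	if l.rc == nil {
		return nil // never opened, nothing to close
	}

	return l.rc.Close()
}

func main() {
	lrc := &lazyReadCloser{open: func() (io.ReadCloser, error) {
		fmt.Println("download happens only now")
		return io.NopCloser(strings.NewReader(`{"permissions":[]}`)), nil
	}}

	data, _ := io.ReadAll(lrc) // first Read triggers the deferred open
	fmt.Printf("%s\n", data)
	_ = lrc.Close()
}
```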
 
 func (oc *Collection) streamItem(
@@ -296,111 +413,69 @@ func (oc *Collection) streamItem(
 	defer wg.Done()
 	defer func() { <-semaphore }()
 
 	var (
-		itemID   = *item.GetId()
-		itemName = *item.GetName()
-		itemSize = *item.GetSize()
-		isFile   = item.GetFile() != nil
+		id     = *item.GetId()
+		name   = *item.GetName()
+		size   = *item.GetSize()
+		isFile = item.GetFile() != nil
 
-		itemInfo details.ItemInfo
-		itemMeta io.ReadCloser
+		lazyRC io.ReadCloser
+		info   details.ItemInfo
 
-		dataSuffix string
-		metaSuffix string
-
-		dirsFound    int64
-		itemsFound   int64
-		dirsRead     int64
-		itemsRead    int64
-		itemMetaSize int
-
-		err error
+		dirsFound  int64
+		itemsFound int64
+		dirsRead   int64
+		itemsRead  int64
 	)
 
 	if isFile {
 		itemsFound++
 		itemsRead++
-		metaSuffix = MetaFileSuffix
 	} else {
 		dirsFound++
 		dirsRead++
-		metaSuffix = DirMetaFileSuffix
 	}
 
 	switch oc.source {
 	case SharePointSource:
-		itemInfo.SharePoint = sharePointItemInfo(item, itemSize)
-		itemInfo.SharePoint.ParentPath = parentPath
+		info.SharePoint = sharePointItemInfo(item, size)
+		info.SharePoint.ParentPath = parentPath
 	default:
-		itemInfo.OneDrive = oneDriveItemInfo(item, itemSize)
-		itemInfo.OneDrive.ParentPath = parentPath
-		dataSuffix = DataFileSuffix
+		info.OneDrive = oneDriveItemInfo(item, size)
+		info.OneDrive.ParentPath = parentPath
 	}
 
+	// this separates onedrive items from their metadata; apply it here,
+	// before the Item below copies the name, rather than inside the lazy
+	// closure, which only runs at read time.
+	if oc.source == OneDriveSource && isFile {
+		name += DataFileSuffix
+	}
+
-	// directory handling
-	if !isFile {
-		// Construct a new lazy readCloser to feed to the collection consumer.
-		// This ensures that downloads won't be attempted unless that consumer
-		// attempts to read bytes. Assumption is that kopia will check things
-		// like file modtimes before attempting to read.
-		metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-			if oc.source == OneDriveSource {
-				itemMeta, itemMetaSize, err = getItemMeta(
-					ctx,
-					oc.service,
-					oc.driveID,
-					item,
-					maxRetries,
-					oc.ctrl.ToggleFeatures.EnablePermissionsBackup,
-					oc.itemMetaReader)
-				if err != nil {
-					errUpdater(itemID, err)
-					return nil, err
-				}
-			}
-
-			progReader, closer := observe.ItemProgress(
-				ctx, itemMeta, observe.ItemBackupMsg,
-				observe.PII(itemName+metaSuffix), int64(itemMetaSize))
-			go closer()
-			return progReader, nil
-		})
-	} else {
-		// Construct a new lazy readCloser to feed to the collection consumer.
-		// This ensures that downloads won't be attempted unless that consumer
-		// attempts to read bytes. Assumption is that kopia will check things
-		// like file modtimes before attempting to read.
-		lazyData := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-			// Read the item
-			var (
-				itemData io.ReadCloser
-				err      error
-			)
-
-			itemData, item, err = readDriveItem(
-				ctx,
-				oc.service,
-				oc.itemClient,
-				oc.driveID, itemID,
-				item,
-				oc.itemReader,
-				oc.itemGetter)
-			if err != nil {
-				errUpdater(itemID, err)
-				return nil, err
-			}
-
-			// display/log the item download
-			progReader, closer := observe.ItemProgress(
-				ctx,
-				itemData,
-				observe.ItemBackupMsg,
-				observe.PII(itemName+dataSuffix),
-				itemSize)
-			go closer()
-
-			return progReader, nil
-		})
-	}
+	// Construct a new lazy readCloser to feed to the collection consumer.
+	// This ensures that downloads won't be attempted unless that consumer
+	// attempts to read bytes. Assumption is that kopia will check things
+	// like file modtimes before attempting to read.
+	lazyRC = lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+		itemData, _, err := readDriveItem(
+			ctx,
+			oc.service,
+			oc.itemClient,
+			oc.driveID, id,
+			item,
+			oc.itemReader,
+			oc.itemGetter)
+		if err != nil {
+			errUpdater(id, err)
+			return nil, err
+		}
+
+		progReader, closer := observe.ItemProgress(
+			ctx,
+			itemData,
+			observe.ItemBackupMsg,
+			observe.PII(name),
+			size)
+		go closer()
+
+		return progReader, nil
+	})
 
 	// Item read successfully, record its addition.
 	//
@@ -412,21 +487,13 @@ func (oc *Collection) streamItem(
 	// TODO: transform this into a stats bus so that async control of stats
 	// aggregation is handled at the backup level, not at the item iteration
 	// level.
-	countUpdater(itemSize, dirsFound, itemsFound, dirsRead, itemsRead)
-
-	if hasMeta {
-		oc.data <- &Item{
-			id:   itemName + metaSuffix,
-			data: metaReader,
-			info: itemInfo,
-		}
-	}
+	countUpdater(size, dirsFound, itemsFound, dirsRead, itemsRead)
 
 	// stream the item to the data consumer.
 	oc.data <- &Item{
-		id:   itemName + dataSuffix,
-		data: lazyData,
-		info: itemInfo,
+		id:   name,
+		data: lazyRC,
+		info: info,
 	}
 
 	progress <- struct{}{}
 }
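The suffix assignments above happen before the &Item{id: name, ...} literals are built, and that ordering matters: the struct copies the string at construction time, while the lazy closure runs much later (or never), so a `name += suffix` inside the closure could never reach the item's id. A small illustration of that capture-timing difference (hypothetical names, not Corso code):

```go
package main

import "fmt"

type item struct{ id string }

func main() {
	name := "report"

	// The struct literal copies name's current value.
	it := item{id: name}

	// A closure, by contrast, captures the variable itself; mutations
	// inside it don't run until invocation, and are invisible to the
	// copy made above.
	fetch := func() string {
		name += ".data"
		return name
	}

	fmt.Println(it.id)   // "report" — suffix never applied to the id
	fmt.Println(fetch()) // "report.data" — only the local variable changes
	fmt.Println(it.id)   // still "report"
}
```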
@@ -441,32 +508,24 @@ func getItemMeta(
 	enablePermissionsBackup bool,
 	read itemMetaReaderFunc,
 ) (io.ReadCloser, int, error) {
+	if !enablePermissionsBackup {
+		// We still write the metadata file, but with empty permissions,
+		// since we have no way to signify that permissions were
+		// explicitly excluded.
+		return io.NopCloser(strings.NewReader("{}")), 2, nil
+	}
+
 	var (
 		rc   io.ReadCloser
 		size int
+		err  error
 	)
 
-	for i := 1; i <= maxRetries; i++ {
-		if !enablePermissionsBackup {
-			// We are still writing the metadata file but with
-			// empty permissions as we don't have a way to
-			// signify that the permissions was explicitly
-			// not added.
-			return io.NopCloser(strings.NewReader("{}")), 2, nil
-		}
-
-		var err error
+	err = graph.RunWithRetry(func() error {
 		rc, size, err = read(ctx, service, driveID, item)
-		if err == nil ||
-			(!graph.IsErrTimeout(err) &&
-				!graph.IsInternalServerError(err)) {
-			return rc, size, errors.Wrap(err, "getting item metadata")
-		}
-
-		if i < maxRetries {
-			time.Sleep(1 * time.Second)
-		}
-	}
+		return err
+	})
 
-	return rc, size, nil
+	return rc, size, err
 }
 
@@ -486,49 +545,27 @@ func readDriveItem(
 		item = original
 	)
 
-	for i := 0; i < maxRetries; i++ {
+	err = graph.RunWithRetry(func() error {
 		_, rc, err = read(itemClient, item)
-		if err == nil {
-			return rc, item, nil
-		}
-
-		if graph.IsErrUnauthorized(err) {
-			// assume unauthorized requests are a sign of an expired
-			// jwt token, and that we've overrun the available window
-			// to download the actual file. Re-downloading the item
-			// will refresh that download url.
-			di, diErr := get(ctx, service, driveID, itemID)
-			if diErr != nil {
-				return nil, nil, errors.Wrap(diErr, "retrieving item to refresh download url")
-			}
-
-			item = di
-
-			continue
-
-		} else if !graph.IsErrTimeout(err) &&
-			!graph.IsInternalServerError(err) {
-			// for all non-timeout, non-internal errors, do not retry
-			break
-		}
-
-		if i < maxRetries {
-			time.Sleep(1 * time.Second)
-		}
-	}
+		return err
+	})
 
 	return rc, item, err
 }
 
-func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead, itemsFound, byteCount int64, errs error) {
+func (oc *Collection) reportAsCompleted(
+	ctx context.Context,
+	dirsFound, dirsRead, itemsFound, itemsRead, byteCount int64,
+	errs error,
+) {
 	close(oc.data)
 
 	status := support.CreateStatus(ctx, support.Backup,
 		1, // num folders (always 1)
 		support.CollectionMetrics{
-			Objects:    int(itemsFound), // items to read,
-			Successes:  int(itemsRead),  // items read successfully,
-			TotalBytes: byteCount,       // Number of bytes read in the operation,
+			Objects:    int(itemsFound) + int(dirsFound), // items to read
+			Successes:  int(itemsRead) + int(dirsRead),   // items read successfully
+			TotalBytes: byteCount,                        // number of bytes read in the operation
 		},
 		errs,
 		oc.folderPath.Folder(), // Additional details
diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go
index 6fb8f74f3..ecbdf7e24 100644
--- a/src/internal/connector/onedrive/collection_test.go
+++ b/src/internal/connector/onedrive/collection_test.go
@@ -63,6 +63,11 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
 		}}
 	)
 
+	testItemMetaBytes, err := json.Marshal(testItemMeta)
+	if err != nil {
+		suite.T().Fatal("unable to marshal test permissions", err)
+	}
+
 	type nst struct {
 		name string
 		size int64
@@ -181,12 +186,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
 		_ string,
 		_ models.DriveItemable,
 	) (io.ReadCloser, int, error) {
-		metaJSON, err := json.Marshal(testItemMeta)
-		if err != nil {
-			return nil, 0, err
-		}
-
-		return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
+		return io.NopCloser(bytes.NewReader(testItemMetaBytes)), len(testItemMetaBytes), nil
 	}
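getItemMeta and readDriveItem now delegate their bespoke retry loops to graph.RunWithRetry, passing multi-valued results out through closure-captured variables. A rough sketch of the shape of such a helper — the real retry predicate and backoff live in graph/service.go and differ from the illustrative constants here:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithRetry is a hypothetical stand-in for a RunWithRetry-style
// helper: re-run the closure until it succeeds or retries run out.
func runWithRetry(run func() error) error {
	const retries = 3

	var err error

	for i := 0; i <= retries; i++ {
		if err = run(); err == nil {
			return nil
		}

		if i < retries {
			// shortened for the sketch; a real backoff sleeps seconds
			time.Sleep(time.Duration(3*(i+1)) * time.Millisecond)
		}
	}

	return err
}

func main() {
	// Multi-valued results escape the func() error signature by
	// assigning to variables captured from the enclosing scope,
	// just as getItemMeta captures rc and size above.
	var (
		payload string
		size    int
		calls   int
	)

	err := runWithRetry(func() error {
		calls++
		if calls < 2 {
			return errors.New("transient failure")
		}

		payload, size = "{}", 2
		return nil
	})

	fmt.Println(payload, size, err, "after", calls, "calls")
}
```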
@@ -199,52 +199,58 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
 	wg.Wait()
 
 	if test.source == OneDriveSource {
-		require.Len(t, readItems, 2) // .data and .meta
+		assert.Len(t, readItems, 2) // .data and .meta
+		assert.Equal(t, 2, collStatus.ObjectCount)
+		assert.Equal(t, 2, collStatus.Successful)
 	} else {
-		require.Len(t, readItems, 1)
+		assert.Len(t, readItems, 1)
+		assert.Equal(t, 1, collStatus.ObjectCount)
+		assert.Equal(t, 1, collStatus.Successful)
 	}
 
-	// Expect only 1 item
-	require.Equal(t, 1, collStatus.ObjectCount)
-	require.Equal(t, 1, collStatus.Successful)
+	var (
+		foundData bool
+		foundMeta bool
+	)
 
-	// Validate item info and data
-	readItem := readItems[0]
-	readItemInfo := readItem.(data.StreamInfo)
+	for _, readItem := range readItems {
+		readItemInfo := readItem.(data.StreamInfo)
+		id := readItem.UUID()
 
-	if test.source == OneDriveSource {
-		assert.Equal(t, testItemName+DataFileSuffix, readItem.UUID())
-	} else {
-		assert.Equal(t, testItemName, readItem.UUID())
-	}
+		if strings.HasSuffix(id, DataFileSuffix) {
+			foundData = true
+		}
 
-	require.Implements(t, (*data.StreamModTime)(nil), readItem)
-	mt := readItem.(data.StreamModTime)
-	assert.Equal(t, now, mt.ModTime())
+		var hasMeta bool
+		if strings.HasSuffix(id, MetaFileSuffix) {
+			foundMeta = true
+			hasMeta = true
+		}
 
-	readData, err := io.ReadAll(readItem.ToReader())
-	require.NoError(t, err)
+		assert.Contains(t, id, testItemName)
+		require.Implements(t, (*data.StreamModTime)(nil), readItem)
 
-	name, parentPath := test.infoFrom(t, readItemInfo.Info())
+		mt := readItem.(data.StreamModTime)
+		assert.Equal(t, now, mt.ModTime())
 
-	assert.Equal(t, testItemData, readData)
-	assert.Equal(t, testItemName, name)
-	assert.Equal(t, driveFolderPath, parentPath)
+		readData, err := io.ReadAll(readItem.ToReader())
+		require.NoError(t, err)
 
-	if test.source == OneDriveSource {
-		readItemMeta := readItems[1]
-
-		assert.Equal(t, testItemName+MetaFileSuffix, readItemMeta.UUID())
-
-		readMetaData, err := io.ReadAll(readItemMeta.ToReader())
-		require.NoError(t, err)
-
-		tm, err := json.Marshal(testItemMeta)
-		if err != nil {
-			t.Fatal("unable to marshall test permissions", err)
+		name, parentPath := test.infoFrom(t, readItemInfo.Info())
+		assert.Equal(t, testItemName, name)
+		assert.Equal(t, driveFolderPath, parentPath)
+
+		if hasMeta {
+			assert.Equal(t, testItemMetaBytes, readData)
+		} else {
+			assert.Equal(t, testItemData, readData)
 		}
+	}
 
-		assert.Equal(t, tm, readMetaData)
+	if test.source == OneDriveSource {
+		assert.True(t, foundData, "found data file")
+		assert.True(t, foundMeta, "found metadata file")
 	}
 })
@@ -359,7 +365,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
 		graph.HTTPClient(graph.NoTimeout()),
 		folderPath,
 		"fakeDriveID",
-		suite,
+		&MockGraphService{},
 		suite.testStatusUpdater(&wg, &collStatus),
 		test.source,
 		control.Options{ToggleFeatures: control.Toggles{}})
@@ -398,15 +404,14 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
 
 	wg.Wait()
 
-	// Expect no items
-	require.Equal(t, 1, collStatus.ObjectCount)
-	require.Equal(t, 1, collStatus.Successful)
+	assert.Equal(t, 1, collStatus.ObjectCount, "total objects")
+	assert.Equal(t, 1, collStatus.Successful, "successes")
 
 	for _, i := range readItems {
 		if strings.HasSuffix(i.UUID(), MetaFileSuffix) {
 			content, err := io.ReadAll(i.ToReader())
 			require.NoError(t, err)
-			require.Equal(t, content, []byte("{}"))
+			assert.Equal(t, []byte("{}"), content)
 		}
 	}
 })
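The rewritten assertions read each item exactly once and branch on the suffix, because io.ReadAll drains the ReadCloser — a second read of the same item would return nothing and make the meta-bytes comparison vacuous. A quick demonstration:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	rc := io.NopCloser(strings.NewReader("{}"))

	first, _ := io.ReadAll(rc)
	second, _ := io.ReadAll(rc) // the reader is already drained

	fmt.Printf("first: %q (%d bytes)\n", first, len(first))    // "{}" (2 bytes)
	fmt.Printf("second: %q (%d bytes)\n", second, len(second)) // "" (0 bytes)
}
```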
@@ -494,7 +499,7 @@ func (suite *CollectionUnitTestSuite) TestStreamItem() {
 	errsIs: func(t *testing.T, e error, count int) {
 		assert.True(t, graph.IsErrUnauthorized(e), "is unauthorized error")
 		assert.ErrorIs(t, e, graph.Err401Unauthorized)
-		assert.Equal(t, 2, count, "count of errors aggregated")
+		assert.Equal(t, 1, count, "count of errors aggregated")
 	},
 	readErrIs: func(t *testing.T, e error) {
 		assert.True(t, graph.IsErrUnauthorized(e), "is unauthorized error")
@@ -519,24 +524,6 @@ func (suite *CollectionUnitTestSuite) TestStreamItem() {
 			assert.ErrorIs(t, e, context.DeadlineExceeded)
 		},
 	},
-	{
-		name:       "throttled errors",
-		expectData: "",
-		coll: &Collection{
-			data:       mockDataChan(),
-			itemReader: mockReader("foo", graph.Err429TooManyRequests),
-			itemGetter: mockGetter(nil),
-		},
-		errsIs: func(t *testing.T, e error, count int) {
-			assert.True(t, graph.IsErrThrottled(e), "is throttled error")
-			assert.ErrorIs(t, e, graph.Err429TooManyRequests)
-			assert.Equal(t, 1, count, "one errors")
-		},
-		readErrIs: func(t *testing.T, e error) {
-			assert.True(t, graph.IsErrThrottled(e), "is throttled error")
-			assert.ErrorIs(t, e, graph.Err429TooManyRequests)
-		},
-	},
 	{
 		name:       "internal server errors",
 		expectData: "",
@@ -567,7 +554,7 @@ func (suite *CollectionUnitTestSuite) TestStreamItem() {
 		errCount int
 		size     int64
 
-		countUpdater = func(sz int64) { size = sz }
+		countUpdater = func(sz, ds, itms, drd, ird int64) { size = sz }
 		errUpdater   = func(s string, e error) {
 			errs = multierror.Append(errs, e)
 			errCount++