continued progress, setting aside

This commit is contained in:
ryanfkeepers 2023-02-07 11:12:23 -07:00
parent 9059ad758b
commit c2d206c2a2
3 changed files with 253 additions and 229 deletions

View File

@@ -23,7 +23,7 @@ import (
 const (
 	logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS"
-	numberOfRetries = 3
+	numberOfRetries = 1
 )

 // AllMetadataFileNames produces the standard set of filenames used to store graph
@@ -313,7 +313,7 @@ func RunWithRetry(run func() error) error {
 		}

 		if i < numberOfRetries {
-			time.Sleep(time.Duration(3*(i+2)) * time.Second)
+			time.Sleep(time.Duration(0*(i+2)) * time.Second)
 		}
 	}
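For context: RunWithRetry (defined in this file) loops over run with a linearly growing sleep between attempts, and this hunk both lowers numberOfRetries from 3 to 1 and zeroes the backoff multiplier. A minimal sketch of that retry shape, assuming a linear backoff; only the loop shape and the 3*(i+2) interval come from the hunks above, every other name is illustrative:

package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithLinearBackoff sketches the loop RunWithRetry appears to use:
// retry until success or attempts run out, sleeping between tries.
func runWithLinearBackoff(run func() error, retries int) error {
	var err error

	for i := 0; i <= retries; i++ {
		if err = run(); err == nil {
			return nil
		}

		if i < retries {
			// 3*(i+2) seconds yields 6s, 9s, 12s, ...; a zero
			// multiplier (as in the hunk above) disables the sleep
			// while keeping the loop structure intact.
			time.Sleep(time.Duration(3*(i+2)) * time.Second)
		}
	}

	return err
}

func main() {
	err := runWithLinearBackoff(func() error { return errors.New("transient") }, 0)
	fmt.Println(err) // transient
}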

View File

@@ -11,7 +11,6 @@ import (
 	"time"

 	"github.com/microsoftgraph/msgraph-sdk-go/models"
-	"github.com/pkg/errors"
 	"github.com/spatialcurrent/go-lazy/pkg/lazy"

 	"github.com/alcionai/corso/src/internal/connector/graph"
@@ -216,7 +215,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
 	// Retrieve the OneDrive folder path to set later in `details.OneDriveInfo`
 	parentPathString, err := path.GetDriveFolderPath(oc.folderPath)
 	if err != nil {
-		oc.reportAsCompleted(ctx, 0, 0, 0, err)
+		oc.reportAsCompleted(ctx, 0, 0, 0, 0, 0, err)
 		return
 	}
@@ -237,11 +236,11 @@ func (oc *Collection) populateItems(ctx context.Context) {
 			errs = support.WrapAndAppend(id, err, errs)
 		}
 		countUpdater = func(size, dirs, items, dReads, iReads int64) {
-			atomic.AddInt64(&dirsRead, dReads)
-			atomic.AddInt64(&itemsRead, iReads)
 			atomic.AddInt64(&byteCount, size)
 			atomic.AddInt64(&dirsFound, dirs)
-			atomic.AddInt64(&itemsFound, dirs)
+			atomic.AddInt64(&itemsFound, items)
+			atomic.AddInt64(&dirsRead, dReads)
+			atomic.AddInt64(&itemsRead, iReads)
 		}
 	)
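The countUpdater closure above is shared by the per-item goroutines spawned later in populateItems, so every tally goes through sync/atomic rather than a plain increment; the reordered adds also fix itemsFound, which previously accumulated dirs. A self-contained sketch of the pattern, with illustrative counter names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var (
		itemsFound int64
		wg         sync.WaitGroup
	)

	// Like countUpdater, this closure is invoked from many goroutines
	// at once; atomic adds keep the shared counter race-free without
	// a mutex.
	countUpdater := func(items int64) { atomic.AddInt64(&itemsFound, items) }

	for i := 0; i < 8; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()
			countUpdater(1)
		}()
	}

	wg.Wait()
	fmt.Println(atomic.LoadInt64(&itemsFound)) // 8
}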
@@ -265,6 +264,22 @@ func (oc *Collection) populateItems(ctx context.Context) {
 		wg.Add(1)

+		// fetch the item's metadata (if it has any, will no-op otherwise),
+		// and stream it into the collection's data channel
+		go oc.streamItemMeta(
+			ctx,
+			&wg,
+			semaphoreCh,
+			folderProgress,
+			errUpdater,
+			countUpdater,
+			item,
+			parentPathString)
+
+		semaphoreCh <- struct{}{}
+		wg.Add(1)
+
 		// fetch the item, and stream it into the collection's data channel
 		go oc.streamItem(
 			ctx,
@@ -279,7 +294,109 @@ func (oc *Collection) populateItems(ctx context.Context) {
 	wg.Wait()

-	oc.reportAsCompleted(ctx, itemsFound, itemsRead, byteCount, errs)
+	oc.reportAsCompleted(ctx, dirsFound, dirsRead, itemsFound, itemsRead, byteCount, errs)
+}
+
+func (oc *Collection) streamItemMeta(
+	ctx context.Context,
+	wg *sync.WaitGroup,
+	semaphore <-chan struct{},
+	progress chan<- struct{},
+	errUpdater func(string, error),
+	countUpdater func(int64, int64, int64, int64, int64),
+	item models.DriveItemable,
+	parentPath string,
+) {
+	defer wg.Done()
+	defer func() { <-semaphore }()
+
+	var (
+		id = *item.GetId()
+		name = *item.GetName()
+		size = *item.GetSize()
+		isFile = item.GetFile() != nil
+		lazyRC io.ReadCloser
+		info details.ItemInfo
+		metadata io.ReadCloser
+		suffix string
+		dirsFound int64
+		dirsRead int64
+		itemsFound int64
+		itemsRead int64
+		err error
+	)
+
+	// we only handle metadata for onedrive right now
+	if oc.source != OneDriveSource {
+		return
+	}
+
+	if isFile {
+		itemsFound, itemsRead = 1, 1
+		suffix = MetaFileSuffix
+	} else {
+		dirsFound, dirsRead = 1, 1
+		suffix = DirMetaFileSuffix
+	}
+
+	info.OneDrive = oneDriveItemInfo(item, size)
+	info.OneDrive.ParentPath = parentPath
+
+	// Construct a new lazy readCloser to feed to the collection consumer.
+	// This ensures that downloads won't be attempted unless that consumer
+	// attempts to read bytes. Assumption is that kopia will check things
+	// like file modtimes before attempting to read.
+	lazyRC = lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+		var msize int
+
+		metadata, msize, err = getItemMeta(
+			ctx,
+			oc.service,
+			oc.driveID,
+			item,
+			maxRetries,
+			oc.ctrl.ToggleFeatures.EnablePermissionsBackup,
+			oc.itemMetaReader)
+		if err != nil {
+			errUpdater(id, err)
+			return nil, err
+		}
+
+		name += suffix
+		size = int64(msize)
+
+		progReader, closer := observe.ItemProgress(
+			ctx,
+			metadata,
+			observe.ItemBackupMsg,
+			observe.PII(name),
+			size)
+		go closer()
+
+		return progReader, nil
+	})
+
+	// Item read successfully, record its addition.
+	//
+	// Note: this can cause inaccurate counts. Right now it counts all
+	// the items we intend to read. Errors within the lazy readCloser
+	// will create a conflict: an item is both successful and erroneous.
+	// But the async control to fix that is more error-prone than helpful.
+	//
+	// TODO: transform this into a stats bus so that async control of stats
+	// aggregation is handled at the backup level, not at the item iteration
+	// level.
+	countUpdater(size, dirsFound, itemsFound, dirsRead, itemsRead)
+
+	// stream the item to the data consumer.
+	oc.data <- &Item{
+		id: name,
+		data: lazyRC,
+		info: info,
+	}
+
+	progress <- struct{}{}
 }

 func (oc *Collection) streamItem(
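Both the new streamItemMeta above and the streamItem rewrite below hand the collection consumer a lazy io.ReadCloser, so no download happens until kopia actually reads bytes. The code uses lazy.NewLazyReadCloser from go-lazy; the following is a minimal hand-rolled equivalent to show the mechanics, with all names illustrative:

package main

import (
	"fmt"
	"io"
	"strings"
)

// lazyReadCloser defers an expensive open (e.g. a download) until the
// first Read, mirroring what lazy.NewLazyReadCloser provides.
type lazyReadCloser struct {
	open func() (io.ReadCloser, error)
	rc   io.ReadCloser
}

func (l *lazyReadCloser) Read(p []byte) (int, error) {
	if l.rc == nil {
		rc, err := l.open()
		if err != nil {
			return 0, err
		}

		l.rc = rc
	}

	return l.rc.Read(p)
}

func (l *lazyReadCloser) Close() error {
	if l.rc == nil {
		return nil // never read, so never opened; nothing to close
	}

	return l.rc.Close()
}

func main() {
	lrc := &lazyReadCloser{open: func() (io.ReadCloser, error) {
		fmt.Println("downloading...") // runs only on first Read
		return io.NopCloser(strings.NewReader("item bytes")), nil
	}}

	b, _ := io.ReadAll(lrc)
	fmt.Println(string(b))
}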
@@ -296,111 +413,69 @@ func (oc *Collection) streamItem(
 	defer func() { <-semaphore }()

 	var (
-		itemID = *item.GetId()
-		itemName = *item.GetName()
-		itemSize = *item.GetSize()
+		id = *item.GetId()
+		name = *item.GetName()
+		size = *item.GetSize()
 		isFile = item.GetFile() != nil
-		itemInfo details.ItemInfo
-		itemMeta io.ReadCloser
-		dataSuffix string
-		metaSuffix string
+		lazyRC io.ReadCloser
+		info details.ItemInfo
 		dirsFound int64
 		itemsFound int64
 		dirsRead int64
 		itemsRead int64
-		itemMetaSize int
-		err error
 	)

 	if isFile {
 		itemsFound++
 		itemsRead++
-		metaSuffix = MetaFileSuffix
 	} else {
 		dirsFound++
 		dirsRead++
-		metaSuffix = DirMetaFileSuffix
 	}

 	switch oc.source {
 	case SharePointSource:
-		itemInfo.SharePoint = sharePointItemInfo(item, itemSize)
-		itemInfo.SharePoint.ParentPath = parentPath
+		info.SharePoint = sharePointItemInfo(item, size)
+		info.SharePoint.ParentPath = parentPath
 	default:
-		itemInfo.OneDrive = oneDriveItemInfo(item, itemSize)
-		itemInfo.OneDrive.ParentPath = parentPath
-		dataSuffix = DataFileSuffix
+		info.OneDrive = oneDriveItemInfo(item, size)
+		info.OneDrive.ParentPath = parentPath
 	}

-	// directory handling
-	if !isFile {
-		// Construct a new lazy readCloser to feed to the collection consumer.
-		// This ensures that downloads won't be attempted unless that consumer
-		// attempts to read bytes. Assumption is that kopia will check things
-		// like file modtimes before attempting to read.
-		metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-			if oc.source == OneDriveSource {
-				itemMeta, itemMetaSize, err = getItemMeta(
-					ctx,
-					oc.service,
-					oc.driveID,
-					item,
-					maxRetries,
-					oc.ctrl.ToggleFeatures.EnablePermissionsBackup,
-					oc.itemMetaReader)
-				if err != nil {
-					errUpdater(itemID, err)
-					return nil, err
-				}
-			}
-
-			progReader, closer := observe.ItemProgress(
-				ctx, itemMeta, observe.ItemBackupMsg,
-				observe.PII(itemName+metaSuffix), int64(itemMetaSize))
-			go closer()
-			return progReader, nil
-		})
-	} else {
-		// Construct a new lazy readCloser to feed to the collection consumer.
-		// This ensures that downloads won't be attempted unless that consumer
-		// attempts to read bytes. Assumption is that kopia will check things
-		// like file modtimes before attempting to read.
-		lazyData := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-			// Read the item
-			var (
-				itemData io.ReadCloser
-				err error
-			)
-
-			itemData, item, err = readDriveItem(
-				ctx,
-				oc.service,
-				oc.itemClient,
-				oc.driveID, itemID,
-				item,
-				oc.itemReader,
-				oc.itemGetter)
-			if err != nil {
-				errUpdater(itemID, err)
-				return nil, err
-			}
-
-			// display/log the item download
-			progReader, closer := observe.ItemProgress(
-				ctx,
-				itemData,
-				observe.ItemBackupMsg,
-				observe.PII(itemName+dataSuffix),
-				itemSize)
-			go closer()
-			return progReader, nil
-		})
-	}
+	// Construct a new lazy readCloser to feed to the collection consumer.
+	// This ensures that downloads won't be attempted unless that consumer
+	// attempts to read bytes. Assumption is that kopia will check things
+	// like file modtimes before attempting to read.
+	lazyRC = lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+		itemData, _, err := readDriveItem(
+			ctx,
+			oc.service,
+			oc.itemClient,
+			oc.driveID, id,
+			item,
+			oc.itemReader,
+			oc.itemGetter)
+		if err != nil {
+			errUpdater(id, err)
+			return nil, err
+		}
+
+		// this separates onedrive items from their metadata
+		if oc.source == OneDriveSource && isFile {
+			name += DataFileSuffix
+		}
+
+		progReader, closer := observe.ItemProgress(
+			ctx,
+			itemData,
+			observe.ItemBackupMsg,
+			observe.PII(name),
+			size)
+		go closer()
+
+		return progReader, nil
+	})

 	// Item read successfully, record its addition.
 	//
@@ -412,21 +487,13 @@ func (oc *Collection) streamItem(
 	// TODO: transform this into a stats bus so that async control of stats
 	// aggregation is handled at the backup level, not at the item iteration
 	// level.
-	countUpdater(itemSize, dirsFound, itemsFound, dirsRead, itemsRead)
-
-	if hasMeta {
-		oc.data <- &Item{
-			id: itemName + metaSuffix,
-			data: metaReader,
-			info: itemInfo,
-		}
-	}
+	countUpdater(size, dirsFound, itemsFound, dirsRead, itemsRead)

 	// stream the item to the data consumer.
 	oc.data <- &Item{
-		id: itemName + dataSuffix,
-		data: lazyData,
-		info: itemInfo,
+		id: name,
+		data: lazyRC,
+		info: info,
 	}

 	progress <- struct{}{}
@@ -441,32 +508,24 @@ func getItemMeta(
 	enablePermissionsBackup bool,
 	read itemMetaReaderFunc,
 ) (io.ReadCloser, int, error) {
+	if !enablePermissionsBackup {
+		// We are still writing the metadata file but with
+		// empty permissions as we don't have a way to
+		// signify that the permissions was explicitly
+		// not added.
+		return io.NopCloser(strings.NewReader("{}")), 2, nil
+	}
+
 	var (
 		rc io.ReadCloser
 		size int
+		err error
 	)

-	for i := 1; i <= maxRetries; i++ {
-		if !enablePermissionsBackup {
-			// We are still writing the metadata file but with
-			// empty permissions as we don't have a way to
-			// signify that the permissions was explicitly
-			// not added.
-			return io.NopCloser(strings.NewReader("{}")), 2, nil
-		}
-
-		var err error
-
-		rc, size, err = read(ctx, service, driveID, item)
-		if err == nil ||
-			!graph.IsErrTimeout(err) ||
-			!graph.IsInternalServerError(err) {
-			return nil, 0, errors.Wrap(err, "getting item metadata")
-		}
-
-		if i < maxRetries {
-			time.Sleep(1 * time.Second)
-		}
-	}
+	err = graph.RunWithRetry(func() error {
+		rc, size, err = read(ctx, service, driveID, item)
+		return err
+	})

 	return rc, size, nil
 }
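getItemMeta above and readDriveItem below now delegate their retry loops to graph.RunWithRetry. Since the retried closure only returns an error, results are captured by assigning to variables in the enclosing scope. A small sketch of that capture idiom; runWithRetry here is an illustrative stand-in, not the graph package's implementation:

package main

import (
	"fmt"
	"io"
	"strings"
)

// runWithRetry stands in for graph.RunWithRetry in this sketch.
func runWithRetry(run func() error) error {
	var err error

	for i := 0; i < 2; i++ {
		if err = run(); err == nil {
			return nil
		}
	}

	return err
}

func main() {
	var (
		rc   io.ReadCloser
		size int
	)

	// The closure writes rc and size into the enclosing scope, so the
	// results outlive the retry wrapper even though it only sees errors.
	err := runWithRetry(func() error {
		rc, size = io.NopCloser(strings.NewReader("{}")), 2
		return nil
	})

	fmt.Println(err, size, rc != nil) // <nil> 2 true
}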
@@ -486,49 +545,27 @@ func readDriveItem(
 		item = original
 	)

-	for i := 0; i < maxRetries; i++ {
-		_, rc, err = read(itemClient, item)
-		if err == nil {
-			return nil, nil, errors.Wrap(err, "reading drive item")
-		}
-
-		if graph.IsErrUnauthorized(err) {
-			// assume unauthorized requests are a sign of an expired
-			// jwt token, and that we've overrun the available window
-			// to download the actual file. Re-downloading the item
-			// will refresh that download url.
-			di, diErr := get(ctx, service, driveID, itemID)
-			if diErr != nil {
-				return nil, nil, errors.Wrap(diErr, "retrieving item to refresh download url")
-			}
-
-			item = di
-
-			continue
-		} else if !graph.IsErrTimeout(err) &&
-			!graph.IsInternalServerError(err) {
-			// for all non-timeout, non-internal errors, do not retry
-			break
-		}
-
-		if i < maxRetries {
-			time.Sleep(1 * time.Second)
-		}
-	}
+	err = graph.RunWithRetry(func() error {
+		_, rc, err = read(itemClient, item)
+		return err
+	})

 	return rc, item, err
 }

-func (oc *Collection) reportAsCompleted(ctx context.Context, itemsRead, itemsFound, byteCount int64, errs error) {
+func (oc *Collection) reportAsCompleted(
+	ctx context.Context,
+	dirsFound, dirsRead, itemsFound, itemsRead, byteCount int64,
+	errs error,
+) {
 	close(oc.data)

 	status := support.CreateStatus(ctx, support.Backup,
 		1, // num folders (always 1)
 		support.CollectionMetrics{
-			Objects: int(itemsFound), // items to read,
-			Successes: int(itemsRead), // items read successfully,
+			Objects: int(itemsFound) + int(dirsFound), // items to read,
+			Successes: int(itemsRead) + int(dirsRead), // items read successfully,
 			TotalBytes: byteCount, // Number of bytes read in the operation,
 		},
 		errs,
 		oc.folderPath.Folder(), // Additional details

View File

@@ -63,6 +63,11 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
 		}}
 	)

+	testItemMetaBytes, err := json.Marshal(testItemMeta)
+	if err != nil {
+		suite.T().Fatal("unable to marshall test permissions", err)
+	}
+
 	type nst struct {
 		name string
 		size int64
@@ -181,12 +186,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
 				_ string,
 				_ models.DriveItemable,
 			) (io.ReadCloser, int, error) {
-				metaJSON, err := json.Marshal(testItemMeta)
-				if err != nil {
-					return nil, 0, err
-				}
-
-				return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
+				return io.NopCloser(bytes.NewReader(testItemMetaBytes)), len(testItemMetaBytes), nil
 			}

 			// Read items from the collection
@@ -199,52 +199,58 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
 			wg.Wait()

 			if test.source == OneDriveSource {
-				require.Len(t, readItems, 2) // .data and .meta
+				assert.Len(t, readItems, 2) // .data and .meta
+				assert.Equal(t, 2, collStatus.ObjectCount)
+				assert.Equal(t, 2, collStatus.Successful)
 			} else {
-				require.Len(t, readItems, 1)
+				assert.Len(t, readItems, 1)
+				assert.Equal(t, 1, collStatus.ObjectCount)
+				assert.Equal(t, 1, collStatus.Successful)
 			}

-			// Expect only 1 item
-			require.Equal(t, 1, collStatus.ObjectCount)
-			require.Equal(t, 1, collStatus.Successful)
-
-			// Validate item info and data
-			readItem := readItems[0]
-			readItemInfo := readItem.(data.StreamInfo)
-
-			if test.source == OneDriveSource {
-				assert.Equal(t, testItemName+DataFileSuffix, readItem.UUID())
-			} else {
-				assert.Equal(t, testItemName, readItem.UUID())
-			}
-
-			require.Implements(t, (*data.StreamModTime)(nil), readItem)
-			mt := readItem.(data.StreamModTime)
-			assert.Equal(t, now, mt.ModTime())
-
-			readData, err := io.ReadAll(readItem.ToReader())
-			require.NoError(t, err)
-
-			name, parentPath := test.infoFrom(t, readItemInfo.Info())
-
-			assert.Equal(t, testItemData, readData)
-			assert.Equal(t, testItemName, name)
-			assert.Equal(t, driveFolderPath, parentPath)
-
-			if test.source == OneDriveSource {
-				readItemMeta := readItems[1]
-
-				assert.Equal(t, testItemName+MetaFileSuffix, readItemMeta.UUID())
-
-				readMetaData, err := io.ReadAll(readItemMeta.ToReader())
-				require.NoError(t, err)
-
-				tm, err := json.Marshal(testItemMeta)
-				if err != nil {
-					t.Fatal("unable to marshall test permissions", err)
-				}
-
-				assert.Equal(t, tm, readMetaData)
+			var (
+				foundData bool
+				foundMeta bool
+			)
+
+			for _, readItem := range readItems {
+				readItemInfo := readItem.(data.StreamInfo)
+				id := readItem.UUID()
+
+				if strings.HasSuffix(id, DataFileSuffix) {
+					foundData = true
+				}
+
+				var hasMeta bool
+
+				if strings.HasSuffix(id, MetaFileSuffix) {
+					foundMeta = true
+					hasMeta = true
+				}
+
+				assert.Contains(t, testItemName, id)
+
+				require.Implements(t, (*data.StreamModTime)(nil), readItem)
+				mt := readItem.(data.StreamModTime)
+				assert.Equal(t, now, mt.ModTime())
+
+				readData, err := io.ReadAll(readItem.ToReader())
+				require.NoError(t, err)
+
+				name, parentPath := test.infoFrom(t, readItemInfo.Info())
+
+				assert.Equal(t, testItemData, readData)
+				assert.Equal(t, testItemName, name)
+				assert.Equal(t, driveFolderPath, parentPath)
+
+				if hasMeta {
+					ra, err := io.ReadAll(readItem.ToReader())
+					require.NoError(t, err)
+					assert.Equal(t, testItemMetaBytes, ra)
+				}
+			}
+
+			if test.source == OneDriveSource {
+				assert.True(t, foundData, "found data file")
+				assert.True(t, foundMeta, "found metadata file")
 			}
 		})
 	}
@@ -359,7 +365,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
 				graph.HTTPClient(graph.NoTimeout()),
 				folderPath,
 				"fakeDriveID",
-				suite,
+				&MockGraphService{},
 				suite.testStatusUpdater(&wg, &collStatus),
 				test.source,
 				control.Options{ToggleFeatures: control.Toggles{}})
@@ -398,15 +404,14 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
 			wg.Wait()

-			// Expect no items
-			require.Equal(t, 1, collStatus.ObjectCount)
-			require.Equal(t, 1, collStatus.Successful)
+			assert.Equal(t, 1, collStatus.ObjectCount, "total objects")
+			assert.Equal(t, 1, collStatus.Successful, "successes")

 			for _, i := range readItems {
 				if strings.HasSuffix(i.UUID(), MetaFileSuffix) {
 					content, err := io.ReadAll(i.ToReader())
 					require.NoError(t, err)
-					require.Equal(t, content, []byte("{}"))
+					assert.Equal(t, content, []byte("{}"))
 				}
 			}
 		})
@@ -494,7 +499,7 @@ func (suite *CollectionUnitTestSuite) TestStreamItem() {
 			errsIs: func(t *testing.T, e error, count int) {
 				assert.True(t, graph.IsErrUnauthorized(e), "is unauthorized error")
 				assert.ErrorIs(t, e, graph.Err401Unauthorized)
-				assert.Equal(t, 2, count, "count of errors aggregated")
+				assert.Equal(t, 1, count, "count of errors aggregated")
 			},
 			readErrIs: func(t *testing.T, e error) {
 				assert.True(t, graph.IsErrUnauthorized(e), "is unauthorized error")
@@ -519,24 +524,6 @@ func (suite *CollectionUnitTestSuite) TestStreamItem() {
 				assert.ErrorIs(t, e, context.DeadlineExceeded)
 			},
 		},
-		{
-			name: "throttled errors",
-			expectData: "",
-			coll: &Collection{
-				data: mockDataChan(),
-				itemReader: mockReader("foo", graph.Err429TooManyRequests),
-				itemGetter: mockGetter(nil),
-			},
-			errsIs: func(t *testing.T, e error, count int) {
-				assert.True(t, graph.IsErrThrottled(e), "is throttled error")
-				assert.ErrorIs(t, e, graph.Err429TooManyRequests)
-				assert.Equal(t, 1, count, "one errors")
-			},
-			readErrIs: func(t *testing.T, e error) {
-				assert.True(t, graph.IsErrThrottled(e), "is throttled error")
-				assert.ErrorIs(t, e, graph.Err429TooManyRequests)
-			},
-		},
 		{
 			name: "internal server errors",
 			expectData: "",
@@ -567,7 +554,7 @@ func (suite *CollectionUnitTestSuite) TestStreamItem() {
 			errCount int
 			size int64

-			countUpdater = func(sz int64) { size = sz }
+			countUpdater = func(sz, ds, itms, drd, ird int64) { size = sz }
 			errUpdater = func(s string, e error) {
 				errs = multierror.Append(errs, e)
 				errCount++