Compare commits
3 Commits: main...issue-2267

| Author | SHA1 | Date |
|---|---|---|
|  | c2d206c2a2 |  |
|  | 9059ad758b |  |
|  | b63f45acb4 |  |
@@ -23,7 +23,7 @@ import (

 const (
     logGraphRequestsEnvKey = "LOG_GRAPH_REQUESTS"
-    numberOfRetries        = 3
+    numberOfRetries        = 1
 )

 // AllMetadataFileNames produces the standard set of filenames used to store graph

@@ -313,7 +313,7 @@ func RunWithRetry(run func() error) error {
         }

         if i < numberOfRetries {
-            time.Sleep(time.Duration(3*(i+2)) * time.Second)
+            time.Sleep(time.Duration(0*(i+2)) * time.Second)
         }
     }

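Note on the two hunks above: both feed the retry helper whose signature appears in the hunk header, `RunWithRetry(run func() error) error`. The loop below is only a sketch of the behavior implied by these lines (the exact loop bounds are not shown in this compare); it illustrates that with `numberOfRetries = 1` and a `0*(i+2)` multiplier, a failed call is retried once with no backoff.

```go
package graphsketch

import "time"

// runWithRetrySketch approximates the retry loop touched above: run is
// attempted, and on failure retried while i < numberOfRetries, sleeping
// between attempts. The loop bounds are an assumption; only the constant
// and the sleep line appear in this compare.
func runWithRetrySketch(run func() error) error {
	const numberOfRetries = 1 // was 3 before this change

	var err error

	for i := 0; i <= numberOfRetries; i++ {
		if err = run(); err == nil {
			return nil
		}

		if i < numberOfRetries {
			// with this change the backoff is effectively removed (0*(i+2) seconds)
			time.Sleep(time.Duration(0*(i+2)) * time.Second)
		}
	}

	return err
}
```

Callers elsewhere in this compare wrap Graph reads the same way, e.g. `graph.RunWithRetry(func() error { rc, size, err = read(ctx, service, driveID, item); return err })`.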
@@ -11,7 +11,6 @@ import (
     "time"

     "github.com/microsoftgraph/msgraph-sdk-go/models"
-    "github.com/pkg/errors"
     "github.com/spatialcurrent/go-lazy/pkg/lazy"

     "github.com/alcionai/corso/src/internal/connector/graph"

@@ -61,13 +60,16 @@ type Collection struct {
     // M365 IDs of file items within this collection
     driveItems map[string]models.DriveItemable
     // M365 ID of the drive this collection was created from
     driveID       string
     source        driveSource
     service       graph.Servicer
     statusUpdater support.StatusUpdater
+    ctrl          control.Options

+    // TODO: these should be interfaces, not funcs
     itemReader     itemReaderFunc
     itemMetaReader itemMetaReaderFunc
-    ctrl           control.Options
+    itemGetter     itemGetterFunc

     // should only be true if the old delta token expired
     doNotMergeItems bool

@@ -88,6 +90,12 @@ type itemMetaReaderFunc func(
     item models.DriveItemable,
 ) (io.ReadCloser, int, error)

+type itemGetterFunc func(
+    ctx context.Context,
+    srv graph.Servicer,
+    driveID, itemID string,
+) (models.DriveItemable, error)
+
 // NewCollection creates a Collection
 func NewCollection(
     itemClient *http.Client,

@@ -99,15 +107,16 @@ func NewCollection(
     ctrlOpts control.Options,
 ) *Collection {
     c := &Collection{
-        itemClient:    itemClient,
-        folderPath:    folderPath,
-        driveItems:    map[string]models.DriveItemable{},
-        driveID:       driveID,
-        source:        source,
-        service:       service,
-        data:          make(chan data.Stream, collectionChannelBufferSize),
-        statusUpdater: statusUpdater,
         ctrl:          ctrlOpts,
+        data:          make(chan data.Stream, collectionChannelBufferSize),
+        driveID:       driveID,
+        driveItems:    map[string]models.DriveItemable{},
+        folderPath:    folderPath,
+        itemClient:    itemClient,
+        itemGetter:    getDriveItem,
+        service:       service,
+        source:        source,
+        statusUpdater: statusUpdater,
     }

     // Allows tests to set a mock populator
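The reader/getter fields added above are plain function types rather than interfaces (see the TODO) so unit tests can swap them out without a live Graph connection. A minimal within-package sketch of wiring a Collection with stubs, mirroring the `mockReader`/`mockGetter` helpers added to the test file later in this compare; `stubItem` is a hypothetical `models.DriveItemable` built for the test.

```go
// Sketch only: a Collection assembled for a unit test with stubbed
// function dependencies, so the item streamers never touch the network.
coll := &Collection{
	data: make(chan data.Stream, 1),
	itemReader: func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
		return details.ItemInfo{}, io.NopCloser(strings.NewReader("file body")), nil
	},
	itemGetter: func(context.Context, graph.Servicer, string, string) (models.DriveItemable, error) {
		return stubItem, nil // stubItem: e.g. models.NewDriveItem() with id/name/size set
	},
}
```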
@@ -203,6 +212,13 @@ func (od *Item) ModTime() time.Time {
 // populateItems iterates through items added to the collection
 // and uses the collection `itemReader` to read the item
 func (oc *Collection) populateItems(ctx context.Context) {
+    // Retrieve the OneDrive folder path to set later in `details.OneDriveInfo`
+    parentPathString, err := path.GetDriveFolderPath(oc.folderPath)
+    if err != nil {
+        oc.reportAsCompleted(ctx, 0, 0, 0, 0, 0, err)
+        return
+    }
+
     var (
         errs      error
         byteCount int64

@@ -210,17 +226,23 @@ func (oc *Collection) populateItems(ctx context.Context) {
         dirsRead   int64
         itemsFound int64
         dirsFound  int64
-        wg         sync.WaitGroup
-        m          sync.Mutex
-    )
-
-    // Retrieve the OneDrive folder path to set later in
-    // `details.OneDriveInfo`
-    parentPathString, err := path.GetDriveFolderPath(oc.folderPath)
-    if err != nil {
-        oc.reportAsCompleted(ctx, 0, 0, 0, err)
-        return
-    }
+
+        wg sync.WaitGroup
+        m  sync.Mutex
+
+        errUpdater = func(id string, err error) {
+            m.Lock()
+            defer m.Unlock()
+            errs = support.WrapAndAppend(id, err, errs)
+        }
+        countUpdater = func(size, dirs, items, dReads, iReads int64) {
+            atomic.AddInt64(&byteCount, size)
+            atomic.AddInt64(&dirsFound, dirs)
+            atomic.AddInt64(&itemsFound, items)
+            atomic.AddInt64(&dirsRead, dReads)
+            atomic.AddInt64(&itemsRead, iReads)
+        }
+    )

     folderProgress, colCloser := observe.ProgressWithCount(
         ctx,

@@ -233,12 +255,6 @@ func (oc *Collection) populateItems(ctx context.Context) {
     semaphoreCh := make(chan struct{}, urlPrefetchChannelBufferSize)
     defer close(semaphoreCh)

-    errUpdater := func(id string, err error) {
-        m.Lock()
-        errs = support.WrapAndAppend(id, err, errs)
-        m.Unlock()
-    }
-
     for _, item := range oc.driveItems {
         if oc.ctrl.FailFast && errs != nil {
             break
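The loop above pairs a buffered channel (capacity `urlPrefetchChannelBufferSize`) with a `sync.WaitGroup` to cap how many item goroutines run at once. A self-contained sketch of that pattern, with hypothetical names:

```go
package fanout

import (
	"fmt"
	"sync"
)

// process shows the bounded fan-out used by populateItems: a buffered
// channel acts as a semaphore capping concurrent workers, and a WaitGroup
// blocks until every spawned goroutine has released its slot.
func process(items []string, maxInFlight int) {
	var (
		wg        sync.WaitGroup
		semaphore = make(chan struct{}, maxInFlight) // urlPrefetchChannelBufferSize in the real code
	)
	defer close(semaphore)

	for _, item := range items {
		semaphore <- struct{}{} // acquire a slot; blocks once maxInFlight workers are running
		wg.Add(1)

		go func(item string) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot

			fmt.Println("streaming", item)
		}(item)
	}

	wg.Wait()
}
```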
@@ -248,198 +264,308 @@ func (oc *Collection) populateItems(ctx context.Context) {

         wg.Add(1)

-        go func(item models.DriveItemable) {
-            defer wg.Done()
-            defer func() { <-semaphoreCh }()
-
-            // Read the item
-            var (
-                itemID       = *item.GetId()
-                itemName     = *item.GetName()
-                itemSize     = *item.GetSize()
-                itemInfo     details.ItemInfo
-                itemMeta     io.ReadCloser
-                itemMetaSize int
-                metaSuffix   string
-                err          error
-            )
-
-            isFile := item.GetFile() != nil
-
-            if isFile {
-                atomic.AddInt64(&itemsFound, 1)
-
-                metaSuffix = MetaFileSuffix
-            } else {
-                atomic.AddInt64(&dirsFound, 1)
-
-                metaSuffix = DirMetaFileSuffix
-            }
-
-            if oc.source == OneDriveSource {
-                // Fetch metadata for the file
-                for i := 1; i <= maxRetries; i++ {
-                    if !oc.ctrl.ToggleFeatures.EnablePermissionsBackup {
-                        // We are still writing the metadata file but with
-                        // empty permissions as we don't have a way to
-                        // signify that the permissions was explicitly
-                        // not added.
-                        itemMeta = io.NopCloser(strings.NewReader("{}"))
-                        itemMetaSize = 2
-
-                        break
-                    }
-
-                    itemMeta, itemMetaSize, err = oc.itemMetaReader(ctx, oc.service, oc.driveID, item)
-
-                    // retry on Timeout type errors, break otherwise.
-                    if err == nil ||
-                        !graph.IsErrTimeout(err) ||
-                        !graph.IsInternalServerError(err) {
-                        break
-                    }
-
-                    if i < maxRetries {
-                        time.Sleep(1 * time.Second)
-                    }
-                }
-
-                if err != nil {
-                    errUpdater(*item.GetId(), errors.Wrap(err, "failed to get item permissions"))
-                    return
-                }
-            }
-
-            switch oc.source {
-            case SharePointSource:
-                itemInfo.SharePoint = sharePointItemInfo(item, itemSize)
-                itemInfo.SharePoint.ParentPath = parentPathString
-            default:
-                itemInfo.OneDrive = oneDriveItemInfo(item, itemSize)
-                itemInfo.OneDrive.ParentPath = parentPathString
-            }
-
-            if isFile {
-                dataSuffix := ""
-                if oc.source == OneDriveSource {
-                    dataSuffix = DataFileSuffix
-                }
-
-                // Construct a new lazy readCloser to feed to the collection consumer.
-                // This ensures that downloads won't be attempted unless that consumer
-                // attempts to read bytes. Assumption is that kopia will check things
-                // like file modtimes before attempting to read.
-                itemReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-                    // Read the item
-                    var (
-                        itemData io.ReadCloser
-                        err      error
-                    )
-
-                    for i := 1; i <= maxRetries; i++ {
-                        _, itemData, err = oc.itemReader(oc.itemClient, item)
-                        if err == nil {
-                            break
-                        }
-
-                        if graph.IsErrUnauthorized(err) {
-                            // assume unauthorized requests are a sign of an expired
-                            // jwt token, and that we've overrun the available window
-                            // to download the actual file. Re-downloading the item
-                            // will refresh that download url.
-                            di, diErr := getDriveItem(ctx, oc.service, oc.driveID, itemID)
-                            if diErr != nil {
-                                err = errors.Wrap(diErr, "retrieving expired item")
-                                break
-                            }
-
-                            item = di
-
-                            continue
-
-                        } else if !graph.IsErrTimeout(err) &&
-                            !graph.IsInternalServerError(err) {
-                            // Don't retry for non-timeout, on-unauth, as
-                            // we are already retrying it in the default
-                            // retry middleware
-                            break
-                        }
-
-                        if i < maxRetries {
-                            time.Sleep(1 * time.Second)
-                        }
-                    }
-
-                    // check for errors following retries
-                    if err != nil {
-                        errUpdater(itemID, err)
-                        return nil, err
-                    }
-
-                    // display/log the item download
-                    progReader, closer := observe.ItemProgress(
-                        ctx,
-                        itemData,
-                        observe.ItemBackupMsg,
-                        observe.PII(itemName+dataSuffix),
-                        itemSize,
-                    )
-                    go closer()
-
-                    return progReader, nil
-                })
-
-                oc.data <- &Item{
-                    id:   itemName + dataSuffix,
-                    data: itemReader,
-                    info: itemInfo,
-                }
-            }
-
-            if oc.source == OneDriveSource {
-                metaReader := lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
-                    progReader, closer := observe.ItemProgress(
-                        ctx, itemMeta, observe.ItemBackupMsg,
-                        observe.PII(itemName+metaSuffix), int64(itemMetaSize))
-                    go closer()
-                    return progReader, nil
-                })
-
-                oc.data <- &Item{
-                    id:   itemName + metaSuffix,
-                    data: metaReader,
-                    info: itemInfo,
-                }
-            }
-
-            // Item read successfully, add to collection
-            if isFile {
-                atomic.AddInt64(&itemsRead, 1)
-            } else {
-                atomic.AddInt64(&dirsRead, 1)
-            }
-
-            // byteCount iteration
-            atomic.AddInt64(&byteCount, itemSize)
-
-            folderProgress <- struct{}{}
-        }(item)
+        // fetch the item's metadata (if it has any, will no-op otherwise),
+        // and stream it into the collection's data channel
+        go oc.streamItemMeta(
+            ctx,
+            &wg,
+            semaphoreCh,
+            folderProgress,
+            errUpdater,
+            countUpdater,
+            item,
+            parentPathString)
+
+        semaphoreCh <- struct{}{}
+
+        wg.Add(1)
+
+        // fetch the item, and stream it into the collection's data channel
+        go oc.streamItem(
+            ctx,
+            &wg,
+            semaphoreCh,
+            folderProgress,
+            errUpdater,
+            countUpdater,
+            item,
+            parentPathString)
     }

     wg.Wait()

-    oc.reportAsCompleted(ctx, int(itemsFound), int(itemsRead), byteCount, errs)
+    oc.reportAsCompleted(ctx, dirsFound, dirsRead, itemsFound, itemsRead, byteCount, errs)
 }

-func (oc *Collection) reportAsCompleted(ctx context.Context, itemsFound, itemsRead int, byteCount int64, errs error) {
+func (oc *Collection) streamItemMeta(
+    ctx context.Context,
+    wg *sync.WaitGroup,
+    semaphore <-chan struct{},
+    progress chan<- struct{},
+    errUpdater func(string, error),
+    countUpdater func(int64, int64, int64, int64, int64),
+    item models.DriveItemable,
+    parentPath string,
+) {
+    defer wg.Done()
+    defer func() { <-semaphore }()
+
+    var (
+        id     = *item.GetId()
+        name   = *item.GetName()
+        size   = *item.GetSize()
+        isFile = item.GetFile() != nil
+
+        lazyRC   io.ReadCloser
+        info     details.ItemInfo
+        metadata io.ReadCloser
+
+        suffix     string
+        dirsFound  int64
+        dirsRead   int64
+        itemsFound int64
+        itemsRead  int64
+
+        err error
+    )
+
+    // we only handle metadata for onedrive right now
+    if oc.source != OneDriveSource {
+        return
+    }
+
+    if isFile {
+        itemsFound, itemsRead = 1, 1
+        suffix = MetaFileSuffix
+    } else {
+        dirsFound, dirsRead = 1, 1
+        suffix = DirMetaFileSuffix
+    }
+
+    info.OneDrive = oneDriveItemInfo(item, size)
+    info.OneDrive.ParentPath = parentPath
+
+    // Construct a new lazy readCloser to feed to the collection consumer.
+    // This ensures that downloads won't be attempted unless that consumer
+    // attempts to read bytes. Assumption is that kopia will check things
+    // like file modtimes before attempting to read.
+    lazyRC = lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+        var msize int
+
+        metadata, msize, err = getItemMeta(
+            ctx,
+            oc.service,
+            oc.driveID,
+            item,
+            maxRetries,
+            oc.ctrl.ToggleFeatures.EnablePermissionsBackup,
+            oc.itemMetaReader)
+        if err != nil {
+            errUpdater(id, err)
+            return nil, err
+        }
+
+        name += suffix
+        size = int64(msize)
+
+        progReader, closer := observe.ItemProgress(
+            ctx,
+            metadata,
+            observe.ItemBackupMsg,
+            observe.PII(name),
+            size)
+        go closer()
+        return progReader, nil
+    })
+
+    // Item read successfully, record its addition.
+    //
+    // Note: this can cause inaccurate counts. Right now it counts all
+    // the items we intend to read. Errors within the lazy readCloser
+    // will create a conflict: an item is both successful and erroneous.
+    // But the async control to fix that is more error-prone than helpful.
+    //
+    // TODO: transform this into a stats bus so that async control of stats
+    // aggregation is handled at the backup level, not at the item iteration
+    // level.
+    countUpdater(size, dirsFound, itemsFound, dirsRead, itemsRead)
+
+    // stream the item to the data consumer.
+    oc.data <- &Item{
+        id:   name,
+        data: lazyRC,
+        info: info,
+    }
+
+    progress <- struct{}{}
+}
+
+func (oc *Collection) streamItem(
+    ctx context.Context,
+    wg *sync.WaitGroup,
+    semaphore <-chan struct{},
+    progress chan<- struct{},
+    errUpdater func(string, error),
+    countUpdater func(int64, int64, int64, int64, int64),
+    item models.DriveItemable,
+    parentPath string,
+) {
+    defer wg.Done()
+    defer func() { <-semaphore }()
+
+    var (
+        id     = *item.GetId()
+        name   = *item.GetName()
+        size   = *item.GetSize()
+        isFile = item.GetFile() != nil
+
+        lazyRC io.ReadCloser
+        info   details.ItemInfo
+
+        dirsFound  int64
+        itemsFound int64
+        dirsRead   int64
+        itemsRead  int64
+    )
+
+    if isFile {
+        itemsFound++
+        itemsRead++
+    } else {
+        dirsFound++
+        dirsRead++
+    }
+
+    switch oc.source {
+    case SharePointSource:
+        info.SharePoint = sharePointItemInfo(item, size)
+        info.SharePoint.ParentPath = parentPath
+    default:
+        info.OneDrive = oneDriveItemInfo(item, size)
+        info.OneDrive.ParentPath = parentPath
+    }
+
+    // Construct a new lazy readCloser to feed to the collection consumer.
+    // This ensures that downloads won't be attempted unless that consumer
+    // attempts to read bytes. Assumption is that kopia will check things
+    // like file modtimes before attempting to read.
+    lazyRC = lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
+        itemData, _, err := readDriveItem(
+            ctx,
+            oc.service,
+            oc.itemClient,
+            oc.driveID, id,
+            item,
+            oc.itemReader,
+            oc.itemGetter)
+        if err != nil {
+            errUpdater(id, err)
+            return nil, err
+        }
+
+        // this separates onedrive items from their metadata
+        if oc.source == OneDriveSource && isFile {
+            name += DataFileSuffix
+        }
+
+        progReader, closer := observe.ItemProgress(
+            ctx,
+            itemData,
+            observe.ItemBackupMsg,
+            observe.PII(name),
+            size)
+        go closer()
+        return progReader, nil
+    })
+
+    // Item read successfully, record its addition.
+    //
+    // Note: this can cause inaccurate counts. Right now it counts all
+    // the items we intend to read. Errors within the lazy readCloser
+    // will create a conflict: an item is both successful and erroneous.
+    // But the async control to fix that is more error-prone than helpful.
+    //
+    // TODO: transform this into a stats bus so that async control of stats
+    // aggregation is handled at the backup level, not at the item iteration
+    // level.
+    countUpdater(size, dirsFound, itemsFound, dirsRead, itemsRead)
+
+    // stream the item to the data consumer.
+    oc.data <- &Item{
+        id:   name,
+        data: lazyRC,
+        info: info,
+    }
+
+    progress <- struct{}{}
+}
+
+func getItemMeta(
+    ctx context.Context,
+    service graph.Servicer,
+    driveID string,
+    item models.DriveItemable,
+    maxRetries int,
+    enablePermissionsBackup bool,
+    read itemMetaReaderFunc,
+) (io.ReadCloser, int, error) {
+    if !enablePermissionsBackup {
+        // We are still writing the metadata file but with
+        // empty permissions as we don't have a way to
+        // signify that the permissions was explicitly
+        // not added.
+        return io.NopCloser(strings.NewReader("{}")), 2, nil
+    }
+
+    var (
+        rc   io.ReadCloser
+        size int
+        err  error
+    )
+
+    err = graph.RunWithRetry(func() error {
+        rc, size, err = read(ctx, service, driveID, item)
+        return err
+    })
+
+    return rc, size, nil
+}
+
+func readDriveItem(
+    ctx context.Context,
+    service graph.Servicer,
+    itemClient *http.Client,
+    driveID, itemID string,
+    original models.DriveItemable,
+    read itemReaderFunc,
+    get itemGetterFunc,
+) (io.ReadCloser, models.DriveItemable, error) {
+    var (
+        err  error
+        rc   io.ReadCloser
+        item = original
+    )
+
+    err = graph.RunWithRetry(func() error {
+        _, rc, err = read(itemClient, item)
+        return err
+    })
+
+    return rc, item, err
+}
+
+func (oc *Collection) reportAsCompleted(
+    ctx context.Context,
+    dirsFound, dirsRead, itemsFound, itemsRead, byteCount int64,
+    errs error,
+) {
     close(oc.data)

     status := support.CreateStatus(ctx, support.Backup,
         1, // num folders (always 1)
         support.CollectionMetrics{
-            Objects:    itemsFound, // items to read,
-            Successes:  itemsRead,  // items read successfully,
+            Objects:    int(itemsFound) + int(dirsFound), // items to read,
+            Successes:  int(itemsRead) + int(dirsRead),   // items read successfully,
             TotalBytes: byteCount, // Number of bytes read in the operation,
         },
         errs,
         oc.folderPath.Folder(), // Additional details
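Both `streamItemMeta` and `streamItem` above hand the consumer a `lazy.NewLazyReadCloser`, so the Graph download only runs when the first byte is actually read (which is what lets kopia skip unchanged items). A stand-alone sketch of that behavior, using only the go-lazy call already imported by this file; the print is just to make the deferred fetch visible:

```go
package lazysketch

import (
	"fmt"
	"io"
	"strings"

	"github.com/spatialcurrent/go-lazy/pkg/lazy"
)

// lazyItem returns a ReadCloser whose backing data is only produced when
// a consumer first reads from it; until then the closure never runs.
func lazyItem() io.ReadCloser {
	return lazy.NewLazyReadCloser(func() (io.ReadCloser, error) {
		fmt.Println("download happens now, on first Read")
		return io.NopCloser(strings.NewReader("item bytes")), nil
	})
}
```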
@@ -11,7 +11,7 @@ import (
     "testing"
     "time"

-    msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"
+    "github.com/hashicorp/go-multierror"
     "github.com/microsoftgraph/msgraph-sdk-go/models"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -20,6 +20,7 @@ import (
     "github.com/alcionai/corso/src/internal/connector/graph"
     "github.com/alcionai/corso/src/internal/connector/support"
     "github.com/alcionai/corso/src/internal/data"
+    "github.com/alcionai/corso/src/internal/tester"
     "github.com/alcionai/corso/src/pkg/backup/details"
     "github.com/alcionai/corso/src/pkg/control"
     "github.com/alcionai/corso/src/pkg/path"
@@ -29,17 +30,6 @@ type CollectionUnitTestSuite struct {
     suite.Suite
 }

-// Allows `*CollectionUnitTestSuite` to be used as a graph.Servicer
-// TODO: Implement these methods
-
-func (suite *CollectionUnitTestSuite) Client() *msgraphsdk.GraphServiceClient {
-    return nil
-}
-
-func (suite *CollectionUnitTestSuite) Adapter() *msgraphsdk.GraphRequestAdapter {
-    return nil
-}
-
 func TestCollectionUnitTestSuite(t *testing.T) {
     suite.Run(t, new(CollectionUnitTestSuite))
 }
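The hunk above drops the `graph.Servicer` stubs from the suite; the tests below pass `&MockGraphService{}` instead. That type is not defined in this compare, but presumably it satisfies `graph.Servicer` the same way the removed methods did. A hypothetical minimal version:

```go
// Hypothetical stand-in for the MockGraphService referenced below (its
// real definition is not part of this compare). It satisfies graph.Servicer
// the same way the suite methods removed above did.
//
// import msgraphsdk "github.com/microsoftgraph/msgraph-sdk-go"

type MockGraphService struct{}

func (MockGraphService) Client() *msgraphsdk.GraphServiceClient { return nil }

func (MockGraphService) Adapter() *msgraphsdk.GraphRequestAdapter { return nil }
```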
@@ -73,6 +63,11 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
         }}
     )

+    testItemMetaBytes, err := json.Marshal(testItemMeta)
+    if err != nil {
+        suite.T().Fatal("unable to marshall test permissions", err)
+    }
+
     type nst struct {
         name string
         size int64
@@ -165,7 +160,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
                 graph.HTTPClient(graph.NoTimeout()),
                 folderPath,
                 "drive-id",
-                suite,
+                &MockGraphService{},
                 suite.testStatusUpdater(&wg, &collStatus),
                 test.source,
                 control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}})
@@ -191,12 +186,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
                 _ string,
                 _ models.DriveItemable,
             ) (io.ReadCloser, int, error) {
-                metaJSON, err := json.Marshal(testItemMeta)
-                if err != nil {
-                    return nil, 0, err
-                }
-
-                return io.NopCloser(bytes.NewReader(metaJSON)), len(metaJSON), nil
+                return io.NopCloser(bytes.NewReader(testItemMetaBytes)), len(testItemMetaBytes), nil
             }

             // Read items from the collection
@@ -209,52 +199,58 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
             wg.Wait()

             if test.source == OneDriveSource {
-                require.Len(t, readItems, 2) // .data and .meta
+                assert.Len(t, readItems, 2) // .data and .meta
+                assert.Equal(t, 2, collStatus.ObjectCount)
+                assert.Equal(t, 2, collStatus.Successful)
             } else {
-                require.Len(t, readItems, 1)
+                assert.Len(t, readItems, 1)
+                assert.Equal(t, 1, collStatus.ObjectCount)
+                assert.Equal(t, 1, collStatus.Successful)
             }

-            // Expect only 1 item
-            require.Equal(t, 1, collStatus.ObjectCount)
-            require.Equal(t, 1, collStatus.Successful)
-
-            // Validate item info and data
-            readItem := readItems[0]
-            readItemInfo := readItem.(data.StreamInfo)
-
-            if test.source == OneDriveSource {
-                assert.Equal(t, testItemName+DataFileSuffix, readItem.UUID())
-            } else {
-                assert.Equal(t, testItemName, readItem.UUID())
-            }
-
-            require.Implements(t, (*data.StreamModTime)(nil), readItem)
-            mt := readItem.(data.StreamModTime)
-            assert.Equal(t, now, mt.ModTime())
-
-            readData, err := io.ReadAll(readItem.ToReader())
-            require.NoError(t, err)
-
-            name, parentPath := test.infoFrom(t, readItemInfo.Info())
-
-            assert.Equal(t, testItemData, readData)
-            assert.Equal(t, testItemName, name)
-            assert.Equal(t, driveFolderPath, parentPath)
-
-            if test.source == OneDriveSource {
-                readItemMeta := readItems[1]
-
-                assert.Equal(t, testItemName+MetaFileSuffix, readItemMeta.UUID())
-
-                readMetaData, err := io.ReadAll(readItemMeta.ToReader())
-                require.NoError(t, err)
-
-                tm, err := json.Marshal(testItemMeta)
-                if err != nil {
-                    t.Fatal("unable to marshall test permissions", err)
-                }
-
-                assert.Equal(t, tm, readMetaData)
+            var (
+                foundData bool
+                foundMeta bool
+            )
+
+            for _, readItem := range readItems {
+                readItemInfo := readItem.(data.StreamInfo)
+                id := readItem.UUID()
+
+                if strings.HasSuffix(id, DataFileSuffix) {
+                    foundData = true
+                }
+
+                var hasMeta bool
+                if strings.HasSuffix(id, MetaFileSuffix) {
+                    foundMeta = true
+                    hasMeta = true
+                }
+
+                assert.Contains(t, testItemName, id)
+                require.Implements(t, (*data.StreamModTime)(nil), readItem)
+
+                mt := readItem.(data.StreamModTime)
+                assert.Equal(t, now, mt.ModTime())
+
+                readData, err := io.ReadAll(readItem.ToReader())
+                require.NoError(t, err)
+
+                name, parentPath := test.infoFrom(t, readItemInfo.Info())
+                assert.Equal(t, testItemData, readData)
+                assert.Equal(t, testItemName, name)
+                assert.Equal(t, driveFolderPath, parentPath)
+
+                if hasMeta {
+                    ra, err := io.ReadAll(readItem.ToReader())
+                    require.NoError(t, err)
+                    assert.Equal(t, testItemMetaBytes, ra)
+                }
+            }
+
+            if test.source == OneDriveSource {
+                assert.True(t, foundData, "found data file")
+                assert.True(t, foundMeta, "found metadata file")
             }
         })
     }
@@ -298,7 +294,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
                 graph.HTTPClient(graph.NoTimeout()),
                 folderPath,
                 "fakeDriveID",
-                suite,
+                &MockGraphService{},
                 suite.testStatusUpdater(&wg, &collStatus),
                 test.source,
                 control.Options{ToggleFeatures: control.Toggles{EnablePermissionsBackup: true}})
@@ -369,7 +365,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
                 graph.HTTPClient(graph.NoTimeout()),
                 folderPath,
                 "fakeDriveID",
-                suite,
+                &MockGraphService{},
                 suite.testStatusUpdater(&wg, &collStatus),
                 test.source,
                 control.Options{ToggleFeatures: control.Toggles{}})
@@ -408,17 +404,195 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {

             wg.Wait()

-            // Expect no items
-            require.Equal(t, 1, collStatus.ObjectCount)
-            require.Equal(t, 1, collStatus.Successful)
+            assert.Equal(t, 1, collStatus.ObjectCount, "total objects")
+            assert.Equal(t, 1, collStatus.Successful, "successes")

             for _, i := range readItems {
                 if strings.HasSuffix(i.UUID(), MetaFileSuffix) {
                     content, err := io.ReadAll(i.ToReader())
                     require.NoError(t, err)
-                    require.Equal(t, content, []byte("{}"))
+                    assert.Equal(t, content, []byte("{}"))
                 }
             }
         })
     }
 }
+
+func (suite *CollectionUnitTestSuite) TestStreamItem() {
+    var (
+        id         = "id"
+        name       = "name"
+        size int64 = 42
+        now        = time.Now()
+    )
+
+    mockItem := models.NewDriveItem()
+    mockItem.SetId(&id)
+    mockItem.SetName(&name)
+    mockItem.SetSize(&size)
+    mockItem.SetCreatedDateTime(&now)
+    mockItem.SetLastModifiedDateTime(&now)
+
+    mockReader := func(v string, e error) itemReaderFunc {
+        return func(*http.Client, models.DriveItemable) (details.ItemInfo, io.ReadCloser, error) {
+            return details.ItemInfo{}, io.NopCloser(strings.NewReader(v)), e
+        }
+    }
+
+    mockGetter := func(e error) itemGetterFunc {
+        return func(context.Context, graph.Servicer, string, string) (models.DriveItemable, error) {
+            return mockItem, e
+        }
+    }
+
+    mockDataChan := func() chan data.Stream {
+        return make(chan data.Stream, 1)
+    }
+
+    table := []struct {
+        name       string
+        coll       *Collection
+        expectData string
+        errsIs     func(*testing.T, error, int)
+        readErrIs  func(*testing.T, error)
+    }{
+        {
+            name:       "happy",
+            expectData: "happy",
+            coll: &Collection{
+                data:       mockDataChan(),
+                itemReader: mockReader("happy", nil),
+                itemGetter: mockGetter(nil),
+            },
+            errsIs: func(t *testing.T, e error, count int) {
+                assert.NoError(t, e, "no errors")
+                assert.Zero(t, count, "zero errors")
+            },
+            readErrIs: func(t *testing.T, e error) {
+                assert.NoError(t, e, "no reader error")
+            },
+        },
+        {
+            name:       "reader err",
+            expectData: "",
+            coll: &Collection{
+                data:       mockDataChan(),
+                itemReader: mockReader("foo", assert.AnError),
+                itemGetter: mockGetter(nil),
+            },
+            errsIs: func(t *testing.T, e error, count int) {
+                assert.ErrorIs(t, e, assert.AnError)
+                assert.Equal(t, 1, count, "one errors")
+            },
+            readErrIs: func(t *testing.T, e error) {
+                assert.Error(t, e, "basic error")
+            },
+        },
+        {
+            name:       "iteration err",
+            expectData: "",
+            coll: &Collection{
+                data:       mockDataChan(),
+                itemReader: mockReader("foo", graph.Err401Unauthorized),
+                itemGetter: mockGetter(assert.AnError),
+            },
+            errsIs: func(t *testing.T, e error, count int) {
+                assert.True(t, graph.IsErrUnauthorized(e), "is unauthorized error")
+                assert.ErrorIs(t, e, graph.Err401Unauthorized)
+                assert.Equal(t, 1, count, "count of errors aggregated")
+            },
+            readErrIs: func(t *testing.T, e error) {
+                assert.True(t, graph.IsErrUnauthorized(e), "is unauthorized error")
+                assert.ErrorIs(t, e, graph.Err401Unauthorized)
+            },
+        },
+        {
+            name:       "timeout errors",
+            expectData: "",
+            coll: &Collection{
+                data:       mockDataChan(),
+                itemReader: mockReader("foo", context.DeadlineExceeded),
+                itemGetter: mockGetter(nil),
+            },
+            errsIs: func(t *testing.T, e error, count int) {
+                assert.True(t, graph.IsErrTimeout(e), "is timeout error")
+                assert.ErrorIs(t, e, context.DeadlineExceeded)
+                assert.Equal(t, 1, count, "one errors")
+            },
+            readErrIs: func(t *testing.T, e error) {
+                assert.True(t, graph.IsErrTimeout(e), "is timeout error")
+                assert.ErrorIs(t, e, context.DeadlineExceeded)
+            },
+        },
+        {
+            name:       "internal server errors",
+            expectData: "",
+            coll: &Collection{
+                data:       mockDataChan(),
+                itemReader: mockReader("foo", graph.Err500InternalServerError),
+                itemGetter: mockGetter(nil),
+            },
+            errsIs: func(t *testing.T, e error, count int) {
+                assert.True(t, graph.IsInternalServerError(e), "is internal server error")
+                assert.ErrorIs(t, e, graph.Err500InternalServerError)
+                assert.Equal(t, 1, count, "one errors")
+            },
+            readErrIs: func(t *testing.T, e error) {
+                assert.True(t, graph.IsInternalServerError(e), "is internal server error")
+                assert.ErrorIs(t, e, graph.Err500InternalServerError)
+            },
+        },
+    }
+    for _, test := range table {
+        suite.T().Run(test.name, func(t *testing.T) {
+            ctx, flush := tester.NewContext()
+            defer flush()
+
+            var (
+                wg       sync.WaitGroup
+                errs     error
+                errCount int
+                size     int64
+
+                countUpdater = func(sz, ds, itms, drd, ird int64) { size = sz }
+                errUpdater   = func(s string, e error) {
+                    errs = multierror.Append(errs, e)
+                    errCount++
+                }
+
+                semaphore = make(chan struct{}, 1)
+                progress  = make(chan struct{}, 1)
+            )
+
+            wg.Add(1)
+            semaphore <- struct{}{}
+
+            go test.coll.streamItem(
+                ctx,
+                &wg,
+                semaphore,
+                progress,
+                errUpdater,
+                countUpdater,
+                mockItem,
+                "parentPath",
+            )
+
+            // wait for the func to run
+            wg.Wait()
+
+            assert.Zero(t, len(semaphore), "semaphore was released")
+            assert.NotNil(t, <-progress, "progress was communicated")
+            assert.NotZero(t, size, "countUpdater was called")
+
+            data, ok := <-test.coll.data
+            assert.True(t, ok, "data channel survived")
+
+            bs, err := io.ReadAll(data.ToReader())
+
+            test.readErrIs(t, err)
+            test.errsIs(t, errs, errCount)
+            assert.Equal(t, test.expectData, string(bs), "streamed item bytes")
+        })
+    }
+}