update items to accept ctx, fault.Errors (#2493)

## Description

In order for corso to track recoverable errors,
we need to pass a fault.Errors struct into the
items stream.  As long as we're doing that, we
might as well pass along the available ctx as well.

## Does this PR need a docs update or release note?

- [x]  No 

## Type of change

- [x] 🧹 Tech Debt/Cleanup

## Issue(s)

* #1970

## Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-02-16 12:09:20 -07:00 committed by GitHub
parent fffa633971
commit 28ad304bb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 151 additions and 107 deletions

View File

@ -109,7 +109,6 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
control.Options{},
fault.New(true))
require.NoError(t, err)
assert.Empty(t, excludes)
for range collections {
@ -122,7 +121,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
assert.GreaterOrEqual(t, 2, len(collections), "expected 1 <= num collections <= 2")
for _, col := range collections {
for object := range col.Items() {
for object := range col.Items(ctx, fault.New(true)) {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.NoError(t, err, "received a buf.Read error")
@ -273,7 +272,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
assert.Less(t, test.expected, len(collections))
for _, coll := range collections {
for object := range coll.Items() {
for object := range coll.Items(ctx, fault.New(true)) {
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(object.ToReader())
assert.NoError(t, err, "reading item")
@ -377,7 +376,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar
for _, collection := range cols {
t.Logf("Path: %s\n", collection.FullPath().String())
for item := range collection.Items() {
for item := range collection.Items(ctx, fault.New(true)) {
t.Log("File: " + item.UUID())
bs, err := io.ReadAll(item.ToReader())

View File

@ -64,6 +64,7 @@ type DeltaPath struct {
func parseMetadataCollections(
ctx context.Context,
colls []data.RestoreCollection,
errs *fault.Errors,
) (CatDeltaPaths, error) {
// cdp stores metadata
cdp := CatDeltaPaths{
@ -83,7 +84,7 @@ func parseMetadataCollections(
for _, coll := range colls {
var (
breakLoop bool
items = coll.Items()
items = coll.Items(ctx, errs)
category = coll.FullPath().Category()
)
@ -179,7 +180,7 @@ func DataCollections(
collections = []data.BackupCollection{}
)
cdps, err := parseMetadataCollections(ctx, metadata)
cdps, err := parseMetadataCollections(ctx, metadata, errs)
if err != nil {
return nil, nil, err
}

View File

@ -178,7 +178,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{
data.NotFoundRestoreCollection{Collection: coll},
})
}, fault.New(true))
test.expectError(t, err)
emails := cdps[path.EmailCategory]
@ -353,7 +353,7 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() {
cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{
data.NotFoundRestoreCollection{Collection: metadata},
})
}, fault.New(true))
require.NoError(t, err)
dps := cdps[test.scope.Category().PathType()]
@ -422,7 +422,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression()
for _, edc := range collections {
t.Run(edc.FullPath().String(), func(t *testing.T) {
isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
streamChannel := edc.Items()
streamChannel := edc.Items(ctx, fault.New(true))
// Verify that each message can be restored
for stream := range streamChannel {
@ -494,7 +494,7 @@ func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression
isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService
count := 0
for stream := range edc.Items() {
for stream := range edc.Items(ctx, fault.New(true)) {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)
@ -606,7 +606,7 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression(
assert.Equal(t, "", edc.FullPath().Folder(false))
}
for item := range edc.Items() {
for item := range edc.Items(ctx, fault.New(true)) {
buf := &bytes.Buffer{}
read, err := buf.ReadFrom(item.ToReader())

View File

@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"
"github.com/alcionai/clues"
"github.com/microsoft/kiota-abstractions-go/serialization"
"github.com/alcionai/corso/src/internal/connector/graph"
@ -126,8 +127,8 @@ func NewCollection(
// Items utility function to asynchronously execute process to fill data channel with
// M365 exchange objects and returns the data channel
func (col *Collection) Items() <-chan data.Stream {
go col.streamItems(context.TODO())
func (col *Collection) Items(ctx context.Context, errs *fault.Errors) <-chan data.Stream {
go col.streamItems(ctx, errs)
return col.data
}
@ -162,9 +163,8 @@ func (col Collection) DoNotMergeItems() bool {
// streamItems is a utility function that uses col.collectionType to be able to serialize
// all the M365IDs defined in the added field. data channel is closed by this function
func (col *Collection) streamItems(ctx context.Context) {
func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) {
var (
errs error
success int64
totalBytes int64
wg sync.WaitGroup
@ -177,7 +177,7 @@ func (col *Collection) streamItems(ctx context.Context) {
)
defer func() {
col.finishPopulation(ctx, int(success), totalBytes, errs)
col.finishPopulation(ctx, int(success), totalBytes, errs.Err())
}()
if len(col.added)+len(col.removed) > 0 {
@ -224,17 +224,9 @@ func (col *Collection) streamItems(ctx context.Context) {
}(id)
}
updaterMu := sync.Mutex{}
errUpdater := func(user string, err error) {
updaterMu.Lock()
defer updaterMu.Unlock()
errs = support.WrapAndAppend(user, err, errs)
}
// add any new items
for id := range col.added {
if col.ctrl.FailFast && errs != nil {
if errs.Err() != nil {
break
}
@ -246,13 +238,7 @@ func (col *Collection) streamItems(ctx context.Context) {
defer wg.Done()
defer func() { <-semaphoreCh }()
var (
item serialization.Parsable
info *details.ExchangeInfo
err error
)
item, info, err = getItemWithRetries(
item, info, err := getItemWithRetries(
ctx,
user,
id,
@ -265,9 +251,9 @@ func (col *Collection) streamItems(ctx context.Context) {
// investigation upset.
if graph.IsErrDeletedInFlight(err) {
atomic.AddInt64(&success, 1)
log.Infow("item not found", "err", err)
log.With("err", err).Infow("item not found", clues.InErr(err).Slice()...)
} else {
errUpdater(user, support.ConnectorStackErrorTraceWrap(err, "fetching item"))
errs.Add(clues.Wrap(err, "fetching item"))
}
return
@ -275,7 +261,7 @@ func (col *Collection) streamItems(ctx context.Context) {
data, err := col.items.Serialize(ctx, item, user, id)
if err != nil {
errUpdater(user, err)
errs.Add(clues.Wrap(err, "serializing item"))
return
}
@ -307,40 +293,24 @@ func getItemWithRetries(
items itemer,
errs *fault.Errors,
) (serialization.Parsable, *details.ExchangeInfo, error) {
var (
item serialization.Parsable
info *details.ExchangeInfo
err error
)
for i := 1; i <= numberOfRetries; i++ {
item, info, err = items.GetItem(ctx, userID, itemID, errs)
if err == nil {
break
}
// If the data is no longer available just return here and chalk it up
// as a success. There's no reason to retry; it's gone Let it go.
if graph.IsErrDeletedInFlight(err) {
return nil, nil, err
}
if i < numberOfRetries {
time.Sleep(time.Duration(3*(i+1)) * time.Second)
}
}
item, info, err := items.GetItem(ctx, userID, itemID, errs)
if err != nil {
return nil, nil, err
}
return item, info, err
return item, info, nil
}
// terminatePopulateSequence is a utility function used to close a Collection's data channel
// and to send the status update through the channel.
func (col *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) {
func (col *Collection) finishPopulation(
ctx context.Context,
success int,
totalBytes int64,
err error,
) {
close(col.data)
attempted := len(col.added) + len(col.removed)
status := support.CreateStatus(ctx,
support.Backup,
@ -350,9 +320,11 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, totalB
Successes: success,
TotalBytes: totalBytes,
},
errs,
err,
col.fullPath.Folder(false))
logger.Ctx(ctx).Debugw("done streaming items", "status", status.String())
col.statusUpdater(status)
}

View File

@ -372,7 +372,7 @@ func restoreCollection(
var (
metrics support.CollectionMetrics
items = dc.Items()
items = dc.Items(ctx, errs)
directory = dc.FullPath()
service = directory.Service()
category = directory.Category()

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -131,7 +132,10 @@ func (md MetadataCollection) DoNotMergeItems() bool {
return false
}
func (md MetadataCollection) Items() <-chan data.Stream {
func (md MetadataCollection) Items(
ctx context.Context,
errs *fault.Errors,
) <-chan data.Stream {
res := make(chan data.Stream)
go func() {
@ -142,7 +146,7 @@ func (md MetadataCollection) Items() <-chan data.Stream {
// statusUpdater may not have accounted for the fact that this collection
// will be running.
status := support.CreateStatus(
context.TODO(),
ctx,
support.Backup,
1,
support.CollectionMetrics{

View File

@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -41,6 +43,9 @@ func (suite *MetadataCollectionUnitSuite) TestFullPath() {
}
func (suite *MetadataCollectionUnitSuite) TestItems() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
itemNames := []string{
@ -87,7 +92,7 @@ func (suite *MetadataCollectionUnitSuite) TestItems() {
gotData := [][]byte{}
gotNames := []string{}
for s := range c.Items() {
for s := range c.Items(ctx, fault.New(true)) {
gotNames = append(gotNames, s.UUID())
buf, err := io.ReadAll(s.ToReader())
@ -152,14 +157,16 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() {
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
col, err := MakeMetadataCollection(
tenant,
user,
test.service,
test.cat,
[]MetadataCollectionEntry{test.metadata},
func(*support.ConnectorOperationStatus) {},
)
func(*support.ConnectorOperationStatus) {})
test.errCheck(t, err)
if err != nil {
@ -172,7 +179,7 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() {
}
itemCount := 0
for item := range col.Items() {
for item := range col.Items(ctx, fault.New(true)) {
assert.Equal(t, test.metadata.fileName, item.UUID())
gotMap := map[string]string{}

View File

@ -800,13 +800,16 @@ func checkHasCollections(
assert.ElementsMatch(t, expectedNames, gotNames)
}
//revive:disable:context-as-argument
func checkCollections(
t *testing.T,
ctx context.Context,
expectedItems int,
expected map[string]map[string][]byte,
got []data.BackupCollection,
restorePermissions bool,
) int {
//revive:enable:context-as-argument
collectionsWithItems := []data.BackupCollection{}
skipped := 0
@ -821,7 +824,7 @@ func checkCollections(
// Need to iterate through all items even if we don't expect to find a match
// because otherwise we'll deadlock waiting for GC status. Unexpected or
// missing collection paths will be reported by checkHasCollections.
for item := range returned.Items() {
for item := range returned.Items(ctx, fault.New(true)) {
// Skip metadata collections as they aren't directly related to items to
// backup. Don't add them to the item count either since the item count
// is for actual pull items.

View File

@ -493,7 +493,7 @@ func runBackupAndCompare(
// Pull the data prior to waiting for the status as otherwise it will
// deadlock.
skipped := checkCollections(t, totalKopiaItems, expectedData, dcs, config.opts.RestorePermissions)
skipped := checkCollections(t, ctx, totalKopiaItems, expectedData, dcs, config.opts.RestorePermissions)
status := backupGC.AwaitStatus()
@ -998,7 +998,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
// Pull the data prior to waiting for the status as otherwise it will
// deadlock.
skipped := checkCollections(t, allItems, allExpectedData, dcs, true)
skipped := checkCollections(t, ctx, allItems, allExpectedData, dcs, true)
status := backupGC.AwaitStatus()
assert.Equal(t, allItems+skipped, status.ObjectCount, "status.ObjectCount")

View File

@ -2,6 +2,7 @@ package mockconnector
import (
"bytes"
"context"
"io"
"math/rand"
"time"
@ -10,6 +11,7 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -107,7 +109,10 @@ func (medc MockExchangeDataCollection) DoNotMergeItems() bool { return med
// Items returns a channel that has the next items in the collection. The
// channel is closed when there are no more items available.
func (medc *MockExchangeDataCollection) Items() <-chan data.Stream {
func (medc *MockExchangeDataCollection) Items(
ctx context.Context,
_ *fault.Errors, // unused
) <-chan data.Stream {
res := make(chan data.Stream)
go func() {

View File

@ -14,6 +14,8 @@ import (
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/fault"
)
type MockExchangeCollectionSuite struct {
@ -25,11 +27,13 @@ func TestMockExchangeCollectionSuite(t *testing.T) {
}
func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection() {
mdc := mockconnector.NewMockExchangeCollection(nil, nil, 2)
ctx, flush := tester.NewContext()
defer flush()
mdc := mockconnector.NewMockExchangeCollection(nil, nil, 2)
messagesRead := 0
for item := range mdc.Items() {
for item := range mdc.Items(ctx, fault.New(true)) {
_, err := io.ReadAll(item.ToReader())
assert.NoError(suite.T(), err)
messagesRead++
@ -39,12 +43,14 @@ func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection() {
}
func (suite *MockExchangeCollectionSuite) TestMockExchangeCollectionItemSize() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
mdc := mockconnector.NewMockExchangeCollection(nil, nil, 2)
mdc.Data[1] = []byte("This is some buffer of data so that the size is different than the default")
for item := range mdc.Items() {
for item := range mdc.Items(ctx, fault.New(true)) {
buf, err := io.ReadAll(item.ToReader())
assert.NoError(t, err)
@ -57,11 +63,14 @@ func (suite *MockExchangeCollectionSuite) TestMockExchangeCollectionItemSize() {
// NewExchangeCollectionMail_Hydration tests that mock exchange mail data collection can be used for restoration
// functions by verifying no failures on (de)serializing steps using kiota serialization library
func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection_NewExchangeCollectionMail_Hydration() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
mdc := mockconnector.NewMockExchangeCollection(nil, nil, 3)
buf := &bytes.Buffer{}
for stream := range mdc.Items() {
for stream := range mdc.Items(ctx, fault.New(true)) {
_, err := buf.ReadFrom(stream.ToReader())
assert.NoError(t, err)

View File

@ -2,6 +2,7 @@ package mockconnector
import (
"bytes"
"context"
"io"
"testing"
@ -10,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -44,7 +46,10 @@ func (mlc *MockListCollection) PreviousPath() path.Path {
return nil
}
func (mlc *MockListCollection) Items() <-chan data.Stream {
func (mlc *MockListCollection) Items(
ctx context.Context,
_ *fault.Errors, // unused
) <-chan data.Stream {
res := make(chan data.Stream)
go func() {

View File

@ -20,6 +20,7 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
@ -138,8 +139,11 @@ func (oc *Collection) Add(item models.DriveItemable) {
}
// Items() returns the channel containing M365 Exchange objects
func (oc *Collection) Items() <-chan data.Stream {
go oc.populateItems(context.Background())
func (oc *Collection) Items(
ctx context.Context,
errs *fault.Errors, // TODO: currently unused while onedrive isn't up to date with clues/fault
) <-chan data.Stream {
go oc.populateItems(ctx)
return oc.data
}

View File

@ -20,8 +20,10 @@ import (
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -150,6 +152,9 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
var (
wg = sync.WaitGroup{}
collStatus = support.ConnectorOperationStatus{}
@ -204,7 +209,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() {
// Read items from the collection
wg.Add(1)
for item := range coll.Items() {
for item := range coll.Items(ctx, fault.New(true)) {
readItems = append(readItems, item)
}
@ -284,9 +289,11 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
var (
testItemID = "fakeItemID"
collStatus = support.ConnectorOperationStatus{}
wg = sync.WaitGroup{}
)
@ -328,7 +335,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() {
return io.NopCloser(strings.NewReader(`{}`)), 2, nil
}
collItem, ok := <-coll.Items()
collItem, ok := <-coll.Items(ctx, fault.New(true))
assert.True(t, ok)
_, err = io.ReadAll(collItem.ToReader())
@ -355,11 +362,13 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
var (
testItemID = "fakeItemID"
testItemName = "Fake Item"
testItemSize = int64(10)
collStatus = support.ConnectorOperationStatus{}
wg = sync.WaitGroup{}
)
@ -408,7 +417,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() {
}
readItems := []data.Stream{}
for item := range coll.Items() {
for item := range coll.Items(ctx, fault.New(true)) {
readItems = append(readItems, item)
}
@ -442,6 +451,9 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
var (
testItemID = "fakeItemID"
testItemName = "Fake Item"
@ -495,7 +507,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim
}
readItems := []data.Stream{}
for item := range coll.Items() {
for item := range coll.Items(ctx, fault.New(true)) {
readItems = append(readItems, item)
}

View File

@ -128,7 +128,7 @@ func deserializeMetadata(
prevFolders := map[string]map[string]string{}
for _, col := range cols {
items := col.Items()
items := col.Items(ctx, nil) // TODO: fault.Errors instead of nil
for breakLoop := false; !breakLoop; {
select {

View File

@ -242,7 +242,7 @@ func RestoreCollection(
}
// Restore items from the collection
items := dc.Items()
items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil
for {
select {

View File

@ -19,6 +19,7 @@ import (
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
@ -109,8 +110,11 @@ func (sc Collection) DoNotMergeItems() bool {
return false
}
func (sc *Collection) Items() <-chan data.Stream {
go sc.populate(context.TODO())
func (sc *Collection) Items(
ctx context.Context,
errs *fault.Errors,
) <-chan data.Stream {
go sc.populate(ctx)
return sc.data
}

View File

@ -20,6 +20,7 @@ import (
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -147,12 +148,15 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() {
for _, test := range tables {
t.Run(test.name, func(t *testing.T) {
ctx, flush := tester.NewContext()
defer flush()
col := NewCollection(test.getDir(t), nil, test.category, nil, control.Defaults())
col.data <- test.getItem(t, test.itemName)
readItems := []data.Stream{}
for item := range col.Items() {
for item := range col.Items(ctx, fault.New(true)) {
readItems = append(readItems, item)
}

View File

@ -236,7 +236,7 @@ func RestoreListCollection(
siteID := directory.ResourceOwner()
// Restore items from the collection
items := dc.Items()
items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil
for {
select {
@ -316,7 +316,7 @@ func RestorePageCollection(
siteID := directory.ResourceOwner()
// Restore items from collection
items := dc.Items()
items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil
for {
select {

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -32,7 +33,7 @@ type Collection interface {
// Each returned struct contains the next item in the collection
// The channel is closed when there are no more items in the collection or if
// an unrecoverable error caused an early termination in the sender.
Items() <-chan Stream
Items(ctx context.Context, errs *fault.Errors) <-chan Stream
// FullPath returns a path struct that acts as a metadata tag for this
// Collection.
FullPath() path.Path

View File

@ -8,6 +8,7 @@ import (
"github.com/kopia/kopia/fs"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -23,7 +24,10 @@ type kopiaDataCollection struct {
counter ByteCounter
}
func (kdc *kopiaDataCollection) Items() <-chan data.Stream {
func (kdc *kopiaDataCollection) Items(
ctx context.Context,
errs *fault.Errors,
) <-chan data.Stream {
res := make(chan data.Stream)
go func() {

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
@ -94,6 +95,9 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
for _, test := range table {
suite.Run(test.name, func() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
c := kopiaDataCollection{
@ -102,7 +106,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
}
count := 0
for returnedStream := range c.Items() {
for returnedStream := range c.Items(ctx, fault.New(true)) {
require.Less(t, count, len(test.streams))
assert.Equal(t, returnedStream.UUID(), uuids[count])

View File

@ -286,7 +286,7 @@ func collectionEntries(
// Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot.
seen = map[string]struct{}{}
items = streamedEnts.Items()
items = streamedEnts.Items(ctx, progress.errs)
)
if lp, ok := streamedEnts.(data.LocationPather); ok {

View File

@ -52,17 +52,20 @@ var (
testFileData6 = testFileData
)
//revive:disable:context-as-argument
func testForFiles(
t *testing.T,
ctx context.Context,
expected map[string][]byte,
collections []data.RestoreCollection,
) {
//revive:enable:context-as-argument
t.Helper()
count := 0
for _, c := range collections {
for s := range c.Items() {
for s := range c.Items(ctx, fault.New(true)) {
count++
fullPath, err := c.FullPath().Append(s.UUID(), true)
@ -383,7 +386,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
require.NoError(t, err)
assert.Equal(t, 2, len(result))
testForFiles(t, expected, result)
testForFiles(t, ctx, expected, result)
}
type mockBackupCollection struct {
@ -391,7 +394,7 @@ type mockBackupCollection struct {
streams []data.Stream
}
func (c *mockBackupCollection) Items() <-chan data.Stream {
func (c *mockBackupCollection) Items(context.Context, *fault.Errors) <-chan data.Stream {
res := make(chan data.Stream)
go func() {
@ -926,6 +929,9 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() {
for _, test := range table {
suite.Run(test.name, func() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
// May slightly overallocate as only items that are actually in our map
@ -958,7 +964,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() {
assert.Len(t, result, test.expectedCollections)
assert.Less(t, int64(0), ic.i)
testForFiles(t, expected, result)
testForFiles(t, ctx, expected, result)
})
}
}

View File

@ -257,7 +257,7 @@ func checkMetadataFilesExist(
for _, col := range cols {
itemNames := []string{}
for item := range col.Items() {
for item := range col.Items(ctx, fault.New(true)) {
assert.Implements(t, (*data.StreamSize)(nil), item)
s := item.(data.StreamSize)

View File

@ -53,7 +53,7 @@ type mockColl struct {
p path.Path
}
func (mc mockColl) Items() <-chan data.Stream {
func (mc mockColl) Items(context.Context, *fault.Errors) <-chan data.Stream {
return nil
}

View File

@ -129,7 +129,7 @@ func (ss *streamStore) ReadBackupDetails(
var d details.Details
found := false
items := dc.Items()
items := dc.Items(ctx, errs)
for {
select {
@ -195,7 +195,7 @@ func (dc *streamCollection) DoNotMergeItems() bool {
// Items() always returns a channel with a single data.Stream
// representing the object to be persisted
func (dc *streamCollection) Items() <-chan data.Stream {
func (dc *streamCollection) Items(context.Context, *fault.Errors) <-chan data.Stream {
items := make(chan data.Stream, 1)
defer close(items)
items <- dc.item