From 28ad304bb786f35d09fd6e142866b8c3599d0149 Mon Sep 17 00:00:00 2001 From: Keepers Date: Thu, 16 Feb 2023 12:09:20 -0700 Subject: [PATCH] update items to accept ctx, fault.Errors (#2493) ## Description In order for corso to track recoverable errors, we need to pass a fault.Errors struct into the items stream. As long as we're doing that, we might as well pass along the available ctx as well. ## Does this PR need a docs update or release note? - [x] :no_entry: No ## Type of change - [x] :broom: Tech Debt/Cleanup ## Issue(s) * #1970 ## Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- .../connector/data_collections_test.go | 7 +- .../connector/exchange/data_collections.go | 5 +- .../exchange/data_collections_test.go | 10 +-- .../exchange/exchange_data_collection.go | 72 ++++++------------- .../connector/exchange/service_restore.go | 2 +- .../connector/graph/metadata_collection.go | 8 ++- .../graph/metadata_collection_test.go | 15 ++-- .../connector/graph_connector_helper_test.go | 5 +- .../connector/graph_connector_test.go | 4 +- .../mockconnector/mock_data_collection.go | 7 +- .../mock_data_collection_test.go | 19 +++-- .../connector/mockconnector/mock_data_list.go | 7 +- src/internal/connector/onedrive/collection.go | 8 ++- .../connector/onedrive/collection_test.go | 28 +++++--- .../connector/onedrive/collections.go | 2 +- src/internal/connector/onedrive/restore.go | 2 +- .../connector/sharepoint/collection.go | 8 ++- .../connector/sharepoint/collection_test.go | 6 +- src/internal/connector/sharepoint/restore.go | 4 +- src/internal/data/data_collection.go | 3 +- src/internal/kopia/data_collection.go | 6 +- src/internal/kopia/data_collection_test.go | 6 +- src/internal/kopia/upload.go | 2 +- src/internal/kopia/wrapper_test.go | 14 ++-- .../operations/backup_integration_test.go | 2 +- src/internal/operations/manifests_test.go | 2 +- src/internal/streamstore/streamstore.go | 4 +- 27 files changed, 151 insertions(+), 107 deletions(-) diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index c2038f006..475365700 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -109,7 +109,6 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection control.Options{}, fault.New(true)) require.NoError(t, err) - assert.Empty(t, excludes) for range collections { @@ -122,7 +121,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection assert.GreaterOrEqual(t, 2, len(collections), "expected 1 <= num collections <= 2") for _, col := range collections { - for object := range col.Items() { + for object := range col.Items(ctx, fault.New(true)) { buf := &bytes.Buffer{} _, err := buf.ReadFrom(object.ToReader()) assert.NoError(t, err, "received a buf.Read error") @@ -273,7 +272,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti assert.Less(t, test.expected, len(collections)) for _, coll := range collections { - for object := range coll.Items() { + for object := range coll.Items(ctx, fault.New(true)) { buf := &bytes.Buffer{} _, err := buf.ReadFrom(object.ToReader()) assert.NoError(t, err, "reading item") @@ -377,7 +376,7 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar for _, collection := range cols { t.Logf("Path: %s\n", collection.FullPath().String()) - for item := range collection.Items() { + for item := range collection.Items(ctx, fault.New(true)) { t.Log("File: " + item.UUID()) bs, err := io.ReadAll(item.ToReader()) diff --git a/src/internal/connector/exchange/data_collections.go b/src/internal/connector/exchange/data_collections.go index fcfa8ab8a..5becad190 100644 --- a/src/internal/connector/exchange/data_collections.go +++ b/src/internal/connector/exchange/data_collections.go @@ -64,6 +64,7 @@ type DeltaPath struct { func parseMetadataCollections( ctx context.Context, colls []data.RestoreCollection, + errs *fault.Errors, ) (CatDeltaPaths, error) { // cdp stores metadata cdp := CatDeltaPaths{ @@ -83,7 +84,7 @@ func parseMetadataCollections( for _, coll := range colls { var ( breakLoop bool - items = coll.Items() + items = coll.Items(ctx, errs) category = coll.FullPath().Category() ) @@ -179,7 +180,7 @@ func DataCollections( collections = []data.BackupCollection{} ) - cdps, err := parseMetadataCollections(ctx, metadata) + cdps, err := parseMetadataCollections(ctx, metadata, errs) if err != nil { return nil, nil, err } diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 7b8674b58..c4920eef2 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -178,7 +178,7 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{ data.NotFoundRestoreCollection{Collection: coll}, - }) + }, fault.New(true)) test.expectError(t, err) emails := cdps[path.EmailCategory] @@ -353,7 +353,7 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() { cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{ data.NotFoundRestoreCollection{Collection: metadata}, - }) + }, fault.New(true)) require.NoError(t, err) dps := cdps[test.scope.Category().PathType()] @@ -422,7 +422,7 @@ func (suite *DataCollectionsIntegrationSuite) TestMailSerializationRegression() for _, edc := range collections { t.Run(edc.FullPath().String(), func(t *testing.T) { isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService - streamChannel := edc.Items() + streamChannel := edc.Items(ctx, fault.New(true)) // Verify that each message can be restored for stream := range streamChannel { @@ -494,7 +494,7 @@ func (suite *DataCollectionsIntegrationSuite) TestContactSerializationRegression isMetadata := edc.FullPath().Service() == path.ExchangeMetadataService count := 0 - for stream := range edc.Items() { + for stream := range edc.Items(ctx, fault.New(true)) { buf := &bytes.Buffer{} read, err := buf.ReadFrom(stream.ToReader()) assert.NoError(t, err) @@ -606,7 +606,7 @@ func (suite *DataCollectionsIntegrationSuite) TestEventsSerializationRegression( assert.Equal(t, "", edc.FullPath().Folder(false)) } - for item := range edc.Items() { + for item := range edc.Items(ctx, fault.New(true)) { buf := &bytes.Buffer{} read, err := buf.ReadFrom(item.ToReader()) diff --git a/src/internal/connector/exchange/exchange_data_collection.go b/src/internal/connector/exchange/exchange_data_collection.go index a7d07ae66..4caec431a 100644 --- a/src/internal/connector/exchange/exchange_data_collection.go +++ b/src/internal/connector/exchange/exchange_data_collection.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/alcionai/clues" "github.com/microsoft/kiota-abstractions-go/serialization" "github.com/alcionai/corso/src/internal/connector/graph" @@ -126,8 +127,8 @@ func NewCollection( // Items utility function to asynchronously execute process to fill data channel with // M365 exchange objects and returns the data channel -func (col *Collection) Items() <-chan data.Stream { - go col.streamItems(context.TODO()) +func (col *Collection) Items(ctx context.Context, errs *fault.Errors) <-chan data.Stream { + go col.streamItems(ctx, errs) return col.data } @@ -162,9 +163,8 @@ func (col Collection) DoNotMergeItems() bool { // streamItems is a utility function that uses col.collectionType to be able to serialize // all the M365IDs defined in the added field. data channel is closed by this function -func (col *Collection) streamItems(ctx context.Context) { +func (col *Collection) streamItems(ctx context.Context, errs *fault.Errors) { var ( - errs error success int64 totalBytes int64 wg sync.WaitGroup @@ -177,7 +177,7 @@ func (col *Collection) streamItems(ctx context.Context) { ) defer func() { - col.finishPopulation(ctx, int(success), totalBytes, errs) + col.finishPopulation(ctx, int(success), totalBytes, errs.Err()) }() if len(col.added)+len(col.removed) > 0 { @@ -224,17 +224,9 @@ func (col *Collection) streamItems(ctx context.Context) { }(id) } - updaterMu := sync.Mutex{} - errUpdater := func(user string, err error) { - updaterMu.Lock() - defer updaterMu.Unlock() - - errs = support.WrapAndAppend(user, err, errs) - } - // add any new items for id := range col.added { - if col.ctrl.FailFast && errs != nil { + if errs.Err() != nil { break } @@ -246,13 +238,7 @@ func (col *Collection) streamItems(ctx context.Context) { defer wg.Done() defer func() { <-semaphoreCh }() - var ( - item serialization.Parsable - info *details.ExchangeInfo - err error - ) - - item, info, err = getItemWithRetries( + item, info, err := getItemWithRetries( ctx, user, id, @@ -265,9 +251,9 @@ func (col *Collection) streamItems(ctx context.Context) { // investigation upset. if graph.IsErrDeletedInFlight(err) { atomic.AddInt64(&success, 1) - log.Infow("item not found", "err", err) + log.With("err", err).Infow("item not found", clues.InErr(err).Slice()...) } else { - errUpdater(user, support.ConnectorStackErrorTraceWrap(err, "fetching item")) + errs.Add(clues.Wrap(err, "fetching item")) } return @@ -275,7 +261,7 @@ func (col *Collection) streamItems(ctx context.Context) { data, err := col.items.Serialize(ctx, item, user, id) if err != nil { - errUpdater(user, err) + errs.Add(clues.Wrap(err, "serializing item")) return } @@ -307,40 +293,24 @@ func getItemWithRetries( items itemer, errs *fault.Errors, ) (serialization.Parsable, *details.ExchangeInfo, error) { - var ( - item serialization.Parsable - info *details.ExchangeInfo - err error - ) - - for i := 1; i <= numberOfRetries; i++ { - item, info, err = items.GetItem(ctx, userID, itemID, errs) - if err == nil { - break - } - - // If the data is no longer available just return here and chalk it up - // as a success. There's no reason to retry; it's gone Let it go. - if graph.IsErrDeletedInFlight(err) { - return nil, nil, err - } - - if i < numberOfRetries { - time.Sleep(time.Duration(3*(i+1)) * time.Second) - } - } - + item, info, err := items.GetItem(ctx, userID, itemID, errs) if err != nil { return nil, nil, err } - return item, info, err + return item, info, nil } // terminatePopulateSequence is a utility function used to close a Collection's data channel // and to send the status update through the channel. -func (col *Collection) finishPopulation(ctx context.Context, success int, totalBytes int64, errs error) { +func (col *Collection) finishPopulation( + ctx context.Context, + success int, + totalBytes int64, + err error, +) { close(col.data) + attempted := len(col.added) + len(col.removed) status := support.CreateStatus(ctx, support.Backup, @@ -350,9 +320,11 @@ func (col *Collection) finishPopulation(ctx context.Context, success int, totalB Successes: success, TotalBytes: totalBytes, }, - errs, + err, col.fullPath.Folder(false)) + logger.Ctx(ctx).Debugw("done streaming items", "status", status.String()) + col.statusUpdater(status) } diff --git a/src/internal/connector/exchange/service_restore.go b/src/internal/connector/exchange/service_restore.go index 3d11d358f..67c9494fb 100644 --- a/src/internal/connector/exchange/service_restore.go +++ b/src/internal/connector/exchange/service_restore.go @@ -372,7 +372,7 @@ func restoreCollection( var ( metrics support.CollectionMetrics - items = dc.Items() + items = dc.Items(ctx, errs) directory = dc.FullPath() service = directory.Service() category = directory.Category() diff --git a/src/internal/connector/graph/metadata_collection.go b/src/internal/connector/graph/metadata_collection.go index 0274b082c..c705eb4ce 100644 --- a/src/internal/connector/graph/metadata_collection.go +++ b/src/internal/connector/graph/metadata_collection.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -131,7 +132,10 @@ func (md MetadataCollection) DoNotMergeItems() bool { return false } -func (md MetadataCollection) Items() <-chan data.Stream { +func (md MetadataCollection) Items( + ctx context.Context, + errs *fault.Errors, +) <-chan data.Stream { res := make(chan data.Stream) go func() { @@ -142,7 +146,7 @@ func (md MetadataCollection) Items() <-chan data.Stream { // statusUpdater may not have accounted for the fact that this collection // will be running. status := support.CreateStatus( - context.TODO(), + ctx, support.Backup, 1, support.CollectionMetrics{ diff --git a/src/internal/connector/graph/metadata_collection_test.go b/src/internal/connector/graph/metadata_collection_test.go index 246542d4e..5067b3247 100644 --- a/src/internal/connector/graph/metadata_collection_test.go +++ b/src/internal/connector/graph/metadata_collection_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -41,6 +43,9 @@ func (suite *MetadataCollectionUnitSuite) TestFullPath() { } func (suite *MetadataCollectionUnitSuite) TestItems() { + ctx, flush := tester.NewContext() + defer flush() + t := suite.T() itemNames := []string{ @@ -87,7 +92,7 @@ func (suite *MetadataCollectionUnitSuite) TestItems() { gotData := [][]byte{} gotNames := []string{} - for s := range c.Items() { + for s := range c.Items(ctx, fault.New(true)) { gotNames = append(gotNames, s.UUID()) buf, err := io.ReadAll(s.ToReader()) @@ -152,14 +157,16 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() { for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + col, err := MakeMetadataCollection( tenant, user, test.service, test.cat, []MetadataCollectionEntry{test.metadata}, - func(*support.ConnectorOperationStatus) {}, - ) + func(*support.ConnectorOperationStatus) {}) test.errCheck(t, err) if err != nil { @@ -172,7 +179,7 @@ func (suite *MetadataCollectionUnitSuite) TestMakeMetadataCollection() { } itemCount := 0 - for item := range col.Items() { + for item := range col.Items(ctx, fault.New(true)) { assert.Equal(t, test.metadata.fileName, item.UUID()) gotMap := map[string]string{} diff --git a/src/internal/connector/graph_connector_helper_test.go b/src/internal/connector/graph_connector_helper_test.go index d0ce306fe..82fc38e59 100644 --- a/src/internal/connector/graph_connector_helper_test.go +++ b/src/internal/connector/graph_connector_helper_test.go @@ -800,13 +800,16 @@ func checkHasCollections( assert.ElementsMatch(t, expectedNames, gotNames) } +//revive:disable:context-as-argument func checkCollections( t *testing.T, + ctx context.Context, expectedItems int, expected map[string]map[string][]byte, got []data.BackupCollection, restorePermissions bool, ) int { + //revive:enable:context-as-argument collectionsWithItems := []data.BackupCollection{} skipped := 0 @@ -821,7 +824,7 @@ func checkCollections( // Need to iterate through all items even if we don't expect to find a match // because otherwise we'll deadlock waiting for GC status. Unexpected or // missing collection paths will be reported by checkHasCollections. - for item := range returned.Items() { + for item := range returned.Items(ctx, fault.New(true)) { // Skip metadata collections as they aren't directly related to items to // backup. Don't add them to the item count either since the item count // is for actual pull items. diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index f263d0f42..49adbcb14 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -493,7 +493,7 @@ func runBackupAndCompare( // Pull the data prior to waiting for the status as otherwise it will // deadlock. - skipped := checkCollections(t, totalKopiaItems, expectedData, dcs, config.opts.RestorePermissions) + skipped := checkCollections(t, ctx, totalKopiaItems, expectedData, dcs, config.opts.RestorePermissions) status := backupGC.AwaitStatus() @@ -998,7 +998,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames // Pull the data prior to waiting for the status as otherwise it will // deadlock. - skipped := checkCollections(t, allItems, allExpectedData, dcs, true) + skipped := checkCollections(t, ctx, allItems, allExpectedData, dcs, true) status := backupGC.AwaitStatus() assert.Equal(t, allItems+skipped, status.ObjectCount, "status.ObjectCount") diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 53d663f08..4718c2a29 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -2,6 +2,7 @@ package mockconnector import ( "bytes" + "context" "io" "math/rand" "time" @@ -10,6 +11,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -107,7 +109,10 @@ func (medc MockExchangeDataCollection) DoNotMergeItems() bool { return med // Items returns a channel that has the next items in the collection. The // channel is closed when there are no more items available. -func (medc *MockExchangeDataCollection) Items() <-chan data.Stream { +func (medc *MockExchangeDataCollection) Items( + ctx context.Context, + _ *fault.Errors, // unused +) <-chan data.Stream { res := make(chan data.Stream) go func() { diff --git a/src/internal/connector/mockconnector/mock_data_collection_test.go b/src/internal/connector/mockconnector/mock_data_collection_test.go index 1fcf3e652..5cbe4b315 100644 --- a/src/internal/connector/mockconnector/mock_data_collection_test.go +++ b/src/internal/connector/mockconnector/mock_data_collection_test.go @@ -14,6 +14,8 @@ import ( "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/fault" ) type MockExchangeCollectionSuite struct { @@ -25,11 +27,13 @@ func TestMockExchangeCollectionSuite(t *testing.T) { } func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection() { - mdc := mockconnector.NewMockExchangeCollection(nil, nil, 2) + ctx, flush := tester.NewContext() + defer flush() + mdc := mockconnector.NewMockExchangeCollection(nil, nil, 2) messagesRead := 0 - for item := range mdc.Items() { + for item := range mdc.Items(ctx, fault.New(true)) { _, err := io.ReadAll(item.ToReader()) assert.NoError(suite.T(), err) messagesRead++ @@ -39,12 +43,14 @@ func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection() { } func (suite *MockExchangeCollectionSuite) TestMockExchangeCollectionItemSize() { + ctx, flush := tester.NewContext() + defer flush() + t := suite.T() mdc := mockconnector.NewMockExchangeCollection(nil, nil, 2) - mdc.Data[1] = []byte("This is some buffer of data so that the size is different than the default") - for item := range mdc.Items() { + for item := range mdc.Items(ctx, fault.New(true)) { buf, err := io.ReadAll(item.ToReader()) assert.NoError(t, err) @@ -57,11 +63,14 @@ func (suite *MockExchangeCollectionSuite) TestMockExchangeCollectionItemSize() { // NewExchangeCollectionMail_Hydration tests that mock exchange mail data collection can be used for restoration // functions by verifying no failures on (de)serializing steps using kiota serialization library func (suite *MockExchangeCollectionSuite) TestMockExchangeCollection_NewExchangeCollectionMail_Hydration() { + ctx, flush := tester.NewContext() + defer flush() + t := suite.T() mdc := mockconnector.NewMockExchangeCollection(nil, nil, 3) buf := &bytes.Buffer{} - for stream := range mdc.Items() { + for stream := range mdc.Items(ctx, fault.New(true)) { _, err := buf.ReadFrom(stream.ToReader()) assert.NoError(t, err) diff --git a/src/internal/connector/mockconnector/mock_data_list.go b/src/internal/connector/mockconnector/mock_data_list.go index 78d37105b..8fe9d06ab 100644 --- a/src/internal/connector/mockconnector/mock_data_list.go +++ b/src/internal/connector/mockconnector/mock_data_list.go @@ -2,6 +2,7 @@ package mockconnector import ( "bytes" + "context" "io" "testing" @@ -10,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -44,7 +46,10 @@ func (mlc *MockListCollection) PreviousPath() path.Path { return nil } -func (mlc *MockListCollection) Items() <-chan data.Stream { +func (mlc *MockListCollection) Items( + ctx context.Context, + _ *fault.Errors, // unused +) <-chan data.Stream { res := make(chan data.Stream) go func() { diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go index 3538c8dd2..5f5884599 100644 --- a/src/internal/connector/onedrive/collection.go +++ b/src/internal/connector/onedrive/collection.go @@ -20,6 +20,7 @@ import ( "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -138,8 +139,11 @@ func (oc *Collection) Add(item models.DriveItemable) { } // Items() returns the channel containing M365 Exchange objects -func (oc *Collection) Items() <-chan data.Stream { - go oc.populateItems(context.Background()) +func (oc *Collection) Items( + ctx context.Context, + errs *fault.Errors, // TODO: currently unused while onedrive isn't up to date with clues/fault +) <-chan data.Stream { + go oc.populateItems(ctx) return oc.data } diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go index 5379a53e2..5d843830f 100644 --- a/src/internal/connector/onedrive/collection_test.go +++ b/src/internal/connector/onedrive/collection_test.go @@ -20,8 +20,10 @@ import ( "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -150,6 +152,9 @@ func (suite *CollectionUnitTestSuite) TestCollection() { } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + var ( wg = sync.WaitGroup{} collStatus = support.ConnectorOperationStatus{} @@ -204,7 +209,7 @@ func (suite *CollectionUnitTestSuite) TestCollection() { // Read items from the collection wg.Add(1) - for item := range coll.Items() { + for item := range coll.Items(ctx, fault.New(true)) { readItems = append(readItems, item) } @@ -284,9 +289,11 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + var ( testItemID = "fakeItemID" - collStatus = support.ConnectorOperationStatus{} wg = sync.WaitGroup{} ) @@ -328,7 +335,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionReadError() { return io.NopCloser(strings.NewReader(`{}`)), 2, nil } - collItem, ok := <-coll.Items() + collItem, ok := <-coll.Items(ctx, fault.New(true)) assert.True(t, ok) _, err = io.ReadAll(collItem.ToReader()) @@ -355,13 +362,15 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() { } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + var ( testItemID = "fakeItemID" testItemName = "Fake Item" testItemSize = int64(10) - - collStatus = support.ConnectorOperationStatus{} - wg = sync.WaitGroup{} + collStatus = support.ConnectorOperationStatus{} + wg = sync.WaitGroup{} ) wg.Add(1) @@ -408,7 +417,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionDisablePermissionsBackup() { } readItems := []data.Stream{} - for item := range coll.Items() { + for item := range coll.Items(ctx, fault.New(true)) { readItems = append(readItems, item) } @@ -442,6 +451,9 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + var ( testItemID = "fakeItemID" testItemName = "Fake Item" @@ -495,7 +507,7 @@ func (suite *CollectionUnitTestSuite) TestCollectionPermissionBackupLatestModTim } readItems := []data.Stream{} - for item := range coll.Items() { + for item := range coll.Items(ctx, fault.New(true)) { readItems = append(readItems, item) } diff --git a/src/internal/connector/onedrive/collections.go b/src/internal/connector/onedrive/collections.go index 773e56087..279993edf 100644 --- a/src/internal/connector/onedrive/collections.go +++ b/src/internal/connector/onedrive/collections.go @@ -128,7 +128,7 @@ func deserializeMetadata( prevFolders := map[string]map[string]string{} for _, col := range cols { - items := col.Items() + items := col.Items(ctx, nil) // TODO: fault.Errors instead of nil for breakLoop := false; !breakLoop; { select { diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go index 53c64a999..4df57798a 100644 --- a/src/internal/connector/onedrive/restore.go +++ b/src/internal/connector/onedrive/restore.go @@ -242,7 +242,7 @@ func RestoreCollection( } // Restore items from the collection - items := dc.Items() + items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil for { select { diff --git a/src/internal/connector/sharepoint/collection.go b/src/internal/connector/sharepoint/collection.go index d4150fec0..ca0e7563e 100644 --- a/src/internal/connector/sharepoint/collection.go +++ b/src/internal/connector/sharepoint/collection.go @@ -19,6 +19,7 @@ import ( "github.com/alcionai/corso/src/internal/observe" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -109,8 +110,11 @@ func (sc Collection) DoNotMergeItems() bool { return false } -func (sc *Collection) Items() <-chan data.Stream { - go sc.populate(context.TODO()) +func (sc *Collection) Items( + ctx context.Context, + errs *fault.Errors, +) <-chan data.Stream { + go sc.populate(ctx) return sc.data } diff --git a/src/internal/connector/sharepoint/collection_test.go b/src/internal/connector/sharepoint/collection_test.go index 2f2a2c472..f0a3ed4da 100644 --- a/src/internal/connector/sharepoint/collection_test.go +++ b/src/internal/connector/sharepoint/collection_test.go @@ -20,6 +20,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -147,12 +148,15 @@ func (suite *SharePointCollectionSuite) TestCollection_Items() { for _, test := range tables { t.Run(test.name, func(t *testing.T) { + ctx, flush := tester.NewContext() + defer flush() + col := NewCollection(test.getDir(t), nil, test.category, nil, control.Defaults()) col.data <- test.getItem(t, test.itemName) readItems := []data.Stream{} - for item := range col.Items() { + for item := range col.Items(ctx, fault.New(true)) { readItems = append(readItems, item) } diff --git a/src/internal/connector/sharepoint/restore.go b/src/internal/connector/sharepoint/restore.go index bb257ec6b..15ab902ac 100644 --- a/src/internal/connector/sharepoint/restore.go +++ b/src/internal/connector/sharepoint/restore.go @@ -236,7 +236,7 @@ func RestoreListCollection( siteID := directory.ResourceOwner() // Restore items from the collection - items := dc.Items() + items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil for { select { @@ -316,7 +316,7 @@ func RestorePageCollection( siteID := directory.ResourceOwner() // Restore items from collection - items := dc.Items() + items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil for { select { diff --git a/src/internal/data/data_collection.go b/src/internal/data/data_collection.go index 3f72bed2e..f32f17135 100644 --- a/src/internal/data/data_collection.go +++ b/src/internal/data/data_collection.go @@ -7,6 +7,7 @@ import ( "time" "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -32,7 +33,7 @@ type Collection interface { // Each returned struct contains the next item in the collection // The channel is closed when there are no more items in the collection or if // an unrecoverable error caused an early termination in the sender. - Items() <-chan Stream + Items(ctx context.Context, errs *fault.Errors) <-chan Stream // FullPath returns a path struct that acts as a metadata tag for this // Collection. FullPath() path.Path diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index 23ab824a5..f4c7a8313 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -8,6 +8,7 @@ import ( "github.com/kopia/kopia/fs" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -23,7 +24,10 @@ type kopiaDataCollection struct { counter ByteCounter } -func (kdc *kopiaDataCollection) Items() <-chan data.Stream { +func (kdc *kopiaDataCollection) Items( + ctx context.Context, + errs *fault.Errors, +) <-chan data.Stream { res := make(chan data.Stream) go func() { diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go index d363c843f..108dc7868 100644 --- a/src/internal/kopia/data_collection_test.go +++ b/src/internal/kopia/data_collection_test.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -94,6 +95,9 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { for _, test := range table { suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + t := suite.T() c := kopiaDataCollection{ @@ -102,7 +106,7 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { } count := 0 - for returnedStream := range c.Items() { + for returnedStream := range c.Items(ctx, fault.New(true)) { require.Less(t, count, len(test.streams)) assert.Equal(t, returnedStream.UUID(), uuids[count]) diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index 64feed06c..e1cb8c923 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -286,7 +286,7 @@ func collectionEntries( // Track which items have already been seen so we can skip them if we see // them again in the data from the base snapshot. seen = map[string]struct{}{} - items = streamedEnts.Items() + items = streamedEnts.Items(ctx, progress.errs) ) if lp, ok := streamedEnts.(data.LocationPather); ok { diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 67bb311c1..8c1505ed3 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -52,17 +52,20 @@ var ( testFileData6 = testFileData ) +//revive:disable:context-as-argument func testForFiles( t *testing.T, + ctx context.Context, expected map[string][]byte, collections []data.RestoreCollection, ) { + //revive:enable:context-as-argument t.Helper() count := 0 for _, c := range collections { - for s := range c.Items() { + for s := range c.Items(ctx, fault.New(true)) { count++ fullPath, err := c.FullPath().Append(s.UUID(), true) @@ -383,7 +386,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { require.NoError(t, err) assert.Equal(t, 2, len(result)) - testForFiles(t, expected, result) + testForFiles(t, ctx, expected, result) } type mockBackupCollection struct { @@ -391,7 +394,7 @@ type mockBackupCollection struct { streams []data.Stream } -func (c *mockBackupCollection) Items() <-chan data.Stream { +func (c *mockBackupCollection) Items(context.Context, *fault.Errors) <-chan data.Stream { res := make(chan data.Stream) go func() { @@ -926,6 +929,9 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { for _, test := range table { suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + t := suite.T() // May slightly overallocate as only items that are actually in our map @@ -958,7 +964,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { assert.Len(t, result, test.expectedCollections) assert.Less(t, int64(0), ic.i) - testForFiles(t, expected, result) + testForFiles(t, ctx, expected, result) }) } } diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index c14090bde..521e87a80 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -257,7 +257,7 @@ func checkMetadataFilesExist( for _, col := range cols { itemNames := []string{} - for item := range col.Items() { + for item := range col.Items(ctx, fault.New(true)) { assert.Implements(t, (*data.StreamSize)(nil), item) s := item.(data.StreamSize) diff --git a/src/internal/operations/manifests_test.go b/src/internal/operations/manifests_test.go index a234b81e8..3e6b12307 100644 --- a/src/internal/operations/manifests_test.go +++ b/src/internal/operations/manifests_test.go @@ -53,7 +53,7 @@ type mockColl struct { p path.Path } -func (mc mockColl) Items() <-chan data.Stream { +func (mc mockColl) Items(context.Context, *fault.Errors) <-chan data.Stream { return nil } diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index a0d55136d..e5cd2c1ce 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -129,7 +129,7 @@ func (ss *streamStore) ReadBackupDetails( var d details.Details found := false - items := dc.Items() + items := dc.Items(ctx, errs) for { select { @@ -195,7 +195,7 @@ func (dc *streamCollection) DoNotMergeItems() bool { // Items() always returns a channel with a single data.Stream // representing the object to be persisted -func (dc *streamCollection) Items() <-chan data.Stream { +func (dc *streamCollection) Items(context.Context, *fault.Errors) <-chan data.Stream { items := make(chan data.Stream, 1) defer close(items) items <- dc.item