From feddd9b183e1941edfae88442710727059f5f50a Mon Sep 17 00:00:00 2001 From: ryanfkeepers Date: Thu, 23 Mar 2023 14:43:25 -0600 Subject: [PATCH] replace graphConnector with interface Replaces the operations graphConnector reference with an interface. Restore and Backups have separate, unique interfaces. --- src/internal/connector/data_collections.go | 22 ++--- .../connector/data_collections_test.go | 13 +-- src/internal/connector/graph_connector.go | 2 +- .../graph_connector_disconnected_test.go | 3 +- .../connector/graph_connector_test.go | 20 +++-- .../mockconnector/mock_data_connector.go | 56 +++++++++++++ src/internal/kopia/wrapper.go | 6 +- src/internal/kopia/wrapper_test.go | 14 ++-- src/internal/operations/backup.go | 55 +++++++++---- .../operations/backup_integration_test.go | 10 +-- src/internal/operations/backup_test.go | 14 ++-- src/internal/operations/operation.go | 8 -- src/internal/operations/operation_test.go | 14 ++-- src/internal/operations/restore.go | 81 ++++++++++++++++--- src/internal/operations/restore_test.go | 10 +-- src/internal/streamstore/streamstore.go | 4 +- 16 files changed, 234 insertions(+), 98 deletions(-) create mode 100644 src/internal/connector/mockconnector/mock_data_connector.go diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index 41489ff4a..39fa887a3 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -27,19 +27,23 @@ import ( // Data Collections // --------------------------------------------------------------------------- -// DataCollections utility function to launch backup operations for exchange and -// onedrive. metadataCols contains any collections with metadata files that may -// be useful for the current backup. Metadata can include things like delta -// tokens or the previous backup's folder hierarchy. The absence of metadataCols -// results in all data being pulled. -func (gc *GraphConnector) DataCollections( +// ProduceBackupCollections generates a slice of backup collections for the service +// specified in the selectors. +// The metadata field can include things like delta tokens or the previous backup's +// folder hierarchy. The absence of metadata causes the collection creation to ignore +// prior history (ie, incrementals) and run a full backup. +func (gc *GraphConnector) ProduceBackupCollections( ctx context.Context, + ownerID, ownerName string, sels selectors.Selector, metadata []data.RestoreCollection, ctrlOpts control.Options, errs *fault.Bus, ) ([]data.BackupCollection, map[string]map[string]struct{}, error) { - ctx, end := diagnostics.Span(ctx, "gc:dataCollections", diagnostics.Index("service", sels.Service.String())) + ctx, end := diagnostics.Span( + ctx, + "gc:produceBackupCollections", + diagnostics.Index("service", sels.Service.String())) defer end() err := verifyBackupInputs(sels, gc.GetSiteIDs()) @@ -188,10 +192,10 @@ func checkServiceEnabled( return true, nil } -// RestoreDataCollections restores data from the specified collections +// ConsumeRestoreCollections restores data from the specified collections // into M365 using the GraphAPI. // SideEffect: gc.status is updated at the completion of operation -func (gc *GraphConnector) RestoreDataCollections( +func (gc *GraphConnector) ConsumeRestoreCollections( ctx context.Context, backupVersion int, acct account.Account, diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index 44cffd192..e17001153 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -129,7 +129,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection } } - status := connector.AwaitStatus() + status := connector.Wait() assert.NotZero(t, status.Metrics.Successes) t.Log(status.String()) }) @@ -205,8 +205,9 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestDataCollections_invali suite.Run(test.name, func() { t := suite.T() - collections, excludes, err := connector.DataCollections( + collections, excludes, err := connector.ProduceBackupCollections( ctx, + owners[0], owners[0], test.getSelector(t), nil, control.Options{}, @@ -286,7 +287,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti } } - status := connector.AwaitStatus() + status := connector.Wait() assert.NotZero(t, status.Metrics.Successes) t.Log(status.String()) }) @@ -336,8 +337,9 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar sel := selectors.NewSharePointBackup(siteIDs) sel.Include(sel.LibraryFolders([]string{"foo"}, selectors.PrefixMatch())) - cols, excludes, err := gc.DataCollections( + cols, excludes, err := gc.ProduceBackupCollections( ctx, + siteIDs[0], siteIDs[0], sel.Selector, nil, control.Options{}, @@ -374,8 +376,9 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar sel := selectors.NewSharePointBackup(siteIDs) sel.Include(sel.Lists(selectors.Any(), selectors.PrefixMatch())) - cols, excludes, err := gc.DataCollections( + cols, excludes, err := gc.ProduceBackupCollections( ctx, + siteIDs[0], siteIDs[0], sel.Selector, nil, control.Options{}, diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 2f8652f81..1fa7a62a7 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -223,7 +223,7 @@ func (gc *GraphConnector) UnionSiteIDsAndWebURLs( } // AwaitStatus waits for all gc tasks to complete and then returns status -func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus { +func (gc *GraphConnector) Wait() *support.ConnectorOperationStatus { defer func() { if gc.region != nil { gc.region.End() diff --git a/src/internal/connector/graph_connector_disconnected_test.go b/src/internal/connector/graph_connector_disconnected_test.go index d92a018e4..019d5415a 100644 --- a/src/internal/connector/graph_connector_disconnected_test.go +++ b/src/internal/connector/graph_connector_disconnected_test.go @@ -111,8 +111,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() { go statusTestTask(&gc, 4, 1, 1) go statusTestTask(&gc, 4, 1, 1) - status := gc.AwaitStatus() - + status := gc.Wait() t := suite.T() assert.NotEmpty(t, gc.PrintableStatus()) diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index dae0ed4cc..61825c87c 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -256,7 +256,7 @@ func (suite *GraphConnectorIntegrationSuite) TestRestoreFailsBadService() { assert.Error(t, err, clues.ToCore(err)) assert.NotNil(t, deets) - status := suite.connector.AwaitStatus() + status := suite.connector.Wait() assert.Equal(t, 0, status.Metrics.Objects) assert.Equal(t, 0, status.Folders) assert.Equal(t, 0, status.Metrics.Successes) @@ -335,7 +335,7 @@ func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() { require.NoError(t, err, clues.ToCore(err)) assert.NotNil(t, deets) - stats := suite.connector.AwaitStatus() + stats := suite.connector.Wait() assert.Zero(t, stats.Metrics.Objects) assert.Zero(t, stats.Folders) assert.Zero(t, stats.Metrics.Successes) @@ -412,7 +412,7 @@ func runRestore( require.NoError(t, err, clues.ToCore(err)) assert.NotNil(t, deets) - status := restoreGC.AwaitStatus() + status := restoreGC.Wait() runTime := time.Since(start) assert.Equal(t, numRestoreItems, status.Metrics.Objects, "restored status.Metrics.Objects") @@ -457,8 +457,10 @@ func runBackupAndCompare( t.Logf("Selective backup of %s\n", backupSel) start := time.Now() - dcs, excludes, err := backupGC.DataCollections( + dcs, excludes, err := backupGC.ProduceBackupCollections( ctx, + backupSel.DiscreteOwner, + backupSel.DiscreteOwner, backupSel, nil, config.opts, @@ -480,7 +482,7 @@ func runBackupAndCompare( config.dest, config.opts.RestorePermissions) - status := backupGC.AwaitStatus() + status := backupGC.Wait() assert.Equalf(t, totalItems+skipped, status.Metrics.Objects, "backup status.Metrics.Objects; wanted %d items + %d skipped", totalItems, skipped) @@ -979,7 +981,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames require.NoError(t, err, clues.ToCore(err)) require.NotNil(t, deets) - status := restoreGC.AwaitStatus() + status := restoreGC.Wait() // Always just 1 because it's just 1 collection. assert.Equal(t, totalItems, status.Metrics.Objects, "status.Metrics.Objects") assert.Equal(t, totalItems, status.Metrics.Successes, "status.Metrics.Successes") @@ -996,8 +998,10 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames backupSel := backupSelectorForExpected(t, test.service, expectedDests) t.Log("Selective backup of", backupSel) - dcs, excludes, err := backupGC.DataCollections( + dcs, excludes, err := backupGC.ProduceBackupCollections( ctx, + backupSel.DiscreteOwner, + backupSel.DiscreteOwner, backupSel, nil, control.Options{ @@ -1023,7 +1027,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames control.RestoreDestination{}, true) - status := backupGC.AwaitStatus() + status := backupGC.Wait() assert.Equal(t, allItems+skipped, status.Metrics.Objects, "status.Metrics.Objects") assert.Equal(t, allItems+skipped, status.Metrics.Successes, "status.Metrics.Successes") }) diff --git a/src/internal/connector/mockconnector/mock_data_connector.go b/src/internal/connector/mockconnector/mock_data_connector.go new file mode 100644 index 000000000..26b99a5a7 --- /dev/null +++ b/src/internal/connector/mockconnector/mock_data_connector.go @@ -0,0 +1,56 @@ +package mockconnector + +import ( + "context" + + "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/selectors" +) + +type GraphConnector struct { + Collections []data.BackupCollection + Exclude map[string]map[string]struct{} + + Deets *details.Details + + Err error + + Status *support.ConnectorOperationStatus +} + +func (gc GraphConnector) ProduceBackupCollections( + _ context.Context, + _, _ string, + _ selectors.Selector, + _ []data.RestoreCollection, + _ control.Options, + _ *fault.Bus, +) ( + []data.BackupCollection, + map[string]map[string]struct{}, + error, +) { + return gc.Collections, gc.Exclude, gc.Err +} + +func (gc GraphConnector) Wait() *support.ConnectorOperationStatus { + return gc.Status +} + +func (gc GraphConnector) ConsumeRestoreCollections( + _ context.Context, + _ int, + _ account.Account, + _ selectors.Selector, + _ control.RestoreDestination, + _ control.Options, + _ []data.RestoreCollection, + _ *fault.Bus, +) (*details.Details, error) { + return gc.Deets, gc.Err +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 89258ec78..54fae8b85 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -124,13 +124,13 @@ type PrevRefs struct { Location path.Path } -// BackupCollections takes a set of collections and creates a kopia snapshot +// ConsumeBackupCollections takes a set of collections and creates a kopia snapshot // with the data that they contain. previousSnapshots is used for incremental // backups and should represent the base snapshot from which metadata is sourced // from as well as any incomplete snapshot checkpoints that may contain more // recent data than the base snapshot. The absence of previousSnapshots causes a // complete backup of all data. -func (w Wrapper) BackupCollections( +func (w Wrapper) ConsumeBackupCollections( ctx context.Context, previousSnapshots []IncrementalBase, collections []data.BackupCollection, @@ -143,7 +143,7 @@ func (w Wrapper) BackupCollections( return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx) } - ctx, end := diagnostics.Span(ctx, "kopia:backupCollections") + ctx, end := diagnostics.Span(ctx, "kopia:consumeBackupCollections") defer end() if len(collections) == 0 && len(globalExcludeSet) == 0 { diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 2f03a1fb2..2ef5ee833 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -276,7 +276,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { suite.Run(test.name, func() { t := suite.T() - stats, deets, _, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, prevSnaps, collections, @@ -423,7 +423,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() { t := suite.T() collections := test.cols() - stats, deets, prevShortRefs, err := suite.w.BackupCollections( + stats, deets, prevShortRefs, err := suite.w.ConsumeBackupCollections( suite.ctx, prevSnaps, collections, @@ -525,7 +525,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { fp2, err := suite.storePath2.Append(dc2.Names[0], true) require.NoError(t, err, clues.ToCore(err)) - stats, _, _, err := w.BackupCollections( + stats, _, _, err := w.ConsumeBackupCollections( ctx, nil, []data.BackupCollection{dc1, dc2}, @@ -644,7 +644,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { }, } - stats, deets, _, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, nil, collections, @@ -706,7 +706,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() ctx, flush := tester.NewContext() defer flush() - s, d, _, err := suite.w.BackupCollections( + s, d, _, err := suite.w.ConsumeBackupCollections( ctx, nil, test.collections, @@ -866,7 +866,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { tags[k] = "" } - stats, deets, _, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, nil, collections, @@ -1018,7 +1018,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { } } - stats, _, _, err := suite.w.BackupCollections( + stats, _, _, err := suite.w.ConsumeBackupCollections( suite.ctx, []IncrementalBase{ { diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 89b223cd2..edef54f26 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -9,7 +9,6 @@ import ( "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common/crash" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" @@ -42,6 +41,7 @@ type BackupOperation struct { Version string `json:"version"` account account.Account + bp BackupProducer // when true, this allows for incremental backups instead of full data pulls incremental bool @@ -60,14 +60,14 @@ func NewBackupOperation( opts control.Options, kw *kopia.Wrapper, sw *store.Wrapper, - gc *connector.GraphConnector, + bp BackupProducer, acct account.Account, selector selectors.Selector, ownerName string, bus events.Eventer, ) (BackupOperation, error) { op := BackupOperation{ - operation: newOperation(opts, bus, kw, sw, gc), + operation: newOperation(opts, bus, kw, sw), ResourceOwner: selector.DiscreteOwner, ResourceOwnerName: ownerName, Selectors: selector, @@ -92,6 +92,10 @@ func (op BackupOperation) validate() error { return clues.New("backup requires a resource owner") } + if op.bp == nil { + return errors.New("missing backup producer") + } + return op.operation.validate() } @@ -243,14 +247,22 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "producing manifests and metadata") } - cs, excludes, err := produceBackupDataCollections(ctx, op.gc, op.Selectors, mdColls, op.Options, op.Errors) + cs, excludes, err := produceBackupDataCollections( + ctx, + op.bp, + op.ResourceOwner, + op.ResourceOwnerName, + op.Selectors, + mdColls, + op.Options, + op.Errors) if err != nil { return nil, clues.Wrap(err, "producing backup data collections") } ctx = clues.Add(ctx, "coll_count", len(cs)) - writeStats, deets, toMerge, err := consumeBackupDataCollections( + writeStats, deets, toMerge, err := consumeBackupCollections( ctx, op.kopia, op.account.ID(), @@ -279,9 +291,9 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "merging details") } - opStats.gc = op.gc.AwaitStatus() + opStats.gc = op.bp.Wait() - logger.Ctx(ctx).Debug(op.gc.PrintableStatus()) + logger.Ctx(ctx).Debug(opStats.gc) return deets, nil } @@ -309,10 +321,25 @@ func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool { // Producer funcs // --------------------------------------------------------------------------- +type BackupProducer interface { + ProduceBackupCollections( + ctx context.Context, + ownerID, ownerName string, + sels selectors.Selector, + metadata []data.RestoreCollection, + ctrlOpts control.Options, + errs *fault.Bus, + ) ([]data.BackupCollection, map[string]map[string]struct{}, error) + // TODO: ConnectorOperationStatus should be replaced with something + // more generic. + Wait() *support.ConnectorOperationStatus +} + // calls the producer to generate collections of data to backup func produceBackupDataCollections( ctx context.Context, - gc *connector.GraphConnector, + bp BackupProducer, + ownerID, ownerName string, sel selectors.Selector, metadata []data.RestoreCollection, ctrlOpts control.Options, @@ -325,15 +352,15 @@ func produceBackupDataCollections( closer() }() - return gc.DataCollections(ctx, sel, metadata, ctrlOpts, errs) + return bp.ProduceBackupCollections(ctx, ownerID, ownerName, sel, metadata, ctrlOpts, errs) } // --------------------------------------------------------------------------- // Consumer funcs // --------------------------------------------------------------------------- -type backuper interface { - BackupCollections( +type BackupConsumer interface { + ConsumeBackupCollections( ctx context.Context, bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -389,9 +416,9 @@ func builderFromReason(ctx context.Context, tenant string, r kopia.Reason) (*pat } // calls kopia to backup the collections of data -func consumeBackupDataCollections( +func consumeBackupCollections( ctx context.Context, - bu backuper, + bc BackupConsumer, tenantID string, reasons []kopia.Reason, mans []*kopia.ManifestEntry, @@ -465,7 +492,7 @@ func consumeBackupDataCollections( "base_backup_id", mbID) } - kopiaStats, deets, itemsSourcedFromBase, err := bu.BackupCollections( + kopiaStats, deets, itemsSourcedFromBase, err := bc.ConsumeBackupCollections( ctx, bases, cs, diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index d7604b008..c4833736f 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -383,7 +383,7 @@ func generateContainerOfItems( dest, collections) - deets, err := gc.RestoreDataCollections( + deets, err := gc.ConsumeRestoreCollections( ctx, backupVersion, acct, @@ -539,7 +539,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() { func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { kw := &kopia.Wrapper{} sw := &store.Wrapper{} - gc := &connector.GraphConnector{} + gc := &mockconnector.GraphConnector{} acct := tester.NewM365Account(suite.T()) table := []struct { @@ -547,7 +547,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { opts control.Options kw *kopia.Wrapper sw *store.Wrapper - gc *connector.GraphConnector + bp BackupProducer acct account.Account targets []string errCheck assert.ErrorAssertionFunc @@ -555,7 +555,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, - {"missing graphconnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, + {"missing backup producer", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -567,7 +567,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { test.opts, test.kw, test.sw, - test.gc, + test.bp, test.acct, selectors.Selector{DiscreteOwner: "test"}, "test-name", diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 74641704a..85b900588 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -14,7 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector" + "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" evmock "github.com/alcionai/corso/src/internal/events/mock" @@ -85,9 +85,9 @@ func checkPaths(t *testing.T, expected, got []path.Path) { assert.ElementsMatch(t, expected, got) } -// ----- backup producer +// ----- backup consumer -type mockBackuper struct { +type mockBackupConsumer struct { checkFunc func( bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -95,7 +95,7 @@ type mockBackuper struct { buildTreeWithBase bool) } -func (mbu mockBackuper) BackupCollections( +func (mbu mockBackupConsumer) ConsumeBackupCollections( ctx context.Context, bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -360,7 +360,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { var ( kw = &kopia.Wrapper{} sw = &store.Wrapper{} - gc = &connector.GraphConnector{} + gc = &mockconnector.GraphConnector{} acct = account.Account{} now = time.Now() ) @@ -564,7 +564,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections ctx, flush := tester.NewContext() defer flush() - mbu := &mockBackuper{ + mbu := &mockBackupConsumer{ checkFunc: func( bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -576,7 +576,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections } //nolint:errcheck - consumeBackupDataCollections( + consumeBackupCollections( ctx, mbu, tenant, diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 8dc27c87a..e4cd201db 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -5,7 +5,6 @@ import ( "github.com/alcionai/clues" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/pkg/control" @@ -57,7 +56,6 @@ type operation struct { bus events.Eventer kopia *kopia.Wrapper store *store.Wrapper - gc *connector.GraphConnector } func newOperation( @@ -65,7 +63,6 @@ func newOperation( bus events.Eventer, kw *kopia.Wrapper, sw *store.Wrapper, - gc *connector.GraphConnector, ) operation { return operation{ CreatedAt: time.Now(), @@ -75,7 +72,6 @@ func newOperation( bus: bus, kopia: kw, store: sw, - gc: gc, Status: InProgress, } @@ -90,9 +86,5 @@ func (op operation) validate() error { return clues.New("missing modelstore") } - if op.gc == nil { - return clues.New("missing graph connector") - } - return nil } diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index 5d0425022..27cf6185f 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" @@ -26,30 +25,27 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(control.Options{}, events.Bus{}, nil, nil, nil) + op := newOperation(control.Options{}, events.Bus{}, nil, nil) assert.Greater(t, op.CreatedAt, time.Time{}) } func (suite *OperationSuite) TestOperation_Validate() { kwStub := &kopia.Wrapper{} swStub := &store.Wrapper{} - gcStub := &connector.GraphConnector{} table := []struct { name string kw *kopia.Wrapper sw *store.Wrapper - gc *connector.GraphConnector errCheck assert.ErrorAssertionFunc }{ - {"good", kwStub, swStub, gcStub, assert.NoError}, - {"missing kopia wrapper", nil, swStub, gcStub, assert.Error}, - {"missing store wrapper", kwStub, nil, gcStub, assert.Error}, - {"missing graph connector", kwStub, swStub, nil, assert.Error}, + {"good", kwStub, swStub, assert.NoError}, + {"missing kopia wrapper", nil, swStub, assert.Error}, + {"missing store wrapper", kwStub, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { - err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw, test.gc).validate() + err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate() test.errCheck(suite.T(), err, clues.ToCore(err)) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 0365765af..36f84ffc9 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -7,10 +7,10 @@ import ( "github.com/alcionai/clues" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common/crash" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" @@ -42,6 +42,7 @@ type RestoreOperation struct { Version string `json:"version"` account account.Account + rc RestoreConsumer } // RestoreResults aggregate the details of the results of the operation. @@ -56,7 +57,7 @@ func NewRestoreOperation( opts control.Options, kw *kopia.Wrapper, sw *store.Wrapper, - gc *connector.GraphConnector, + rc RestoreConsumer, acct account.Account, backupID model.StableID, sel selectors.Selector, @@ -64,12 +65,13 @@ func NewRestoreOperation( bus events.Eventer, ) (RestoreOperation, error) { op := RestoreOperation{ - operation: newOperation(opts, bus, kw, sw, gc), + operation: newOperation(opts, bus, kw, sw), BackupID: backupID, Selectors: sel, Destination: dest, Version: "v0", account: acct, + rc: rc, } if err := op.validate(); err != nil { return RestoreOperation{}, err @@ -79,6 +81,10 @@ func NewRestoreOperation( } func (op RestoreOperation) validate() error { + if op.rc == nil { + return clues.New("missing restore consumer") + } + return op.operation.validate() } @@ -235,12 +241,9 @@ func (op *RestoreOperation) do( opStats.resourceCount = 1 opStats.cs = dcs - restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data")) - defer closer() - defer close(restoreComplete) - - restoreDetails, err := op.gc.RestoreDataCollections( + deets, err = consumeRestoreCollections( ctx, + op.rc, bup.Version, op.account, op.Selectors, @@ -252,13 +255,11 @@ func (op *RestoreOperation) do( return nil, clues.Wrap(err, "restoring collections") } - restoreComplete <- struct{}{} + opStats.gc = op.rc.Wait() - opStats.gc = op.gc.AwaitStatus() + logger.Ctx(ctx).Debug(opStats.gc) - logger.Ctx(ctx).Debug(op.gc.PrintableStatus()) - - return restoreDetails, nil + return deets, nil } // persists details and statistics about the restore operation. @@ -312,6 +313,60 @@ func (op *RestoreOperation) persistResults( return op.Errors.Failure() } +// --------------------------------------------------------------------------- +// Restorer funcs +// --------------------------------------------------------------------------- + +type RestoreConsumer interface { + ConsumeRestoreCollections( + ctx context.Context, + backupVersion int, + acct account.Account, + selector selectors.Selector, + dest control.RestoreDestination, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, + ) (*details.Details, error) + // TODO: ConnectorOperationStatus should be replaced with something + // more generic. + Wait() *support.ConnectorOperationStatus +} + +func consumeRestoreCollections( + ctx context.Context, + rc RestoreConsumer, + backupVersion int, + acct account.Account, + sel selectors.Selector, + dest control.RestoreDestination, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, +) (*details.Details, error) { + complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data")) + defer func() { + complete <- struct{}{} + close(complete) + closer() + }() + + deets, err := rc.ConsumeRestoreCollections( + ctx, + backupVersion, + acct, + sel, + dest, + opts, + dcs, + errs) + if err != nil { + return nil, errors.Wrap(err, "restoring collections") + } + + return deets, nil +} + // formatDetailsForRestoration reduces the provided detail entries according to the // selector specifications. func formatDetailsForRestoration( diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 0ffbf587e..32aee31d8 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -50,7 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { var ( kw = &kopia.Wrapper{} sw = &store.Wrapper{} - gc = &connector.GraphConnector{} + gc = &mockconnector.GraphConnector{} acct = account.Account{} now = time.Now() dest = tester.DefaultTestRestoreDestination() @@ -217,7 +217,7 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() { func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { kw := &kopia.Wrapper{} sw := &store.Wrapper{} - gc := &connector.GraphConnector{} + gc := &mockconnector.GraphConnector{} acct := tester.NewM365Account(suite.T()) dest := tester.DefaultTestRestoreDestination() @@ -226,7 +226,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { opts control.Options kw *kopia.Wrapper sw *store.Wrapper - gc *connector.GraphConnector + rc RestoreConsumer acct account.Account targets []string errCheck assert.ErrorAssertionFunc @@ -234,7 +234,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, - {"missing graphConnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, + {"missing restore consumer", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -246,7 +246,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.opts, test.kw, test.sw, - test.gc, + test.rc, test.acct, "backup-id", selectors.Selector{DiscreteOwner: "test"}, diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index ec728df67..6a1c955c0 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -222,7 +222,7 @@ func collect( } type backuper interface { - BackupCollections( + ConsumeBackupCollections( ctx context.Context, bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -240,7 +240,7 @@ func write( dbcs []data.BackupCollection, errs *fault.Bus, ) (string, error) { - backupStats, _, _, err := bup.BackupCollections( + backupStats, _, _, err := bup.ConsumeBackupCollections( ctx, nil, dbcs,