From 7e04cefa2c30103ff8fd0a2a73bdc86d321c1937 Mon Sep 17 00:00:00 2001 From: Keepers Date: Fri, 31 Mar 2023 09:56:19 -0600 Subject: [PATCH] replace graphConnector with interface (#2932) Replaces the operations graphConnector reference with an interface. Restore and Backups have separate, unique interfaces. --- #### Does this PR need a docs update or release note? - [x] :no_entry: No #### Type of change - [x] :sunflower: Feature #### Issue(s) * #2825 #### Test Plan - [x] :zap: Unit test - [x] :green_heart: E2E --- src/cmd/factory/impl/common.go | 2 +- src/internal/connector/data_collections.go | 22 +++-- .../connector/data_collections_test.go | 17 ++-- src/internal/connector/graph_connector.go | 20 ++++- .../graph_connector_disconnected_test.go | 9 +- .../connector/graph_connector_test.go | 76 +++++++++-------- .../mockconnector/mock_data_connector.go | 55 ++++++++++++ src/internal/data/metrics.go | 17 ++++ src/internal/kopia/wrapper.go | 19 +++-- src/internal/kopia/wrapper_test.go | 30 +++---- src/internal/operations/backup.go | 58 ++++++------- .../operations/backup_integration_test.go | 17 ++-- src/internal/operations/backup_test.go | 31 ++++--- src/internal/operations/inject/inject.go | 66 +++++++++++++++ src/internal/operations/manifests.go | 7 +- src/internal/operations/manifests_test.go | 76 +++++++++-------- src/internal/operations/operation.go | 8 -- src/internal/operations/operation_test.go | 14 ++-- src/internal/operations/restore.go | 84 ++++++++++++------- src/internal/operations/restore_test.go | 26 +++--- src/internal/streamstore/streamstore.go | 32 ++----- 21 files changed, 430 insertions(+), 256 deletions(-) create mode 100644 src/internal/connector/mockconnector/mock_data_connector.go create mode 100644 src/internal/data/metrics.go create mode 100644 src/internal/operations/inject/inject.go diff --git a/src/cmd/factory/impl/common.go b/src/cmd/factory/impl/common.go index 6215b58d8..4a8ee1b52 100644 --- a/src/cmd/factory/impl/common.go +++ b/src/cmd/factory/impl/common.go @@ -96,7 +96,7 @@ func generateAndRestoreItems( print.Infof(ctx, "Generating %d %s items in %s\n", howMany, cat, Destination) - return gc.RestoreDataCollections(ctx, version.Backup, acct, sel, dest, opts, dataColls, errs) + return gc.ConsumeRestoreCollections(ctx, version.Backup, acct, sel, dest, opts, dataColls, errs) } // ------------------------------------------------------------------------------------------ diff --git a/src/internal/connector/data_collections.go b/src/internal/connector/data_collections.go index 728e3b52f..f7f2ba143 100644 --- a/src/internal/connector/data_collections.go +++ b/src/internal/connector/data_collections.go @@ -27,19 +27,23 @@ import ( // Data Collections // --------------------------------------------------------------------------- -// DataCollections utility function to launch backup operations for exchange and -// onedrive. metadataCols contains any collections with metadata files that may -// be useful for the current backup. Metadata can include things like delta -// tokens or the previous backup's folder hierarchy. The absence of metadataCols -// results in all data being pulled. -func (gc *GraphConnector) DataCollections( +// ProduceBackupCollections generates a slice of data.BackupCollections for the service +// specified in the selectors. +// The metadata field can include things like delta tokens or the previous backup's +// folder hierarchy. The absence of metadata causes the collection creation to ignore +// prior history (ie, incrementals) and run a full backup. +func (gc *GraphConnector) ProduceBackupCollections( ctx context.Context, + ownerID, ownerName string, sels selectors.Selector, metadata []data.RestoreCollection, ctrlOpts control.Options, errs *fault.Bus, ) ([]data.BackupCollection, map[string]map[string]struct{}, error) { - ctx, end := diagnostics.Span(ctx, "gc:dataCollections", diagnostics.Index("service", sels.Service.String())) + ctx, end := diagnostics.Span( + ctx, + "gc:produceBackupCollections", + diagnostics.Index("service", sels.Service.String())) defer end() err := verifyBackupInputs(sels, gc.GetSiteIDs()) @@ -188,10 +192,10 @@ func checkServiceEnabled( return true, nil } -// RestoreDataCollections restores data from the specified collections +// ConsumeRestoreCollections restores data from the specified collections // into M365 using the GraphAPI. // SideEffect: gc.status is updated at the completion of operation -func (gc *GraphConnector) RestoreDataCollections( +func (gc *GraphConnector) ConsumeRestoreCollections( ctx context.Context, backupVersion int, acct account.Account, diff --git a/src/internal/connector/data_collections_test.go b/src/internal/connector/data_collections_test.go index 44cffd192..00d16c882 100644 --- a/src/internal/connector/data_collections_test.go +++ b/src/internal/connector/data_collections_test.go @@ -129,8 +129,8 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection } } - status := connector.AwaitStatus() - assert.NotZero(t, status.Metrics.Successes) + status := connector.Wait() + assert.NotZero(t, status.Successes) t.Log(status.String()) }) } @@ -205,8 +205,9 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestDataCollections_invali suite.Run(test.name, func() { t := suite.T() - collections, excludes, err := connector.DataCollections( + collections, excludes, err := connector.ProduceBackupCollections( ctx, + owners[0], owners[0], test.getSelector(t), nil, control.Options{}, @@ -286,8 +287,8 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti } } - status := connector.AwaitStatus() - assert.NotZero(t, status.Metrics.Successes) + status := connector.Wait() + assert.NotZero(t, status.Successes) t.Log(status.String()) }) } @@ -336,8 +337,9 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar sel := selectors.NewSharePointBackup(siteIDs) sel.Include(sel.LibraryFolders([]string{"foo"}, selectors.PrefixMatch())) - cols, excludes, err := gc.DataCollections( + cols, excludes, err := gc.ProduceBackupCollections( ctx, + siteIDs[0], siteIDs[0], sel.Selector, nil, control.Options{}, @@ -374,8 +376,9 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar sel := selectors.NewSharePointBackup(siteIDs) sel.Include(sel.Lists(selectors.Any(), selectors.PrefixMatch())) - cols, excludes, err := gc.DataCollections( + cols, excludes, err := gc.ProduceBackupCollections( ctx, + siteIDs[0], siteIDs[0], sel.Selector, nil, control.Options{}, diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 2f8652f81..692a9b83d 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -22,7 +22,9 @@ import ( "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/sharepoint" "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/filters" @@ -32,6 +34,12 @@ import ( // Graph Connector // --------------------------------------------------------------------------- +// must comply with BackupProducer and RestoreConsumer +var ( + _ inject.BackupProducer = &GraphConnector{} + _ inject.RestoreConsumer = &GraphConnector{} +) + // GraphConnector is a struct used to wrap the GraphServiceClient and // GraphRequestAdapter from the msgraph-sdk-go. Additional fields are for // bookkeeping and interfacing with other component. @@ -223,7 +231,7 @@ func (gc *GraphConnector) UnionSiteIDsAndWebURLs( } // AwaitStatus waits for all gc tasks to complete and then returns status -func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus { +func (gc *GraphConnector) Wait() *data.CollectionStats { defer func() { if gc.region != nil { gc.region.End() @@ -233,12 +241,18 @@ func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus { gc.wg.Wait() // clean up and reset statefulness - status := gc.status + dcs := data.CollectionStats{ + Folders: gc.status.Folders, + Objects: gc.status.Metrics.Objects, + Successes: gc.status.Metrics.Successes, + Bytes: gc.status.Metrics.Bytes, + Details: gc.status.String(), + } gc.wg = &sync.WaitGroup{} gc.status = support.ConnectorOperationStatus{} - return &status + return &dcs } // UpdateStatus is used by gc initiated tasks to indicate completion diff --git a/src/internal/connector/graph_connector_disconnected_test.go b/src/internal/connector/graph_connector_disconnected_test.go index d92a018e4..2d327246b 100644 --- a/src/internal/connector/graph_connector_disconnected_test.go +++ b/src/internal/connector/graph_connector_disconnected_test.go @@ -111,17 +111,16 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() { go statusTestTask(&gc, 4, 1, 1) go statusTestTask(&gc, 4, 1, 1) - status := gc.AwaitStatus() - + stats := gc.Wait() t := suite.T() assert.NotEmpty(t, gc.PrintableStatus()) // Expect 8 objects - assert.Equal(t, 8, status.Metrics.Objects) + assert.Equal(t, 8, stats.Objects) // Expect 2 success - assert.Equal(t, 2, status.Metrics.Successes) + assert.Equal(t, 2, stats.Successes) // Expect 2 folders - assert.Equal(t, 2, status.Folders) + assert.Equal(t, 2, stats.Folders) } func (suite *DisconnectedGraphConnectorSuite) TestVerifyBackupInputs_allServices() { diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index dae0ed4cc..2a5fbc1ac 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -135,7 +135,7 @@ func (suite *GraphConnectorUnitSuite) TestUnionSiteIDsAndWebURLs() { } } -func (suite *GraphConnectorUnitSuite) TestGraphConnector_AwaitStatus() { +func (suite *GraphConnectorUnitSuite) TestGraphConnector_Wait() { ctx, flush := tester.NewContext() defer flush() @@ -156,14 +156,14 @@ func (suite *GraphConnectorUnitSuite) TestGraphConnector_AwaitStatus() { gc.wg.Add(1) gc.UpdateStatus(status) - result := gc.AwaitStatus() + result := gc.Wait() require.NotNil(t, result) assert.Nil(t, gc.region, "region") assert.Empty(t, gc.status, "status") assert.Equal(t, 1, result.Folders) - assert.Equal(t, 2, result.Metrics.Objects) - assert.Equal(t, 3, result.Metrics.Successes) - assert.Equal(t, int64(4), result.Metrics.Bytes) + assert.Equal(t, 2, result.Objects) + assert.Equal(t, 3, result.Successes) + assert.Equal(t, int64(4), result.Bytes) } // --------------------------------------------------------------------------- @@ -241,7 +241,7 @@ func (suite *GraphConnectorIntegrationSuite) TestRestoreFailsBadService() { } ) - deets, err := suite.connector.RestoreDataCollections( + deets, err := suite.connector.ConsumeRestoreCollections( ctx, version.Backup, acct, @@ -256,10 +256,10 @@ func (suite *GraphConnectorIntegrationSuite) TestRestoreFailsBadService() { assert.Error(t, err, clues.ToCore(err)) assert.NotNil(t, deets) - status := suite.connector.AwaitStatus() - assert.Equal(t, 0, status.Metrics.Objects) + status := suite.connector.Wait() + assert.Equal(t, 0, status.Objects) assert.Equal(t, 0, status.Folders) - assert.Equal(t, 0, status.Metrics.Successes) + assert.Equal(t, 0, status.Successes) } func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() { @@ -320,7 +320,7 @@ func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() { ctx, flush := tester.NewContext() defer flush() - deets, err := suite.connector.RestoreDataCollections( + deets, err := suite.connector.ConsumeRestoreCollections( ctx, version.Backup, suite.acct, @@ -335,10 +335,10 @@ func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() { require.NoError(t, err, clues.ToCore(err)) assert.NotNil(t, deets) - stats := suite.connector.AwaitStatus() - assert.Zero(t, stats.Metrics.Objects) + stats := suite.connector.Wait() + assert.Zero(t, stats.Objects) assert.Zero(t, stats.Folders) - assert.Zero(t, stats.Metrics.Successes) + assert.Zero(t, stats.Successes) }) } } @@ -400,7 +400,7 @@ func runRestore( restoreGC := loadConnector(ctx, t, graph.HTTPClient(graph.NoTimeout()), config.resource) restoreSel := getSelectorWith(t, config.service, config.resourceOwners, true) - deets, err := restoreGC.RestoreDataCollections( + deets, err := restoreGC.ConsumeRestoreCollections( ctx, backupVersion, config.acct, @@ -412,11 +412,11 @@ func runRestore( require.NoError(t, err, clues.ToCore(err)) assert.NotNil(t, deets) - status := restoreGC.AwaitStatus() + status := restoreGC.Wait() runTime := time.Since(start) - assert.Equal(t, numRestoreItems, status.Metrics.Objects, "restored status.Metrics.Objects") - assert.Equal(t, numRestoreItems, status.Metrics.Successes, "restored status.Metrics.Successes") + assert.Equal(t, numRestoreItems, status.Objects, "restored status.Objects") + assert.Equal(t, numRestoreItems, status.Successes, "restored status.Successes") assert.Len( t, deets.Entries, @@ -457,8 +457,10 @@ func runBackupAndCompare( t.Logf("Selective backup of %s\n", backupSel) start := time.Now() - dcs, excludes, err := backupGC.DataCollections( + dcs, excludes, err := backupGC.ProduceBackupCollections( ctx, + backupSel.DiscreteOwner, + backupSel.DiscreteOwner, backupSel, nil, config.opts, @@ -480,12 +482,12 @@ func runBackupAndCompare( config.dest, config.opts.RestorePermissions) - status := backupGC.AwaitStatus() + status := backupGC.Wait() - assert.Equalf(t, totalItems+skipped, status.Metrics.Objects, - "backup status.Metrics.Objects; wanted %d items + %d skipped", totalItems, skipped) - assert.Equalf(t, totalItems+skipped, status.Metrics.Successes, - "backup status.Metrics.Successes; wanted %d items + %d skipped", totalItems, skipped) + assert.Equalf(t, totalItems+skipped, status.Objects, + "backup status.Objects; wanted %d items + %d skipped", totalItems, skipped) + assert.Equalf(t, totalItems+skipped, status.Successes, + "backup status.Successes; wanted %d items + %d skipped", totalItems, skipped) } func runRestoreBackupTest( @@ -964,7 +966,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames ) restoreGC := loadConnector(ctx, t, graph.HTTPClient(graph.NoTimeout()), test.resource) - deets, err := restoreGC.RestoreDataCollections( + deets, err := restoreGC.ConsumeRestoreCollections( ctx, version.Backup, suite.acct, @@ -979,12 +981,12 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames require.NoError(t, err, clues.ToCore(err)) require.NotNil(t, deets) - status := restoreGC.AwaitStatus() + status := restoreGC.Wait() // Always just 1 because it's just 1 collection. - assert.Equal(t, totalItems, status.Metrics.Objects, "status.Metrics.Objects") - assert.Equal(t, totalItems, status.Metrics.Successes, "status.Metrics.Successes") - assert.Len( - t, deets.Entries, totalItems, + assert.Equal(t, totalItems, status.Objects, "status.Objects") + assert.Equal(t, totalItems, status.Successes, "status.Successes") + assert.Equal( + t, totalItems, len(deets.Entries), "details entries contains same item count as total successful items restored") t.Log("Restore complete") @@ -996,8 +998,10 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames backupSel := backupSelectorForExpected(t, test.service, expectedDests) t.Log("Selective backup of", backupSel) - dcs, excludes, err := backupGC.DataCollections( + dcs, excludes, err := backupGC.ProduceBackupCollections( ctx, + backupSel.DiscreteOwner, + backupSel.DiscreteOwner, backupSel, nil, control.Options{ @@ -1023,9 +1027,9 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames control.RestoreDestination{}, true) - status := backupGC.AwaitStatus() - assert.Equal(t, allItems+skipped, status.Metrics.Objects, "status.Metrics.Objects") - assert.Equal(t, allItems+skipped, status.Metrics.Successes, "status.Metrics.Successes") + status := backupGC.Wait() + assert.Equal(t, allItems+skipped, status.Objects, "status.Objects") + assert.Equal(t, allItems+skipped, status.Successes, "status.Successes") }) } } @@ -1147,8 +1151,10 @@ func (suite *GraphConnectorIntegrationSuite) TestBackup_CreatesPrefixCollections start = time.Now() ) - dcs, excludes, err := backupGC.DataCollections( + dcs, excludes, err := backupGC.ProduceBackupCollections( ctx, + backupSel.DiscreteOwner, + backupSel.DiscreteOwner, backupSel, nil, control.Options{ @@ -1191,7 +1197,7 @@ func (suite *GraphConnectorIntegrationSuite) TestBackup_CreatesPrefixCollections assert.ElementsMatch(t, test.categories, foundCategories) - backupGC.AwaitStatus() + backupGC.Wait() assert.NoError(t, errs.Failure()) }) diff --git a/src/internal/connector/mockconnector/mock_data_connector.go b/src/internal/connector/mockconnector/mock_data_connector.go new file mode 100644 index 000000000..453345a2c --- /dev/null +++ b/src/internal/connector/mockconnector/mock_data_connector.go @@ -0,0 +1,55 @@ +package mockconnector + +import ( + "context" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/selectors" +) + +type GraphConnector struct { + Collections []data.BackupCollection + Exclude map[string]map[string]struct{} + + Deets *details.Details + + Err error + + Stats data.CollectionStats +} + +func (gc GraphConnector) ProduceBackupCollections( + _ context.Context, + _, _ string, + _ selectors.Selector, + _ []data.RestoreCollection, + _ control.Options, + _ *fault.Bus, +) ( + []data.BackupCollection, + map[string]map[string]struct{}, + error, +) { + return gc.Collections, gc.Exclude, gc.Err +} + +func (gc GraphConnector) Wait() *data.CollectionStats { + return &gc.Stats +} + +func (gc GraphConnector) ConsumeRestoreCollections( + _ context.Context, + _ int, + _ account.Account, + _ selectors.Selector, + _ control.RestoreDestination, + _ control.Options, + _ []data.RestoreCollection, + _ *fault.Bus, +) (*details.Details, error) { + return gc.Deets, gc.Err +} diff --git a/src/internal/data/metrics.go b/src/internal/data/metrics.go new file mode 100644 index 000000000..f34d20a16 --- /dev/null +++ b/src/internal/data/metrics.go @@ -0,0 +1,17 @@ +package data + +type CollectionStats struct { + Folders, + Objects, + Successes int + Bytes int64 + Details string +} + +func (cs CollectionStats) IsZero() bool { + return cs.Folders+cs.Objects+cs.Successes+int(cs.Bytes) == 0 +} + +func (cs CollectionStats) String() string { + return cs.Details +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 89258ec78..eff16e31a 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -93,6 +93,13 @@ func NewWrapper(c *conn) (*Wrapper, error) { return &Wrapper{c}, nil } +// FIXME: Circular references. +// must comply with restore producer and backup consumer +// var ( +// _ inject.BackupConsumer = &Wrapper{} +// _ inject.RestoreProducer = &Wrapper{} +// ) + type Wrapper struct { c *conn } @@ -124,13 +131,13 @@ type PrevRefs struct { Location path.Path } -// BackupCollections takes a set of collections and creates a kopia snapshot +// ConsumeBackupCollections takes a set of collections and creates a kopia snapshot // with the data that they contain. previousSnapshots is used for incremental // backups and should represent the base snapshot from which metadata is sourced // from as well as any incomplete snapshot checkpoints that may contain more // recent data than the base snapshot. The absence of previousSnapshots causes a // complete backup of all data. -func (w Wrapper) BackupCollections( +func (w Wrapper) ConsumeBackupCollections( ctx context.Context, previousSnapshots []IncrementalBase, collections []data.BackupCollection, @@ -143,7 +150,7 @@ func (w Wrapper) BackupCollections( return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx) } - ctx, end := diagnostics.Span(ctx, "kopia:backupCollections") + ctx, end := diagnostics.Span(ctx, "kopia:consumeBackupCollections") defer end() if len(collections) == 0 && len(globalExcludeSet) == 0 { @@ -382,21 +389,21 @@ type ByteCounter interface { Count(numBytes int64) } -// RestoreMultipleItems looks up all paths- assuming each is an item declaration, +// ProduceRestoreCollections looks up all paths- assuming each is an item declaration, // not a directory- in the snapshot with id snapshotID. The path should be the // full path of the item from the root. Returns the results as a slice of single- // item DataCollections, where the DataCollection.FullPath() matches the path. // If the item does not exist in kopia or is not a file an error is returned. // The UUID of the returned DataStreams will be the name of the kopia file the // data is sourced from. -func (w Wrapper) RestoreMultipleItems( +func (w Wrapper) ProduceRestoreCollections( ctx context.Context, snapshotID string, paths []path.Path, bcounter ByteCounter, errs *fault.Bus, ) ([]data.RestoreCollection, error) { - ctx, end := diagnostics.Span(ctx, "kopia:restoreMultipleItems") + ctx, end := diagnostics.Span(ctx, "kopia:produceRestoreCollections") defer end() if len(paths) == 0 { diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 2f03a1fb2..bd5d0d724 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -276,7 +276,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { suite.Run(test.name, func() { t := suite.T() - stats, deets, _, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, prevSnaps, collections, @@ -423,7 +423,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() { t := suite.T() collections := test.cols() - stats, deets, prevShortRefs, err := suite.w.BackupCollections( + stats, deets, prevShortRefs, err := suite.w.ConsumeBackupCollections( suite.ctx, prevSnaps, collections, @@ -525,7 +525,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { fp2, err := suite.storePath2.Append(dc2.Names[0], true) require.NoError(t, err, clues.ToCore(err)) - stats, _, _, err := w.BackupCollections( + stats, _, _, err := w.ConsumeBackupCollections( ctx, nil, []data.BackupCollection{dc1, dc2}, @@ -543,7 +543,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { fp2.String(): dc2.Data[0], } - result, err := w.RestoreMultipleItems( + result, err := w.ProduceRestoreCollections( ctx, string(stats.SnapshotID), []path.Path{ @@ -644,7 +644,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { }, } - stats, deets, _, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, nil, collections, @@ -666,7 +666,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { ic := i64counter{} - _, err = suite.w.RestoreMultipleItems( + _, err = suite.w.ProduceRestoreCollections( suite.ctx, string(stats.SnapshotID), []path.Path{failedPath}, @@ -706,7 +706,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() ctx, flush := tester.NewContext() defer flush() - s, d, _, err := suite.w.BackupCollections( + s, d, _, err := suite.w.ConsumeBackupCollections( ctx, nil, test.collections, @@ -866,7 +866,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { tags[k] = "" } - stats, deets, _, err := suite.w.BackupCollections( + stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, nil, collections, @@ -1018,7 +1018,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { } } - stats, _, _, err := suite.w.BackupCollections( + stats, _, _, err := suite.w.ConsumeBackupCollections( suite.ctx, []IncrementalBase{ { @@ -1045,7 +1045,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { ic := i64counter{} - _, err = suite.w.RestoreMultipleItems( + _, err = suite.w.ProduceRestoreCollections( suite.ctx, string(stats.SnapshotID), []path.Path{ @@ -1058,7 +1058,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() { } } -func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { +func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() { doesntExist, err := path.Build( testTenant, testUser, @@ -1148,7 +1148,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { ic := i64counter{} - result, err := suite.w.RestoreMultipleItems( + result, err := suite.w.ProduceRestoreCollections( suite.ctx, string(suite.snapshotID), test.inputPaths, @@ -1167,7 +1167,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { } } -func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems_Errors() { +func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Errors() { itemPath, err := suite.testPath1.Append(testFileName, true) require.NoError(suite.T(), err, clues.ToCore(err)) @@ -1197,7 +1197,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems_Errors() suite.Run(test.name, func() { t := suite.T() - c, err := suite.w.RestoreMultipleItems( + c, err := suite.w.ProduceRestoreCollections( suite.ctx, test.snapshotID, test.paths, @@ -1219,7 +1219,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestDeleteSnapshot() { itemPath := suite.files[suite.testPath1.String()][0].itemPath ic := i64counter{} - c, err := suite.w.RestoreMultipleItems( + c, err := suite.w.ProduceRestoreCollections( suite.ctx, string(suite.snapshotID), []path.Path{itemPath}, diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 89b223cd2..c97d7a1ec 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -9,14 +9,13 @@ import ( "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common/crash" - "github.com/alcionai/corso/src/internal/connector" - "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/internal/streamstore" "github.com/alcionai/corso/src/pkg/account" @@ -42,6 +41,7 @@ type BackupOperation struct { Version string `json:"version"` account account.Account + bp inject.BackupProducer // when true, this allows for incremental backups instead of full data pulls incremental bool @@ -60,20 +60,21 @@ func NewBackupOperation( opts control.Options, kw *kopia.Wrapper, sw *store.Wrapper, - gc *connector.GraphConnector, + bp inject.BackupProducer, acct account.Account, selector selectors.Selector, ownerName string, bus events.Eventer, ) (BackupOperation, error) { op := BackupOperation{ - operation: newOperation(opts, bus, kw, sw, gc), + operation: newOperation(opts, bus, kw, sw), ResourceOwner: selector.DiscreteOwner, ResourceOwnerName: ownerName, Selectors: selector, Version: "v0", account: acct, incremental: useIncrementalBackup(selector, opts), + bp: bp, } if len(ownerName) == 0 { @@ -92,6 +93,10 @@ func (op BackupOperation) validate() error { return clues.New("backup requires a resource owner") } + if op.bp == nil { + return clues.New("missing backup producer") + } + return op.operation.validate() } @@ -101,7 +106,7 @@ func (op BackupOperation) validate() error { // get populated asynchronously. type backupStats struct { k *kopia.BackupStats - gc *support.ConnectorOperationStatus + gc *data.CollectionStats resourceCount int } @@ -243,14 +248,22 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "producing manifests and metadata") } - cs, excludes, err := produceBackupDataCollections(ctx, op.gc, op.Selectors, mdColls, op.Options, op.Errors) + cs, excludes, err := produceBackupDataCollections( + ctx, + op.bp, + op.ResourceOwner, + op.ResourceOwnerName, + op.Selectors, + mdColls, + op.Options, + op.Errors) if err != nil { return nil, clues.Wrap(err, "producing backup data collections") } ctx = clues.Add(ctx, "coll_count", len(cs)) - writeStats, deets, toMerge, err := consumeBackupDataCollections( + writeStats, deets, toMerge, err := consumeBackupCollections( ctx, op.kopia, op.account.ID(), @@ -279,9 +292,9 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "merging details") } - opStats.gc = op.gc.AwaitStatus() + opStats.gc = op.bp.Wait() - logger.Ctx(ctx).Debug(op.gc.PrintableStatus()) + logger.Ctx(ctx).Debug(opStats.gc) return deets, nil } @@ -312,7 +325,8 @@ func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool { // calls the producer to generate collections of data to backup func produceBackupDataCollections( ctx context.Context, - gc *connector.GraphConnector, + bp inject.BackupProducer, + ownerID, ownerName string, sel selectors.Selector, metadata []data.RestoreCollection, ctrlOpts control.Options, @@ -325,25 +339,13 @@ func produceBackupDataCollections( closer() }() - return gc.DataCollections(ctx, sel, metadata, ctrlOpts, errs) + return bp.ProduceBackupCollections(ctx, ownerID, ownerName, sel, metadata, ctrlOpts, errs) } // --------------------------------------------------------------------------- // Consumer funcs // --------------------------------------------------------------------------- -type backuper interface { - BackupCollections( - ctx context.Context, - bases []kopia.IncrementalBase, - cs []data.BackupCollection, - excluded map[string]map[string]struct{}, - tags map[string]string, - buildTreeWithBase bool, - errs *fault.Bus, - ) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) -} - func selectorToReasons(sel selectors.Selector) []kopia.Reason { service := sel.PathService() reasons := []kopia.Reason{} @@ -389,9 +391,9 @@ func builderFromReason(ctx context.Context, tenant string, r kopia.Reason) (*pat } // calls kopia to backup the collections of data -func consumeBackupDataCollections( +func consumeBackupCollections( ctx context.Context, - bu backuper, + bc inject.BackupConsumer, tenantID string, reasons []kopia.Reason, mans []*kopia.ManifestEntry, @@ -465,7 +467,7 @@ func consumeBackupDataCollections( "base_backup_id", mbID) } - kopiaStats, deets, itemsSourcedFromBase, err := bu.BackupCollections( + kopiaStats, deets, itemsSourcedFromBase, err := bc.ConsumeBackupCollections( ctx, bases, cs, @@ -663,11 +665,11 @@ func (op *BackupOperation) persistResults( return clues.New("backup population never completed") } - if op.Status != Failed && opStats.gc.Metrics.Successes == 0 { + if op.Status != Failed && opStats.gc.IsZero() { op.Status = NoData } - op.Results.ItemsRead = opStats.gc.Metrics.Successes + op.Results.ItemsRead = opStats.gc.Successes return op.Errors.Failure() } diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 3ae3d8a72..744a87837 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -30,6 +30,7 @@ import ( evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/pkg/account" @@ -288,7 +289,7 @@ func checkMetadataFilesExist( pathsByRef[dir.ShortRef()] = append(pathsByRef[dir.ShortRef()], fName) } - cols, err := kw.RestoreMultipleItems(ctx, bup.SnapshotID, paths, nil, fault.New(true)) + cols, err := kw.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, nil, fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) for _, col := range cols { @@ -383,7 +384,7 @@ func generateContainerOfItems( dest, collections) - deets, err := gc.RestoreDataCollections( + deets, err := gc.ConsumeRestoreCollections( ctx, backupVersion, acct, @@ -394,7 +395,9 @@ func generateContainerOfItems( fault.New(true)) require.NoError(t, err, clues.ToCore(err)) - gc.AwaitStatus() + // have to wait here, both to ensure the process + // finishes, and also to clean up the gc status + gc.Wait() return deets } @@ -539,7 +542,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() { func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { kw := &kopia.Wrapper{} sw := &store.Wrapper{} - gc := &connector.GraphConnector{} + gc := &mockconnector.GraphConnector{} acct := tester.NewM365Account(suite.T()) table := []struct { @@ -547,7 +550,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { opts control.Options kw *kopia.Wrapper sw *store.Wrapper - gc *connector.GraphConnector + bp inject.BackupProducer acct account.Account targets []string errCheck assert.ErrorAssertionFunc @@ -555,7 +558,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, - {"missing graphconnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, + {"missing backup producer", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -567,7 +570,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { test.opts, test.kw, test.sw, - test.gc, + test.bp, test.acct, selectors.Selector{DiscreteOwner: "test"}, "test-name", diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index 74641704a..96f9de9c7 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -14,8 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector" - "github.com/alcionai/corso/src/internal/connector/support" + "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/data" evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" @@ -38,7 +37,7 @@ import ( // ----- restore producer -type mockRestorer struct { +type mockRestoreProducer struct { gotPaths []path.Path colls []data.RestoreCollection collsByID map[string][]data.RestoreCollection // snapshotID: []RestoreCollection @@ -48,7 +47,7 @@ type mockRestorer struct { type restoreFunc func(id string, ps []path.Path) ([]data.RestoreCollection, error) -func (mr *mockRestorer) buildRestoreFunc( +func (mr *mockRestoreProducer) buildRestoreFunc( t *testing.T, oid string, ops []path.Path, @@ -61,7 +60,7 @@ func (mr *mockRestorer) buildRestoreFunc( } } -func (mr *mockRestorer) RestoreMultipleItems( +func (mr *mockRestoreProducer) ProduceRestoreCollections( ctx context.Context, snapshotID string, paths []path.Path, @@ -85,9 +84,9 @@ func checkPaths(t *testing.T, expected, got []path.Path) { assert.ElementsMatch(t, expected, got) } -// ----- backup producer +// ----- backup consumer -type mockBackuper struct { +type mockBackupConsumer struct { checkFunc func( bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -95,7 +94,7 @@ type mockBackuper struct { buildTreeWithBase bool) } -func (mbu mockBackuper) BackupCollections( +func (mbu mockBackupConsumer) ConsumeBackupCollections( ctx context.Context, bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -360,7 +359,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { var ( kw = &kopia.Wrapper{} sw = &store.Wrapper{} - gc = &connector.GraphConnector{} + gc = &mockconnector.GraphConnector{} acct = account.Account{} now = time.Now() ) @@ -381,9 +380,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { TotalHashedBytes: 1, TotalUploadedBytes: 1, }, - gc: &support.ConnectorOperationStatus{ - Metrics: support.CollectionMetrics{Successes: 1}, - }, + gc: &data.CollectionStats{Successes: 1}, }, }, { @@ -392,7 +389,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { fail: assert.AnError, stats: backupStats{ k: &kopia.BackupStats{}, - gc: &support.ConnectorOperationStatus{}, + gc: &data.CollectionStats{}, }, }, { @@ -400,7 +397,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { expectErr: assert.NoError, stats: backupStats{ k: &kopia.BackupStats{}, - gc: &support.ConnectorOperationStatus{}, + gc: &data.CollectionStats{}, }, }, } @@ -427,7 +424,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { test.expectErr(t, op.persistResults(now, &test.stats)) assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status") - assert.Equal(t, test.stats.gc.Metrics.Successes, op.Results.ItemsRead, "items read") + assert.Equal(t, test.stats.gc.Successes, op.Results.ItemsRead, "items read") assert.Equal(t, test.stats.k.TotalFileCount, op.Results.ItemsWritten, "items written") assert.Equal(t, test.stats.k.TotalHashedBytes, op.Results.BytesRead, "bytes read") assert.Equal(t, test.stats.k.TotalUploadedBytes, op.Results.BytesUploaded, "bytes written") @@ -564,7 +561,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections ctx, flush := tester.NewContext() defer flush() - mbu := &mockBackuper{ + mbu := &mockBackupConsumer{ checkFunc: func( bases []kopia.IncrementalBase, cs []data.BackupCollection, @@ -576,7 +573,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections } //nolint:errcheck - consumeBackupDataCollections( + consumeBackupCollections( ctx, mbu, tenant, diff --git a/src/internal/operations/inject/inject.go b/src/internal/operations/inject/inject.go new file mode 100644 index 000000000..dfc0c8cab --- /dev/null +++ b/src/internal/operations/inject/inject.go @@ -0,0 +1,66 @@ +package inject + +import ( + "context" + + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/pkg/account" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" + "github.com/alcionai/corso/src/pkg/path" + "github.com/alcionai/corso/src/pkg/selectors" +) + +type ( + BackupProducer interface { + ProduceBackupCollections( + ctx context.Context, + ownerID, ownerName string, + sels selectors.Selector, + metadata []data.RestoreCollection, + ctrlOpts control.Options, + errs *fault.Bus, + ) ([]data.BackupCollection, map[string]map[string]struct{}, error) + + Wait() *data.CollectionStats + } + + BackupConsumer interface { + ConsumeBackupCollections( + ctx context.Context, + bases []kopia.IncrementalBase, + cs []data.BackupCollection, + excluded map[string]map[string]struct{}, + tags map[string]string, + buildTreeWithBase bool, + errs *fault.Bus, + ) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) + } + + RestoreProducer interface { + ProduceRestoreCollections( + ctx context.Context, + snapshotID string, + paths []path.Path, + bc kopia.ByteCounter, + errs *fault.Bus, + ) ([]data.RestoreCollection, error) + } + + RestoreConsumer interface { + ConsumeRestoreCollections( + ctx context.Context, + backupVersion int, + acct account.Account, + selector selectors.Selector, + dest control.RestoreDestination, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, + ) (*details.Details, error) + + Wait() *data.CollectionStats + } +) diff --git a/src/internal/operations/manifests.go b/src/internal/operations/manifests.go index b4505c915..f8f2c5041 100644 --- a/src/internal/operations/manifests.go +++ b/src/internal/operations/manifests.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/pkg/backup" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" @@ -27,7 +28,7 @@ type manifestFetcher interface { type manifestRestorer interface { manifestFetcher - restorer + inject.RestoreProducer } type getBackuper interface { @@ -173,7 +174,7 @@ func verifyDistinctBases(ctx context.Context, mans []*kopia.ManifestEntry) error // collectMetadata retrieves all metadata files associated with the manifest. func collectMetadata( ctx context.Context, - r restorer, + r inject.RestoreProducer, man *kopia.ManifestEntry, fileNames []string, tenantID string, @@ -201,7 +202,7 @@ func collectMetadata( } } - dcs, err := r.RestoreMultipleItems(ctx, string(man.ID), paths, nil, errs) + dcs, err := r.ProduceRestoreCollections(ctx, string(man.ID), paths, nil, errs) if err != nil { // Restore is best-effort and we want to keep it that way since we want to // return as much metadata as we can to reduce the work we'll need to do. diff --git a/src/internal/operations/manifests_test.go b/src/internal/operations/manifests_test.go index df0c54632..9f2a3b409 100644 --- a/src/internal/operations/manifests_test.go +++ b/src/internal/operations/manifests_test.go @@ -24,9 +24,9 @@ import ( // --------------------------------------------------------------------------- type mockManifestRestorer struct { - mockRestorer + mockRestoreProducer mans []*kopia.ManifestEntry - mrErr error // err varname already claimed by mockRestorer + mrErr error // err varname already claimed by mockRestoreProducer } func (mmr mockManifestRestorer) FetchPrevSnapshotManifests( @@ -225,7 +225,7 @@ func (suite *OperationsManifestsUnitSuite) TestCollectMetadata() { paths := test.expectPaths(t, test.fileNames) - mr := mockRestorer{err: test.expectErr} + mr := mockRestoreProducer{err: test.expectErr} mr.buildRestoreFunc(t, test.manID, paths) man := &kopia.ManifestEntry{ @@ -447,8 +447,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "don't get metadata, no mans", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, - mans: []*kopia.ManifestEntry{}, + mockRestoreProducer: mockRestoreProducer{}, + mans: []*kopia.ManifestEntry{}, }, gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, @@ -460,8 +460,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "don't get metadata", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, - mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "")}, + mockRestoreProducer: mockRestoreProducer{}, + mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "")}, }, gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, @@ -473,8 +473,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "don't get metadata, incomplete manifest", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, - mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "ir", "")}, + mockRestoreProducer: mockRestoreProducer{}, + mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "ir", "")}, }, gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, @@ -486,8 +486,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "fetch manifests errors", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, - mrErr: assert.AnError, + mockRestoreProducer: mockRestoreProducer{}, + mrErr: assert.AnError, }, gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, @@ -499,7 +499,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "verify distinct bases fails", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, + mockRestoreProducer: mockRestoreProducer{}, mans: []*kopia.ManifestEntry{ makeMan(path.EmailCategory, "", "", ""), makeMan(path.EmailCategory, "", "", ""), @@ -515,8 +515,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "no manifests", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, - mans: []*kopia.ManifestEntry{}, + mockRestoreProducer: mockRestoreProducer{}, + mans: []*kopia.ManifestEntry{}, }, gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, @@ -528,7 +528,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "only incomplete manifests", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, + mockRestoreProducer: mockRestoreProducer{}, mans: []*kopia.ManifestEntry{ makeMan(path.EmailCategory, "", "ir", ""), makeMan(path.ContactsCategory, "", "ir", ""), @@ -544,9 +544,11 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "man missing backup id", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, - }}, + mockRestoreProducer: mockRestoreProducer{ + collsByID: map[string][]data.RestoreCollection{ + "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, + }, + }, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "")}, }, gb: mockGetBackuper{detailsID: did}, @@ -559,8 +561,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "backup missing details id", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{}, - mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "bid")}, + mockRestoreProducer: mockRestoreProducer{}, + mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "bid")}, }, gb: mockGetBackuper{}, reasons: []kopia.Reason{}, @@ -571,10 +573,12 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "one complete, one incomplete", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, - "incmpl_id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "incmpl_id_coll"}}}, - }}, + mockRestoreProducer: mockRestoreProducer{ + collsByID: map[string][]data.RestoreCollection{ + "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, + "incmpl_id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "incmpl_id_coll"}}}, + }, + }, mans: []*kopia.ManifestEntry{ makeMan(path.EmailCategory, "id", "", "bid"), makeMan(path.EmailCategory, "incmpl_id", "ir", ""), @@ -590,9 +594,11 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "single valid man", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, - }}, + mockRestoreProducer: mockRestoreProducer{ + collsByID: map[string][]data.RestoreCollection{ + "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, + }, + }, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "bid")}, }, gb: mockGetBackuper{detailsID: did}, @@ -605,10 +611,12 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "multiple valid mans", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "mail": {data.NotFoundRestoreCollection{Collection: mockColl{id: "mail_coll"}}}, - "contact": {data.NotFoundRestoreCollection{Collection: mockColl{id: "contact_coll"}}}, - }}, + mockRestoreProducer: mockRestoreProducer{ + collsByID: map[string][]data.RestoreCollection{ + "mail": {data.NotFoundRestoreCollection{Collection: mockColl{id: "mail_coll"}}}, + "contact": {data.NotFoundRestoreCollection{Collection: mockColl{id: "contact_coll"}}}, + }, + }, mans: []*kopia.ManifestEntry{ makeMan(path.EmailCategory, "mail", "", "bid"), makeMan(path.ContactsCategory, "contact", "", "bid"), @@ -627,8 +635,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { { name: "error collecting metadata", mr: mockManifestRestorer{ - mockRestorer: mockRestorer{err: assert.AnError}, - mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "bid")}, + mockRestoreProducer: mockRestoreProducer{err: assert.AnError}, + mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "", "", "bid")}, }, gb: mockGetBackuper{detailsID: did}, reasons: []kopia.Reason{}, @@ -961,7 +969,7 @@ func (suite *BackupManifestUnitSuite) TestBackupOperation_CollectMetadata() { ctx, flush := tester.NewContext() defer flush() - mr := &mockRestorer{} + mr := &mockRestoreProducer{} _, err := collectMetadata(ctx, mr, test.inputMan, test.inputFiles, tenant, fault.New(true)) assert.NoError(t, err, clues.ToCore(err)) diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 8dc27c87a..e4cd201db 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -5,7 +5,6 @@ import ( "github.com/alcionai/clues" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/pkg/control" @@ -57,7 +56,6 @@ type operation struct { bus events.Eventer kopia *kopia.Wrapper store *store.Wrapper - gc *connector.GraphConnector } func newOperation( @@ -65,7 +63,6 @@ func newOperation( bus events.Eventer, kw *kopia.Wrapper, sw *store.Wrapper, - gc *connector.GraphConnector, ) operation { return operation{ CreatedAt: time.Now(), @@ -75,7 +72,6 @@ func newOperation( bus: bus, kopia: kw, store: sw, - gc: gc, Status: InProgress, } @@ -90,9 +86,5 @@ func (op operation) validate() error { return clues.New("missing modelstore") } - if op.gc == nil { - return clues.New("missing graph connector") - } - return nil } diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index 5d0425022..27cf6185f 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" @@ -26,30 +25,27 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(control.Options{}, events.Bus{}, nil, nil, nil) + op := newOperation(control.Options{}, events.Bus{}, nil, nil) assert.Greater(t, op.CreatedAt, time.Time{}) } func (suite *OperationSuite) TestOperation_Validate() { kwStub := &kopia.Wrapper{} swStub := &store.Wrapper{} - gcStub := &connector.GraphConnector{} table := []struct { name string kw *kopia.Wrapper sw *store.Wrapper - gc *connector.GraphConnector errCheck assert.ErrorAssertionFunc }{ - {"good", kwStub, swStub, gcStub, assert.NoError}, - {"missing kopia wrapper", nil, swStub, gcStub, assert.Error}, - {"missing store wrapper", kwStub, nil, gcStub, assert.Error}, - {"missing graph connector", kwStub, swStub, nil, assert.Error}, + {"good", kwStub, swStub, assert.NoError}, + {"missing kopia wrapper", nil, swStub, assert.Error}, + {"missing store wrapper", kwStub, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { - err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw, test.gc).validate() + err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate() test.errCheck(suite.T(), err, clues.ToCore(err)) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 0365765af..1dac69098 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -10,15 +10,14 @@ import ( "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common/crash" - "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/onedrive" - "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/diagnostics" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/observe" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/internal/streamstore" "github.com/alcionai/corso/src/pkg/account" @@ -42,6 +41,7 @@ type RestoreOperation struct { Version string `json:"version"` account account.Account + rc inject.RestoreConsumer } // RestoreResults aggregate the details of the results of the operation. @@ -56,7 +56,7 @@ func NewRestoreOperation( opts control.Options, kw *kopia.Wrapper, sw *store.Wrapper, - gc *connector.GraphConnector, + rc inject.RestoreConsumer, acct account.Account, backupID model.StableID, sel selectors.Selector, @@ -64,12 +64,13 @@ func NewRestoreOperation( bus events.Eventer, ) (RestoreOperation, error) { op := RestoreOperation{ - operation: newOperation(opts, bus, kw, sw, gc), + operation: newOperation(opts, bus, kw, sw), BackupID: backupID, Selectors: sel, Destination: dest, Version: "v0", account: acct, + rc: rc, } if err := op.validate(); err != nil { return RestoreOperation{}, err @@ -79,6 +80,10 @@ func NewRestoreOperation( } func (op RestoreOperation) validate() error { + if op.rc == nil { + return clues.New("missing restore consumer") + } + return op.operation.validate() } @@ -88,7 +93,7 @@ func (op RestoreOperation) validate() error { // get populated asynchronously. type restoreStats struct { cs []data.RestoreCollection - gc *support.ConnectorOperationStatus + gc *data.CollectionStats bytesRead *stats.ByteCounter resourceCount int @@ -96,16 +101,6 @@ type restoreStats struct { restoreID string } -type restorer interface { - RestoreMultipleItems( - ctx context.Context, - snapshotID string, - paths []path.Path, - bc kopia.ByteCounter, - errs *fault.Bus, - ) ([]data.RestoreCollection, error) -} - // Run begins a synchronous restore operation. func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.Details, err error) { defer func() { @@ -222,7 +217,7 @@ func (op *RestoreOperation) do( defer closer() defer close(kopiaComplete) - dcs, err := op.kopia.RestoreMultipleItems(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors) + dcs, err := op.kopia.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, opStats.bytesRead, op.Errors) if err != nil { return nil, clues.Wrap(err, "producing collections to restore") } @@ -235,12 +230,9 @@ func (op *RestoreOperation) do( opStats.resourceCount = 1 opStats.cs = dcs - restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data")) - defer closer() - defer close(restoreComplete) - - restoreDetails, err := op.gc.RestoreDataCollections( + deets, err = consumeRestoreCollections( ctx, + op.rc, bup.Version, op.account, op.Selectors, @@ -252,13 +244,11 @@ func (op *RestoreOperation) do( return nil, clues.Wrap(err, "restoring collections") } - restoreComplete <- struct{}{} + opStats.gc = op.rc.Wait() - opStats.gc = op.gc.AwaitStatus() + logger.Ctx(ctx).Debug(opStats.gc) - logger.Ctx(ctx).Debug(op.gc.PrintableStatus()) - - return restoreDetails, nil + return deets, nil } // persists details and statistics about the restore operation. @@ -285,11 +275,11 @@ func (op *RestoreOperation) persistResults( return clues.New("restoration never completed") } - if op.Status != Failed && opStats.gc.Metrics.Successes == 0 { + if op.Status != Failed && opStats.gc.IsZero() { op.Status = NoData } - op.Results.ItemsWritten = opStats.gc.Metrics.Successes + op.Results.ItemsWritten = opStats.gc.Successes op.bus.Event( ctx, @@ -312,6 +302,44 @@ func (op *RestoreOperation) persistResults( return op.Errors.Failure() } +// --------------------------------------------------------------------------- +// Restorer funcs +// --------------------------------------------------------------------------- + +func consumeRestoreCollections( + ctx context.Context, + rc inject.RestoreConsumer, + backupVersion int, + acct account.Account, + sel selectors.Selector, + dest control.RestoreDestination, + opts control.Options, + dcs []data.RestoreCollection, + errs *fault.Bus, +) (*details.Details, error) { + complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data")) + defer func() { + complete <- struct{}{} + close(complete) + closer() + }() + + deets, err := rc.ConsumeRestoreCollections( + ctx, + backupVersion, + acct, + sel, + dest, + opts, + dcs, + errs) + if err != nil { + return nil, clues.Wrap(err, "restoring collections") + } + + return deets, nil +} + // formatDetailsForRestoration reduces the provided detail entries according to the // selector specifications. func formatDetailsForRestoration( diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 0ffbf587e..4344e08ee 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -16,12 +16,12 @@ import ( "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/onedrive/api" - "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/events" evmock "github.com/alcionai/corso/src/internal/events/mock" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/model" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" @@ -50,7 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { var ( kw = &kopia.Wrapper{} sw = &store.Wrapper{} - gc = &connector.GraphConnector{} + gc = &mockconnector.GraphConnector{} acct = account.Account{} now = time.Now() dest = tester.DefaultTestRestoreDestination() @@ -75,11 +75,9 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { Collection: &mockconnector.MockExchangeDataCollection{}, }, }, - gc: &support.ConnectorOperationStatus{ - Metrics: support.CollectionMetrics{ - Objects: 1, - Successes: 1, - }, + gc: &data.CollectionStats{ + Objects: 1, + Successes: 1, }, }, }, @@ -89,7 +87,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { fail: assert.AnError, stats: restoreStats{ bytesRead: &stats.ByteCounter{}, - gc: &support.ConnectorOperationStatus{}, + gc: &data.CollectionStats{}, }, }, { @@ -98,7 +96,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { stats: restoreStats{ bytesRead: &stats.ByteCounter{}, cs: []data.RestoreCollection{}, - gc: &support.ConnectorOperationStatus{}, + gc: &data.CollectionStats{}, }, }, } @@ -126,7 +124,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status") assert.Equal(t, len(test.stats.cs), op.Results.ItemsRead, "items read") - assert.Equal(t, test.stats.gc.Metrics.Successes, op.Results.ItemsWritten, "items written") + assert.Equal(t, test.stats.gc.Successes, op.Results.ItemsWritten, "items written") assert.Equal(t, test.stats.bytesRead.NumBytes, op.Results.BytesRead, "resource owners") assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners") assert.Equal(t, now, op.Results.StartedAt, "started at") @@ -217,7 +215,7 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() { func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { kw := &kopia.Wrapper{} sw := &store.Wrapper{} - gc := &connector.GraphConnector{} + gc := &mockconnector.GraphConnector{} acct := tester.NewM365Account(suite.T()) dest := tester.DefaultTestRestoreDestination() @@ -226,7 +224,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { opts control.Options kw *kopia.Wrapper sw *store.Wrapper - gc *connector.GraphConnector + rc inject.RestoreConsumer acct account.Account targets []string errCheck assert.ErrorAssertionFunc @@ -234,7 +232,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, - {"missing graphConnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, + {"missing restore consumer", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -246,7 +244,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.opts, test.kw, test.sw, - test.gc, + test.rc, test.acct, "backup-id", selectors.Selector{DiscreteOwner: "test"}, diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index ec728df67..57fe5b8f1 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -11,8 +11,8 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/kopia" + "github.com/alcionai/corso/src/internal/operations/inject" "github.com/alcionai/corso/src/internal/stats" - "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -221,26 +221,14 @@ func collect( return &dc, nil } -type backuper interface { - BackupCollections( - ctx context.Context, - bases []kopia.IncrementalBase, - cs []data.BackupCollection, - globalExcludeSet map[string]map[string]struct{}, - tags map[string]string, - buildTreeWithBase bool, - errs *fault.Bus, - ) (*kopia.BackupStats, *details.Builder, map[string]kopia.PrevRefs, error) -} - // write persists bytes to the store func write( ctx context.Context, - bup backuper, + bup inject.BackupConsumer, dbcs []data.BackupCollection, errs *fault.Bus, ) (string, error) { - backupStats, _, _, err := bup.BackupCollections( + backupStats, _, _, err := bup.ConsumeBackupCollections( ctx, nil, dbcs, @@ -255,16 +243,6 @@ func write( return backupStats.SnapshotID, nil } -type restorer interface { - RestoreMultipleItems( - ctx context.Context, - snapshotID string, - paths []path.Path, - bc kopia.ByteCounter, - errs *fault.Bus, - ) ([]data.RestoreCollection, error) -} - // read retrieves an object from the store func read( ctx context.Context, @@ -272,7 +250,7 @@ func read( tenantID string, service path.ServiceType, col Collectable, - rer restorer, + rer inject.RestoreProducer, errs *fault.Bus, ) error { // construct the path of the container @@ -285,7 +263,7 @@ func read( ctx = clues.Add(ctx, "snapshot_id", snapshotID) - cs, err := rer.RestoreMultipleItems( + cs, err := rer.ProduceRestoreCollections( ctx, snapshotID, []path.Path{p},