replace graphConnector with interface
Replaces the operations graphConnector reference with an interface. Restore and Backups have separate, unique interfaces.
This commit is contained in:
parent
43e16b6ab2
commit
feddd9b183
@ -27,19 +27,23 @@ import (
|
|||||||
// Data Collections
|
// Data Collections
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
// DataCollections utility function to launch backup operations for exchange and
|
// ProduceBackupCollections generates a slice of backup collections for the service
|
||||||
// onedrive. metadataCols contains any collections with metadata files that may
|
// specified in the selectors.
|
||||||
// be useful for the current backup. Metadata can include things like delta
|
// The metadata field can include things like delta tokens or the previous backup's
|
||||||
// tokens or the previous backup's folder hierarchy. The absence of metadataCols
|
// folder hierarchy. The absence of metadata causes the collection creation to ignore
|
||||||
// results in all data being pulled.
|
// prior history (ie, incrementals) and run a full backup.
|
||||||
func (gc *GraphConnector) DataCollections(
|
func (gc *GraphConnector) ProduceBackupCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
ownerID, ownerName string,
|
||||||
sels selectors.Selector,
|
sels selectors.Selector,
|
||||||
metadata []data.RestoreCollection,
|
metadata []data.RestoreCollection,
|
||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
errs *fault.Bus,
|
errs *fault.Bus,
|
||||||
) ([]data.BackupCollection, map[string]map[string]struct{}, error) {
|
) ([]data.BackupCollection, map[string]map[string]struct{}, error) {
|
||||||
ctx, end := diagnostics.Span(ctx, "gc:dataCollections", diagnostics.Index("service", sels.Service.String()))
|
ctx, end := diagnostics.Span(
|
||||||
|
ctx,
|
||||||
|
"gc:produceBackupCollections",
|
||||||
|
diagnostics.Index("service", sels.Service.String()))
|
||||||
defer end()
|
defer end()
|
||||||
|
|
||||||
err := verifyBackupInputs(sels, gc.GetSiteIDs())
|
err := verifyBackupInputs(sels, gc.GetSiteIDs())
|
||||||
@ -188,10 +192,10 @@ func checkServiceEnabled(
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreDataCollections restores data from the specified collections
|
// ConsumeRestoreCollections restores data from the specified collections
|
||||||
// into M365 using the GraphAPI.
|
// into M365 using the GraphAPI.
|
||||||
// SideEffect: gc.status is updated at the completion of operation
|
// SideEffect: gc.status is updated at the completion of operation
|
||||||
func (gc *GraphConnector) RestoreDataCollections(
|
func (gc *GraphConnector) ConsumeRestoreCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
backupVersion int,
|
backupVersion int,
|
||||||
acct account.Account,
|
acct account.Account,
|
||||||
|
|||||||
@ -129,7 +129,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestExchangeDataCollection
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
status := connector.AwaitStatus()
|
status := connector.Wait()
|
||||||
assert.NotZero(t, status.Metrics.Successes)
|
assert.NotZero(t, status.Metrics.Successes)
|
||||||
t.Log(status.String())
|
t.Log(status.String())
|
||||||
})
|
})
|
||||||
@ -205,8 +205,9 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestDataCollections_invali
|
|||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
collections, excludes, err := connector.DataCollections(
|
collections, excludes, err := connector.ProduceBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
|
owners[0], owners[0],
|
||||||
test.getSelector(t),
|
test.getSelector(t),
|
||||||
nil,
|
nil,
|
||||||
control.Options{},
|
control.Options{},
|
||||||
@ -286,7 +287,7 @@ func (suite *ConnectorDataCollectionIntegrationSuite) TestSharePointDataCollecti
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
status := connector.AwaitStatus()
|
status := connector.Wait()
|
||||||
assert.NotZero(t, status.Metrics.Successes)
|
assert.NotZero(t, status.Metrics.Successes)
|
||||||
t.Log(status.String())
|
t.Log(status.String())
|
||||||
})
|
})
|
||||||
@ -336,8 +337,9 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar
|
|||||||
sel := selectors.NewSharePointBackup(siteIDs)
|
sel := selectors.NewSharePointBackup(siteIDs)
|
||||||
sel.Include(sel.LibraryFolders([]string{"foo"}, selectors.PrefixMatch()))
|
sel.Include(sel.LibraryFolders([]string{"foo"}, selectors.PrefixMatch()))
|
||||||
|
|
||||||
cols, excludes, err := gc.DataCollections(
|
cols, excludes, err := gc.ProduceBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
|
siteIDs[0], siteIDs[0],
|
||||||
sel.Selector,
|
sel.Selector,
|
||||||
nil,
|
nil,
|
||||||
control.Options{},
|
control.Options{},
|
||||||
@ -374,8 +376,9 @@ func (suite *ConnectorCreateSharePointCollectionIntegrationSuite) TestCreateShar
|
|||||||
sel := selectors.NewSharePointBackup(siteIDs)
|
sel := selectors.NewSharePointBackup(siteIDs)
|
||||||
sel.Include(sel.Lists(selectors.Any(), selectors.PrefixMatch()))
|
sel.Include(sel.Lists(selectors.Any(), selectors.PrefixMatch()))
|
||||||
|
|
||||||
cols, excludes, err := gc.DataCollections(
|
cols, excludes, err := gc.ProduceBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
|
siteIDs[0], siteIDs[0],
|
||||||
sel.Selector,
|
sel.Selector,
|
||||||
nil,
|
nil,
|
||||||
control.Options{},
|
control.Options{},
|
||||||
|
|||||||
@ -223,7 +223,7 @@ func (gc *GraphConnector) UnionSiteIDsAndWebURLs(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AwaitStatus waits for all gc tasks to complete and then returns status
|
// AwaitStatus waits for all gc tasks to complete and then returns status
|
||||||
func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
|
func (gc *GraphConnector) Wait() *support.ConnectorOperationStatus {
|
||||||
defer func() {
|
defer func() {
|
||||||
if gc.region != nil {
|
if gc.region != nil {
|
||||||
gc.region.End()
|
gc.region.End()
|
||||||
|
|||||||
@ -111,8 +111,7 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() {
|
|||||||
go statusTestTask(&gc, 4, 1, 1)
|
go statusTestTask(&gc, 4, 1, 1)
|
||||||
go statusTestTask(&gc, 4, 1, 1)
|
go statusTestTask(&gc, 4, 1, 1)
|
||||||
|
|
||||||
status := gc.AwaitStatus()
|
status := gc.Wait()
|
||||||
|
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
assert.NotEmpty(t, gc.PrintableStatus())
|
assert.NotEmpty(t, gc.PrintableStatus())
|
||||||
|
|||||||
@ -256,7 +256,7 @@ func (suite *GraphConnectorIntegrationSuite) TestRestoreFailsBadService() {
|
|||||||
assert.Error(t, err, clues.ToCore(err))
|
assert.Error(t, err, clues.ToCore(err))
|
||||||
assert.NotNil(t, deets)
|
assert.NotNil(t, deets)
|
||||||
|
|
||||||
status := suite.connector.AwaitStatus()
|
status := suite.connector.Wait()
|
||||||
assert.Equal(t, 0, status.Metrics.Objects)
|
assert.Equal(t, 0, status.Metrics.Objects)
|
||||||
assert.Equal(t, 0, status.Folders)
|
assert.Equal(t, 0, status.Folders)
|
||||||
assert.Equal(t, 0, status.Metrics.Successes)
|
assert.Equal(t, 0, status.Metrics.Successes)
|
||||||
@ -335,7 +335,7 @@ func (suite *GraphConnectorIntegrationSuite) TestEmptyCollections() {
|
|||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
assert.NotNil(t, deets)
|
assert.NotNil(t, deets)
|
||||||
|
|
||||||
stats := suite.connector.AwaitStatus()
|
stats := suite.connector.Wait()
|
||||||
assert.Zero(t, stats.Metrics.Objects)
|
assert.Zero(t, stats.Metrics.Objects)
|
||||||
assert.Zero(t, stats.Folders)
|
assert.Zero(t, stats.Folders)
|
||||||
assert.Zero(t, stats.Metrics.Successes)
|
assert.Zero(t, stats.Metrics.Successes)
|
||||||
@ -412,7 +412,7 @@ func runRestore(
|
|||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
assert.NotNil(t, deets)
|
assert.NotNil(t, deets)
|
||||||
|
|
||||||
status := restoreGC.AwaitStatus()
|
status := restoreGC.Wait()
|
||||||
runTime := time.Since(start)
|
runTime := time.Since(start)
|
||||||
|
|
||||||
assert.Equal(t, numRestoreItems, status.Metrics.Objects, "restored status.Metrics.Objects")
|
assert.Equal(t, numRestoreItems, status.Metrics.Objects, "restored status.Metrics.Objects")
|
||||||
@ -457,8 +457,10 @@ func runBackupAndCompare(
|
|||||||
t.Logf("Selective backup of %s\n", backupSel)
|
t.Logf("Selective backup of %s\n", backupSel)
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
dcs, excludes, err := backupGC.DataCollections(
|
dcs, excludes, err := backupGC.ProduceBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
|
backupSel.DiscreteOwner,
|
||||||
|
backupSel.DiscreteOwner,
|
||||||
backupSel,
|
backupSel,
|
||||||
nil,
|
nil,
|
||||||
config.opts,
|
config.opts,
|
||||||
@ -480,7 +482,7 @@ func runBackupAndCompare(
|
|||||||
config.dest,
|
config.dest,
|
||||||
config.opts.RestorePermissions)
|
config.opts.RestorePermissions)
|
||||||
|
|
||||||
status := backupGC.AwaitStatus()
|
status := backupGC.Wait()
|
||||||
|
|
||||||
assert.Equalf(t, totalItems+skipped, status.Metrics.Objects,
|
assert.Equalf(t, totalItems+skipped, status.Metrics.Objects,
|
||||||
"backup status.Metrics.Objects; wanted %d items + %d skipped", totalItems, skipped)
|
"backup status.Metrics.Objects; wanted %d items + %d skipped", totalItems, skipped)
|
||||||
@ -979,7 +981,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
|
|||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
require.NotNil(t, deets)
|
require.NotNil(t, deets)
|
||||||
|
|
||||||
status := restoreGC.AwaitStatus()
|
status := restoreGC.Wait()
|
||||||
// Always just 1 because it's just 1 collection.
|
// Always just 1 because it's just 1 collection.
|
||||||
assert.Equal(t, totalItems, status.Metrics.Objects, "status.Metrics.Objects")
|
assert.Equal(t, totalItems, status.Metrics.Objects, "status.Metrics.Objects")
|
||||||
assert.Equal(t, totalItems, status.Metrics.Successes, "status.Metrics.Successes")
|
assert.Equal(t, totalItems, status.Metrics.Successes, "status.Metrics.Successes")
|
||||||
@ -996,8 +998,10 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
|
|||||||
backupSel := backupSelectorForExpected(t, test.service, expectedDests)
|
backupSel := backupSelectorForExpected(t, test.service, expectedDests)
|
||||||
t.Log("Selective backup of", backupSel)
|
t.Log("Selective backup of", backupSel)
|
||||||
|
|
||||||
dcs, excludes, err := backupGC.DataCollections(
|
dcs, excludes, err := backupGC.ProduceBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
|
backupSel.DiscreteOwner,
|
||||||
|
backupSel.DiscreteOwner,
|
||||||
backupSel,
|
backupSel,
|
||||||
nil,
|
nil,
|
||||||
control.Options{
|
control.Options{
|
||||||
@ -1023,7 +1027,7 @@ func (suite *GraphConnectorIntegrationSuite) TestMultiFolderBackupDifferentNames
|
|||||||
control.RestoreDestination{},
|
control.RestoreDestination{},
|
||||||
true)
|
true)
|
||||||
|
|
||||||
status := backupGC.AwaitStatus()
|
status := backupGC.Wait()
|
||||||
assert.Equal(t, allItems+skipped, status.Metrics.Objects, "status.Metrics.Objects")
|
assert.Equal(t, allItems+skipped, status.Metrics.Objects, "status.Metrics.Objects")
|
||||||
assert.Equal(t, allItems+skipped, status.Metrics.Successes, "status.Metrics.Successes")
|
assert.Equal(t, allItems+skipped, status.Metrics.Successes, "status.Metrics.Successes")
|
||||||
})
|
})
|
||||||
|
|||||||
56
src/internal/connector/mockconnector/mock_data_connector.go
Normal file
56
src/internal/connector/mockconnector/mock_data_connector.go
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
package mockconnector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
|
"github.com/alcionai/corso/src/pkg/account"
|
||||||
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||||
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
|
"github.com/alcionai/corso/src/pkg/fault"
|
||||||
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type GraphConnector struct {
|
||||||
|
Collections []data.BackupCollection
|
||||||
|
Exclude map[string]map[string]struct{}
|
||||||
|
|
||||||
|
Deets *details.Details
|
||||||
|
|
||||||
|
Err error
|
||||||
|
|
||||||
|
Status *support.ConnectorOperationStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gc GraphConnector) ProduceBackupCollections(
|
||||||
|
_ context.Context,
|
||||||
|
_, _ string,
|
||||||
|
_ selectors.Selector,
|
||||||
|
_ []data.RestoreCollection,
|
||||||
|
_ control.Options,
|
||||||
|
_ *fault.Bus,
|
||||||
|
) (
|
||||||
|
[]data.BackupCollection,
|
||||||
|
map[string]map[string]struct{},
|
||||||
|
error,
|
||||||
|
) {
|
||||||
|
return gc.Collections, gc.Exclude, gc.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gc GraphConnector) Wait() *support.ConnectorOperationStatus {
|
||||||
|
return gc.Status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gc GraphConnector) ConsumeRestoreCollections(
|
||||||
|
_ context.Context,
|
||||||
|
_ int,
|
||||||
|
_ account.Account,
|
||||||
|
_ selectors.Selector,
|
||||||
|
_ control.RestoreDestination,
|
||||||
|
_ control.Options,
|
||||||
|
_ []data.RestoreCollection,
|
||||||
|
_ *fault.Bus,
|
||||||
|
) (*details.Details, error) {
|
||||||
|
return gc.Deets, gc.Err
|
||||||
|
}
|
||||||
@ -124,13 +124,13 @@ type PrevRefs struct {
|
|||||||
Location path.Path
|
Location path.Path
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackupCollections takes a set of collections and creates a kopia snapshot
|
// ConsumeBackupCollections takes a set of collections and creates a kopia snapshot
|
||||||
// with the data that they contain. previousSnapshots is used for incremental
|
// with the data that they contain. previousSnapshots is used for incremental
|
||||||
// backups and should represent the base snapshot from which metadata is sourced
|
// backups and should represent the base snapshot from which metadata is sourced
|
||||||
// from as well as any incomplete snapshot checkpoints that may contain more
|
// from as well as any incomplete snapshot checkpoints that may contain more
|
||||||
// recent data than the base snapshot. The absence of previousSnapshots causes a
|
// recent data than the base snapshot. The absence of previousSnapshots causes a
|
||||||
// complete backup of all data.
|
// complete backup of all data.
|
||||||
func (w Wrapper) BackupCollections(
|
func (w Wrapper) ConsumeBackupCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
previousSnapshots []IncrementalBase,
|
previousSnapshots []IncrementalBase,
|
||||||
collections []data.BackupCollection,
|
collections []data.BackupCollection,
|
||||||
@ -143,7 +143,7 @@ func (w Wrapper) BackupCollections(
|
|||||||
return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx)
|
return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, end := diagnostics.Span(ctx, "kopia:backupCollections")
|
ctx, end := diagnostics.Span(ctx, "kopia:consumeBackupCollections")
|
||||||
defer end()
|
defer end()
|
||||||
|
|
||||||
if len(collections) == 0 && len(globalExcludeSet) == 0 {
|
if len(collections) == 0 && len(globalExcludeSet) == 0 {
|
||||||
|
|||||||
@ -276,7 +276,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
|||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
|
|
||||||
stats, deets, _, err := suite.w.BackupCollections(
|
stats, deets, _, err := suite.w.ConsumeBackupCollections(
|
||||||
suite.ctx,
|
suite.ctx,
|
||||||
prevSnaps,
|
prevSnaps,
|
||||||
collections,
|
collections,
|
||||||
@ -423,7 +423,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() {
|
|||||||
t := suite.T()
|
t := suite.T()
|
||||||
collections := test.cols()
|
collections := test.cols()
|
||||||
|
|
||||||
stats, deets, prevShortRefs, err := suite.w.BackupCollections(
|
stats, deets, prevShortRefs, err := suite.w.ConsumeBackupCollections(
|
||||||
suite.ctx,
|
suite.ctx,
|
||||||
prevSnaps,
|
prevSnaps,
|
||||||
collections,
|
collections,
|
||||||
@ -525,7 +525,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
|
|||||||
fp2, err := suite.storePath2.Append(dc2.Names[0], true)
|
fp2, err := suite.storePath2.Append(dc2.Names[0], true)
|
||||||
require.NoError(t, err, clues.ToCore(err))
|
require.NoError(t, err, clues.ToCore(err))
|
||||||
|
|
||||||
stats, _, _, err := w.BackupCollections(
|
stats, _, _, err := w.ConsumeBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
nil,
|
nil,
|
||||||
[]data.BackupCollection{dc1, dc2},
|
[]data.BackupCollection{dc1, dc2},
|
||||||
@ -644,7 +644,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, deets, _, err := suite.w.BackupCollections(
|
stats, deets, _, err := suite.w.ConsumeBackupCollections(
|
||||||
suite.ctx,
|
suite.ctx,
|
||||||
nil,
|
nil,
|
||||||
collections,
|
collections,
|
||||||
@ -706,7 +706,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections()
|
|||||||
ctx, flush := tester.NewContext()
|
ctx, flush := tester.NewContext()
|
||||||
defer flush()
|
defer flush()
|
||||||
|
|
||||||
s, d, _, err := suite.w.BackupCollections(
|
s, d, _, err := suite.w.ConsumeBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
nil,
|
nil,
|
||||||
test.collections,
|
test.collections,
|
||||||
@ -866,7 +866,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
|
|||||||
tags[k] = ""
|
tags[k] = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, deets, _, err := suite.w.BackupCollections(
|
stats, deets, _, err := suite.w.ConsumeBackupCollections(
|
||||||
suite.ctx,
|
suite.ctx,
|
||||||
nil,
|
nil,
|
||||||
collections,
|
collections,
|
||||||
@ -1018,7 +1018,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, _, _, err := suite.w.BackupCollections(
|
stats, _, _, err := suite.w.ConsumeBackupCollections(
|
||||||
suite.ctx,
|
suite.ctx,
|
||||||
[]IncrementalBase{
|
[]IncrementalBase{
|
||||||
{
|
{
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common"
|
"github.com/alcionai/corso/src/internal/common"
|
||||||
"github.com/alcionai/corso/src/internal/common/crash"
|
"github.com/alcionai/corso/src/internal/common/crash"
|
||||||
"github.com/alcionai/corso/src/internal/connector"
|
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
"github.com/alcionai/corso/src/internal/diagnostics"
|
"github.com/alcionai/corso/src/internal/diagnostics"
|
||||||
@ -42,6 +41,7 @@ type BackupOperation struct {
|
|||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
|
|
||||||
account account.Account
|
account account.Account
|
||||||
|
bp BackupProducer
|
||||||
|
|
||||||
// when true, this allows for incremental backups instead of full data pulls
|
// when true, this allows for incremental backups instead of full data pulls
|
||||||
incremental bool
|
incremental bool
|
||||||
@ -60,14 +60,14 @@ func NewBackupOperation(
|
|||||||
opts control.Options,
|
opts control.Options,
|
||||||
kw *kopia.Wrapper,
|
kw *kopia.Wrapper,
|
||||||
sw *store.Wrapper,
|
sw *store.Wrapper,
|
||||||
gc *connector.GraphConnector,
|
bp BackupProducer,
|
||||||
acct account.Account,
|
acct account.Account,
|
||||||
selector selectors.Selector,
|
selector selectors.Selector,
|
||||||
ownerName string,
|
ownerName string,
|
||||||
bus events.Eventer,
|
bus events.Eventer,
|
||||||
) (BackupOperation, error) {
|
) (BackupOperation, error) {
|
||||||
op := BackupOperation{
|
op := BackupOperation{
|
||||||
operation: newOperation(opts, bus, kw, sw, gc),
|
operation: newOperation(opts, bus, kw, sw),
|
||||||
ResourceOwner: selector.DiscreteOwner,
|
ResourceOwner: selector.DiscreteOwner,
|
||||||
ResourceOwnerName: ownerName,
|
ResourceOwnerName: ownerName,
|
||||||
Selectors: selector,
|
Selectors: selector,
|
||||||
@ -92,6 +92,10 @@ func (op BackupOperation) validate() error {
|
|||||||
return clues.New("backup requires a resource owner")
|
return clues.New("backup requires a resource owner")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if op.bp == nil {
|
||||||
|
return errors.New("missing backup producer")
|
||||||
|
}
|
||||||
|
|
||||||
return op.operation.validate()
|
return op.operation.validate()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,14 +247,22 @@ func (op *BackupOperation) do(
|
|||||||
return nil, clues.Wrap(err, "producing manifests and metadata")
|
return nil, clues.Wrap(err, "producing manifests and metadata")
|
||||||
}
|
}
|
||||||
|
|
||||||
cs, excludes, err := produceBackupDataCollections(ctx, op.gc, op.Selectors, mdColls, op.Options, op.Errors)
|
cs, excludes, err := produceBackupDataCollections(
|
||||||
|
ctx,
|
||||||
|
op.bp,
|
||||||
|
op.ResourceOwner,
|
||||||
|
op.ResourceOwnerName,
|
||||||
|
op.Selectors,
|
||||||
|
mdColls,
|
||||||
|
op.Options,
|
||||||
|
op.Errors)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, clues.Wrap(err, "producing backup data collections")
|
return nil, clues.Wrap(err, "producing backup data collections")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = clues.Add(ctx, "coll_count", len(cs))
|
ctx = clues.Add(ctx, "coll_count", len(cs))
|
||||||
|
|
||||||
writeStats, deets, toMerge, err := consumeBackupDataCollections(
|
writeStats, deets, toMerge, err := consumeBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
op.kopia,
|
op.kopia,
|
||||||
op.account.ID(),
|
op.account.ID(),
|
||||||
@ -279,9 +291,9 @@ func (op *BackupOperation) do(
|
|||||||
return nil, clues.Wrap(err, "merging details")
|
return nil, clues.Wrap(err, "merging details")
|
||||||
}
|
}
|
||||||
|
|
||||||
opStats.gc = op.gc.AwaitStatus()
|
opStats.gc = op.bp.Wait()
|
||||||
|
|
||||||
logger.Ctx(ctx).Debug(op.gc.PrintableStatus())
|
logger.Ctx(ctx).Debug(opStats.gc)
|
||||||
|
|
||||||
return deets, nil
|
return deets, nil
|
||||||
}
|
}
|
||||||
@ -309,10 +321,25 @@ func useIncrementalBackup(sel selectors.Selector, opts control.Options) bool {
|
|||||||
// Producer funcs
|
// Producer funcs
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type BackupProducer interface {
|
||||||
|
ProduceBackupCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
ownerID, ownerName string,
|
||||||
|
sels selectors.Selector,
|
||||||
|
metadata []data.RestoreCollection,
|
||||||
|
ctrlOpts control.Options,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) ([]data.BackupCollection, map[string]map[string]struct{}, error)
|
||||||
|
// TODO: ConnectorOperationStatus should be replaced with something
|
||||||
|
// more generic.
|
||||||
|
Wait() *support.ConnectorOperationStatus
|
||||||
|
}
|
||||||
|
|
||||||
// calls the producer to generate collections of data to backup
|
// calls the producer to generate collections of data to backup
|
||||||
func produceBackupDataCollections(
|
func produceBackupDataCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
gc *connector.GraphConnector,
|
bp BackupProducer,
|
||||||
|
ownerID, ownerName string,
|
||||||
sel selectors.Selector,
|
sel selectors.Selector,
|
||||||
metadata []data.RestoreCollection,
|
metadata []data.RestoreCollection,
|
||||||
ctrlOpts control.Options,
|
ctrlOpts control.Options,
|
||||||
@ -325,15 +352,15 @@ func produceBackupDataCollections(
|
|||||||
closer()
|
closer()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return gc.DataCollections(ctx, sel, metadata, ctrlOpts, errs)
|
return bp.ProduceBackupCollections(ctx, ownerID, ownerName, sel, metadata, ctrlOpts, errs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Consumer funcs
|
// Consumer funcs
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
type backuper interface {
|
type BackupConsumer interface {
|
||||||
BackupCollections(
|
ConsumeBackupCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
bases []kopia.IncrementalBase,
|
bases []kopia.IncrementalBase,
|
||||||
cs []data.BackupCollection,
|
cs []data.BackupCollection,
|
||||||
@ -389,9 +416,9 @@ func builderFromReason(ctx context.Context, tenant string, r kopia.Reason) (*pat
|
|||||||
}
|
}
|
||||||
|
|
||||||
// calls kopia to backup the collections of data
|
// calls kopia to backup the collections of data
|
||||||
func consumeBackupDataCollections(
|
func consumeBackupCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
bu backuper,
|
bc BackupConsumer,
|
||||||
tenantID string,
|
tenantID string,
|
||||||
reasons []kopia.Reason,
|
reasons []kopia.Reason,
|
||||||
mans []*kopia.ManifestEntry,
|
mans []*kopia.ManifestEntry,
|
||||||
@ -465,7 +492,7 @@ func consumeBackupDataCollections(
|
|||||||
"base_backup_id", mbID)
|
"base_backup_id", mbID)
|
||||||
}
|
}
|
||||||
|
|
||||||
kopiaStats, deets, itemsSourcedFromBase, err := bu.BackupCollections(
|
kopiaStats, deets, itemsSourcedFromBase, err := bc.ConsumeBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
bases,
|
bases,
|
||||||
cs,
|
cs,
|
||||||
|
|||||||
@ -383,7 +383,7 @@ func generateContainerOfItems(
|
|||||||
dest,
|
dest,
|
||||||
collections)
|
collections)
|
||||||
|
|
||||||
deets, err := gc.RestoreDataCollections(
|
deets, err := gc.ConsumeRestoreCollections(
|
||||||
ctx,
|
ctx,
|
||||||
backupVersion,
|
backupVersion,
|
||||||
acct,
|
acct,
|
||||||
@ -539,7 +539,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
|
|||||||
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
||||||
kw := &kopia.Wrapper{}
|
kw := &kopia.Wrapper{}
|
||||||
sw := &store.Wrapper{}
|
sw := &store.Wrapper{}
|
||||||
gc := &connector.GraphConnector{}
|
gc := &mockconnector.GraphConnector{}
|
||||||
acct := tester.NewM365Account(suite.T())
|
acct := tester.NewM365Account(suite.T())
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
@ -547,7 +547,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
|||||||
opts control.Options
|
opts control.Options
|
||||||
kw *kopia.Wrapper
|
kw *kopia.Wrapper
|
||||||
sw *store.Wrapper
|
sw *store.Wrapper
|
||||||
gc *connector.GraphConnector
|
bp BackupProducer
|
||||||
acct account.Account
|
acct account.Account
|
||||||
targets []string
|
targets []string
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
@ -555,7 +555,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
|||||||
{"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
|
{"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
|
||||||
{"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
|
{"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
|
||||||
{"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
|
{"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
|
||||||
{"missing graphconnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
|
{"missing backup producer", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
@ -567,7 +567,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
|||||||
test.opts,
|
test.opts,
|
||||||
test.kw,
|
test.kw,
|
||||||
test.sw,
|
test.sw,
|
||||||
test.gc,
|
test.bp,
|
||||||
test.acct,
|
test.acct,
|
||||||
selectors.Selector{DiscreteOwner: "test"},
|
selectors.Selector{DiscreteOwner: "test"},
|
||||||
"test-name",
|
"test-name",
|
||||||
|
|||||||
@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/connector"
|
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
evmock "github.com/alcionai/corso/src/internal/events/mock"
|
evmock "github.com/alcionai/corso/src/internal/events/mock"
|
||||||
@ -85,9 +85,9 @@ func checkPaths(t *testing.T, expected, got []path.Path) {
|
|||||||
assert.ElementsMatch(t, expected, got)
|
assert.ElementsMatch(t, expected, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----- backup producer
|
// ----- backup consumer
|
||||||
|
|
||||||
type mockBackuper struct {
|
type mockBackupConsumer struct {
|
||||||
checkFunc func(
|
checkFunc func(
|
||||||
bases []kopia.IncrementalBase,
|
bases []kopia.IncrementalBase,
|
||||||
cs []data.BackupCollection,
|
cs []data.BackupCollection,
|
||||||
@ -95,7 +95,7 @@ type mockBackuper struct {
|
|||||||
buildTreeWithBase bool)
|
buildTreeWithBase bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mbu mockBackuper) BackupCollections(
|
func (mbu mockBackupConsumer) ConsumeBackupCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
bases []kopia.IncrementalBase,
|
bases []kopia.IncrementalBase,
|
||||||
cs []data.BackupCollection,
|
cs []data.BackupCollection,
|
||||||
@ -360,7 +360,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() {
|
|||||||
var (
|
var (
|
||||||
kw = &kopia.Wrapper{}
|
kw = &kopia.Wrapper{}
|
||||||
sw = &store.Wrapper{}
|
sw = &store.Wrapper{}
|
||||||
gc = &connector.GraphConnector{}
|
gc = &mockconnector.GraphConnector{}
|
||||||
acct = account.Account{}
|
acct = account.Account{}
|
||||||
now = time.Now()
|
now = time.Now()
|
||||||
)
|
)
|
||||||
@ -564,7 +564,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections
|
|||||||
ctx, flush := tester.NewContext()
|
ctx, flush := tester.NewContext()
|
||||||
defer flush()
|
defer flush()
|
||||||
|
|
||||||
mbu := &mockBackuper{
|
mbu := &mockBackupConsumer{
|
||||||
checkFunc: func(
|
checkFunc: func(
|
||||||
bases []kopia.IncrementalBase,
|
bases []kopia.IncrementalBase,
|
||||||
cs []data.BackupCollection,
|
cs []data.BackupCollection,
|
||||||
@ -576,7 +576,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections
|
|||||||
}
|
}
|
||||||
|
|
||||||
//nolint:errcheck
|
//nolint:errcheck
|
||||||
consumeBackupDataCollections(
|
consumeBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
mbu,
|
mbu,
|
||||||
tenant,
|
tenant,
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/connector"
|
|
||||||
"github.com/alcionai/corso/src/internal/events"
|
"github.com/alcionai/corso/src/internal/events"
|
||||||
"github.com/alcionai/corso/src/internal/kopia"
|
"github.com/alcionai/corso/src/internal/kopia"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
@ -57,7 +56,6 @@ type operation struct {
|
|||||||
bus events.Eventer
|
bus events.Eventer
|
||||||
kopia *kopia.Wrapper
|
kopia *kopia.Wrapper
|
||||||
store *store.Wrapper
|
store *store.Wrapper
|
||||||
gc *connector.GraphConnector
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newOperation(
|
func newOperation(
|
||||||
@ -65,7 +63,6 @@ func newOperation(
|
|||||||
bus events.Eventer,
|
bus events.Eventer,
|
||||||
kw *kopia.Wrapper,
|
kw *kopia.Wrapper,
|
||||||
sw *store.Wrapper,
|
sw *store.Wrapper,
|
||||||
gc *connector.GraphConnector,
|
|
||||||
) operation {
|
) operation {
|
||||||
return operation{
|
return operation{
|
||||||
CreatedAt: time.Now(),
|
CreatedAt: time.Now(),
|
||||||
@ -75,7 +72,6 @@ func newOperation(
|
|||||||
bus: bus,
|
bus: bus,
|
||||||
kopia: kw,
|
kopia: kw,
|
||||||
store: sw,
|
store: sw,
|
||||||
gc: gc,
|
|
||||||
|
|
||||||
Status: InProgress,
|
Status: InProgress,
|
||||||
}
|
}
|
||||||
@ -90,9 +86,5 @@ func (op operation) validate() error {
|
|||||||
return clues.New("missing modelstore")
|
return clues.New("missing modelstore")
|
||||||
}
|
}
|
||||||
|
|
||||||
if op.gc == nil {
|
|
||||||
return clues.New("missing graph connector")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/connector"
|
|
||||||
"github.com/alcionai/corso/src/internal/events"
|
"github.com/alcionai/corso/src/internal/events"
|
||||||
"github.com/alcionai/corso/src/internal/kopia"
|
"github.com/alcionai/corso/src/internal/kopia"
|
||||||
"github.com/alcionai/corso/src/internal/tester"
|
"github.com/alcionai/corso/src/internal/tester"
|
||||||
@ -26,30 +25,27 @@ func TestOperationSuite(t *testing.T) {
|
|||||||
|
|
||||||
func (suite *OperationSuite) TestNewOperation() {
|
func (suite *OperationSuite) TestNewOperation() {
|
||||||
t := suite.T()
|
t := suite.T()
|
||||||
op := newOperation(control.Options{}, events.Bus{}, nil, nil, nil)
|
op := newOperation(control.Options{}, events.Bus{}, nil, nil)
|
||||||
assert.Greater(t, op.CreatedAt, time.Time{})
|
assert.Greater(t, op.CreatedAt, time.Time{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *OperationSuite) TestOperation_Validate() {
|
func (suite *OperationSuite) TestOperation_Validate() {
|
||||||
kwStub := &kopia.Wrapper{}
|
kwStub := &kopia.Wrapper{}
|
||||||
swStub := &store.Wrapper{}
|
swStub := &store.Wrapper{}
|
||||||
gcStub := &connector.GraphConnector{}
|
|
||||||
|
|
||||||
table := []struct {
|
table := []struct {
|
||||||
name string
|
name string
|
||||||
kw *kopia.Wrapper
|
kw *kopia.Wrapper
|
||||||
sw *store.Wrapper
|
sw *store.Wrapper
|
||||||
gc *connector.GraphConnector
|
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
}{
|
}{
|
||||||
{"good", kwStub, swStub, gcStub, assert.NoError},
|
{"good", kwStub, swStub, assert.NoError},
|
||||||
{"missing kopia wrapper", nil, swStub, gcStub, assert.Error},
|
{"missing kopia wrapper", nil, swStub, assert.Error},
|
||||||
{"missing store wrapper", kwStub, nil, gcStub, assert.Error},
|
{"missing store wrapper", kwStub, nil, assert.Error},
|
||||||
{"missing graph connector", kwStub, swStub, nil, assert.Error},
|
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw, test.gc).validate()
|
err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate()
|
||||||
test.errCheck(suite.T(), err, clues.ToCore(err))
|
test.errCheck(suite.T(), err, clues.ToCore(err))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,10 +7,10 @@ import (
|
|||||||
|
|
||||||
"github.com/alcionai/clues"
|
"github.com/alcionai/clues"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/common"
|
"github.com/alcionai/corso/src/internal/common"
|
||||||
"github.com/alcionai/corso/src/internal/common/crash"
|
"github.com/alcionai/corso/src/internal/common/crash"
|
||||||
"github.com/alcionai/corso/src/internal/connector"
|
|
||||||
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
@ -42,6 +42,7 @@ type RestoreOperation struct {
|
|||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
|
|
||||||
account account.Account
|
account account.Account
|
||||||
|
rc RestoreConsumer
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreResults aggregate the details of the results of the operation.
|
// RestoreResults aggregate the details of the results of the operation.
|
||||||
@ -56,7 +57,7 @@ func NewRestoreOperation(
|
|||||||
opts control.Options,
|
opts control.Options,
|
||||||
kw *kopia.Wrapper,
|
kw *kopia.Wrapper,
|
||||||
sw *store.Wrapper,
|
sw *store.Wrapper,
|
||||||
gc *connector.GraphConnector,
|
rc RestoreConsumer,
|
||||||
acct account.Account,
|
acct account.Account,
|
||||||
backupID model.StableID,
|
backupID model.StableID,
|
||||||
sel selectors.Selector,
|
sel selectors.Selector,
|
||||||
@ -64,12 +65,13 @@ func NewRestoreOperation(
|
|||||||
bus events.Eventer,
|
bus events.Eventer,
|
||||||
) (RestoreOperation, error) {
|
) (RestoreOperation, error) {
|
||||||
op := RestoreOperation{
|
op := RestoreOperation{
|
||||||
operation: newOperation(opts, bus, kw, sw, gc),
|
operation: newOperation(opts, bus, kw, sw),
|
||||||
BackupID: backupID,
|
BackupID: backupID,
|
||||||
Selectors: sel,
|
Selectors: sel,
|
||||||
Destination: dest,
|
Destination: dest,
|
||||||
Version: "v0",
|
Version: "v0",
|
||||||
account: acct,
|
account: acct,
|
||||||
|
rc: rc,
|
||||||
}
|
}
|
||||||
if err := op.validate(); err != nil {
|
if err := op.validate(); err != nil {
|
||||||
return RestoreOperation{}, err
|
return RestoreOperation{}, err
|
||||||
@ -79,6 +81,10 @@ func NewRestoreOperation(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (op RestoreOperation) validate() error {
|
func (op RestoreOperation) validate() error {
|
||||||
|
if op.rc == nil {
|
||||||
|
return clues.New("missing restore consumer")
|
||||||
|
}
|
||||||
|
|
||||||
return op.operation.validate()
|
return op.operation.validate()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,12 +241,9 @@ func (op *RestoreOperation) do(
|
|||||||
opStats.resourceCount = 1
|
opStats.resourceCount = 1
|
||||||
opStats.cs = dcs
|
opStats.cs = dcs
|
||||||
|
|
||||||
restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data"))
|
deets, err = consumeRestoreCollections(
|
||||||
defer closer()
|
|
||||||
defer close(restoreComplete)
|
|
||||||
|
|
||||||
restoreDetails, err := op.gc.RestoreDataCollections(
|
|
||||||
ctx,
|
ctx,
|
||||||
|
op.rc,
|
||||||
bup.Version,
|
bup.Version,
|
||||||
op.account,
|
op.account,
|
||||||
op.Selectors,
|
op.Selectors,
|
||||||
@ -252,13 +255,11 @@ func (op *RestoreOperation) do(
|
|||||||
return nil, clues.Wrap(err, "restoring collections")
|
return nil, clues.Wrap(err, "restoring collections")
|
||||||
}
|
}
|
||||||
|
|
||||||
restoreComplete <- struct{}{}
|
opStats.gc = op.rc.Wait()
|
||||||
|
|
||||||
opStats.gc = op.gc.AwaitStatus()
|
logger.Ctx(ctx).Debug(opStats.gc)
|
||||||
|
|
||||||
logger.Ctx(ctx).Debug(op.gc.PrintableStatus())
|
return deets, nil
|
||||||
|
|
||||||
return restoreDetails, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// persists details and statistics about the restore operation.
|
// persists details and statistics about the restore operation.
|
||||||
@ -312,6 +313,60 @@ func (op *RestoreOperation) persistResults(
|
|||||||
return op.Errors.Failure()
|
return op.Errors.Failure()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Restorer funcs
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type RestoreConsumer interface {
|
||||||
|
ConsumeRestoreCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
backupVersion int,
|
||||||
|
acct account.Account,
|
||||||
|
selector selectors.Selector,
|
||||||
|
dest control.RestoreDestination,
|
||||||
|
opts control.Options,
|
||||||
|
dcs []data.RestoreCollection,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) (*details.Details, error)
|
||||||
|
// TODO: ConnectorOperationStatus should be replaced with something
|
||||||
|
// more generic.
|
||||||
|
Wait() *support.ConnectorOperationStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
func consumeRestoreCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
rc RestoreConsumer,
|
||||||
|
backupVersion int,
|
||||||
|
acct account.Account,
|
||||||
|
sel selectors.Selector,
|
||||||
|
dest control.RestoreDestination,
|
||||||
|
opts control.Options,
|
||||||
|
dcs []data.RestoreCollection,
|
||||||
|
errs *fault.Bus,
|
||||||
|
) (*details.Details, error) {
|
||||||
|
complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data"))
|
||||||
|
defer func() {
|
||||||
|
complete <- struct{}{}
|
||||||
|
close(complete)
|
||||||
|
closer()
|
||||||
|
}()
|
||||||
|
|
||||||
|
deets, err := rc.ConsumeRestoreCollections(
|
||||||
|
ctx,
|
||||||
|
backupVersion,
|
||||||
|
acct,
|
||||||
|
sel,
|
||||||
|
dest,
|
||||||
|
opts,
|
||||||
|
dcs,
|
||||||
|
errs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "restoring collections")
|
||||||
|
}
|
||||||
|
|
||||||
|
return deets, nil
|
||||||
|
}
|
||||||
|
|
||||||
// formatDetailsForRestoration reduces the provided detail entries according to the
|
// formatDetailsForRestoration reduces the provided detail entries according to the
|
||||||
// selector specifications.
|
// selector specifications.
|
||||||
func formatDetailsForRestoration(
|
func formatDetailsForRestoration(
|
||||||
|
|||||||
@ -50,7 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
|||||||
var (
|
var (
|
||||||
kw = &kopia.Wrapper{}
|
kw = &kopia.Wrapper{}
|
||||||
sw = &store.Wrapper{}
|
sw = &store.Wrapper{}
|
||||||
gc = &connector.GraphConnector{}
|
gc = &mockconnector.GraphConnector{}
|
||||||
acct = account.Account{}
|
acct = account.Account{}
|
||||||
now = time.Now()
|
now = time.Now()
|
||||||
dest = tester.DefaultTestRestoreDestination()
|
dest = tester.DefaultTestRestoreDestination()
|
||||||
@ -217,7 +217,7 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() {
|
|||||||
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
||||||
kw := &kopia.Wrapper{}
|
kw := &kopia.Wrapper{}
|
||||||
sw := &store.Wrapper{}
|
sw := &store.Wrapper{}
|
||||||
gc := &connector.GraphConnector{}
|
gc := &mockconnector.GraphConnector{}
|
||||||
acct := tester.NewM365Account(suite.T())
|
acct := tester.NewM365Account(suite.T())
|
||||||
dest := tester.DefaultTestRestoreDestination()
|
dest := tester.DefaultTestRestoreDestination()
|
||||||
|
|
||||||
@ -226,7 +226,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
|||||||
opts control.Options
|
opts control.Options
|
||||||
kw *kopia.Wrapper
|
kw *kopia.Wrapper
|
||||||
sw *store.Wrapper
|
sw *store.Wrapper
|
||||||
gc *connector.GraphConnector
|
rc RestoreConsumer
|
||||||
acct account.Account
|
acct account.Account
|
||||||
targets []string
|
targets []string
|
||||||
errCheck assert.ErrorAssertionFunc
|
errCheck assert.ErrorAssertionFunc
|
||||||
@ -234,7 +234,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
|||||||
{"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
|
{"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
|
||||||
{"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
|
{"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
|
||||||
{"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
|
{"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
|
||||||
{"missing graphConnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
|
{"missing restore consumer", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
|
||||||
}
|
}
|
||||||
for _, test := range table {
|
for _, test := range table {
|
||||||
suite.Run(test.name, func() {
|
suite.Run(test.name, func() {
|
||||||
@ -246,7 +246,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
|||||||
test.opts,
|
test.opts,
|
||||||
test.kw,
|
test.kw,
|
||||||
test.sw,
|
test.sw,
|
||||||
test.gc,
|
test.rc,
|
||||||
test.acct,
|
test.acct,
|
||||||
"backup-id",
|
"backup-id",
|
||||||
selectors.Selector{DiscreteOwner: "test"},
|
selectors.Selector{DiscreteOwner: "test"},
|
||||||
|
|||||||
@ -222,7 +222,7 @@ func collect(
|
|||||||
}
|
}
|
||||||
|
|
||||||
type backuper interface {
|
type backuper interface {
|
||||||
BackupCollections(
|
ConsumeBackupCollections(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
bases []kopia.IncrementalBase,
|
bases []kopia.IncrementalBase,
|
||||||
cs []data.BackupCollection,
|
cs []data.BackupCollection,
|
||||||
@ -240,7 +240,7 @@ func write(
|
|||||||
dbcs []data.BackupCollection,
|
dbcs []data.BackupCollection,
|
||||||
errs *fault.Bus,
|
errs *fault.Bus,
|
||||||
) (string, error) {
|
) (string, error) {
|
||||||
backupStats, _, _, err := bup.BackupCollections(
|
backupStats, _, _, err := bup.ConsumeBackupCollections(
|
||||||
ctx,
|
ctx,
|
||||||
nil,
|
nil,
|
||||||
dbcs,
|
dbcs,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user