move graph connector creation to new op (#2923)

Moves the generation of graphConnector to the
NewBackupOp and NewRestoreOp constructors
inside the repository.  This removes gc creation
from the backup and restore operations, and
requires a reference to gc to exist in operations.

This sets up two changes: 1/ mocking of GC
within operation (the next PR will replace it with
an interface).  2/ the ability to perform a gc
validation step within the operation construction,
and thus the capacity to look up a resource owner
display name or id, and vice versa.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🌻 Feature

#### Issue(s)

* #2825

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-03-29 16:00:30 -06:00 committed by GitHub
parent e09c120778
commit 4a395d44a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 401 additions and 242 deletions

View File

@ -227,11 +227,18 @@ func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus {
defer func() { defer func() {
if gc.region != nil { if gc.region != nil {
gc.region.End() gc.region.End()
gc.region = nil
} }
}() }()
gc.wg.Wait() gc.wg.Wait()
return &gc.status // clean up and reset statefulness
status := gc.status
gc.wg = &sync.WaitGroup{}
gc.status = support.ConnectorOperationStatus{}
return &status
} }
// UpdateStatus is used by gc initiated tasks to indicate completion // UpdateStatus is used by gc initiated tasks to indicate completion

View File

@ -111,17 +111,17 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() {
go statusTestTask(&gc, 4, 1, 1) go statusTestTask(&gc, 4, 1, 1)
go statusTestTask(&gc, 4, 1, 1) go statusTestTask(&gc, 4, 1, 1)
gc.AwaitStatus() status := gc.AwaitStatus()
t := suite.T() t := suite.T()
assert.NotEmpty(t, gc.PrintableStatus()) assert.NotEmpty(t, gc.PrintableStatus())
// Expect 8 objects // Expect 8 objects
assert.Equal(t, 8, gc.Status().Metrics.Objects) assert.Equal(t, 8, status.Metrics.Objects)
// Expect 2 success // Expect 2 success
assert.Equal(t, 2, gc.Status().Metrics.Successes) assert.Equal(t, 2, status.Metrics.Successes)
// Expect 2 folders // Expect 2 folders
assert.Equal(t, 2, gc.Status().Folders) assert.Equal(t, 2, status.Folders)
} }
func (suite *DisconnectedGraphConnectorSuite) TestVerifyBackupInputs_allServices() { func (suite *DisconnectedGraphConnectorSuite) TestVerifyBackupInputs_allServices() {

View File

@ -2,6 +2,8 @@ package connector
import ( import (
"context" "context"
"runtime/trace"
"sync"
"testing" "testing"
"time" "time"
@ -13,6 +15,7 @@ import (
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/internal/version" "github.com/alcionai/corso/src/internal/version"
@ -132,6 +135,37 @@ func (suite *GraphConnectorUnitSuite) TestUnionSiteIDsAndWebURLs() {
} }
} }
func (suite *GraphConnectorUnitSuite) TestGraphConnector_AwaitStatus() {
ctx, flush := tester.NewContext()
defer flush()
var (
t = suite.T()
gc = &GraphConnector{
wg: &sync.WaitGroup{},
region: &trace.Region{},
}
metrics = support.CollectionMetrics{
Objects: 2,
Successes: 3,
Bytes: 4,
}
status = support.CreateStatus(ctx, support.Backup, 1, metrics, "details")
)
gc.wg.Add(1)
gc.UpdateStatus(status)
result := gc.AwaitStatus()
require.NotNil(t, result)
assert.Nil(t, gc.region, "region")
assert.Empty(t, gc.status, "status")
assert.Equal(t, 1, result.Folders)
assert.Equal(t, 2, result.Metrics.Objects)
assert.Equal(t, 3, result.Metrics.Successes)
assert.Equal(t, int64(4), result.Metrics.Bytes)
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Integration tests // Integration tests
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -60,13 +60,14 @@ func NewBackupOperation(
opts control.Options, opts control.Options,
kw *kopia.Wrapper, kw *kopia.Wrapper,
sw *store.Wrapper, sw *store.Wrapper,
gc *connector.GraphConnector,
acct account.Account, acct account.Account,
selector selectors.Selector, selector selectors.Selector,
ownerName string, ownerName string,
bus events.Eventer, bus events.Eventer,
) (BackupOperation, error) { ) (BackupOperation, error) {
op := BackupOperation{ op := BackupOperation{
operation: newOperation(opts, bus, kw, sw), operation: newOperation(opts, bus, kw, sw, gc),
ResourceOwner: selector.DiscreteOwner, ResourceOwner: selector.DiscreteOwner,
ResourceOwnerName: ownerName, ResourceOwnerName: ownerName,
Selectors: selector, Selectors: selector,
@ -74,6 +75,11 @@ func NewBackupOperation(
account: acct, account: acct,
incremental: useIncrementalBackup(selector, opts), incremental: useIncrementalBackup(selector, opts),
} }
if len(ownerName) == 0 {
op.ResourceOwnerName = op.ResourceOwner
}
if err := op.validate(); err != nil { if err := op.validate(); err != nil {
return BackupOperation{}, err return BackupOperation{}, err
} }
@ -237,12 +243,7 @@ func (op *BackupOperation) do(
return nil, clues.Wrap(err, "producing manifests and metadata") return nil, clues.Wrap(err, "producing manifests and metadata")
} }
gc, err := connectToM365(ctx, op.Selectors, op.account, op.Errors) cs, excludes, err := produceBackupDataCollections(ctx, op.gc, op.Selectors, mdColls, op.Options, op.Errors)
if err != nil {
return nil, clues.Wrap(err, "connectng to m365")
}
cs, excludes, err := produceBackupDataCollections(ctx, gc, op.Selectors, mdColls, op.Options, op.Errors)
if err != nil { if err != nil {
return nil, clues.Wrap(err, "producing backup data collections") return nil, clues.Wrap(err, "producing backup data collections")
} }
@ -278,9 +279,9 @@ func (op *BackupOperation) do(
return nil, clues.Wrap(err, "merging details") return nil, clues.Wrap(err, "merging details")
} }
opStats.gc = gc.AwaitStatus() opStats.gc = op.gc.AwaitStatus()
logger.Ctx(ctx).Debug(gc.PrintableStatus()) logger.Ctx(ctx).Debug(op.gc.PrintableStatus())
return deets, nil return deets, nil
} }

View File

@ -59,12 +59,21 @@ func prepNewTestBackupOp(
bus events.Eventer, bus events.Eventer,
sel selectors.Selector, sel selectors.Selector,
featureToggles control.Toggles, featureToggles control.Toggles,
) (BackupOperation, account.Account, *kopia.Wrapper, *kopia.ModelStore, func()) { ) (
BackupOperation,
account.Account,
*kopia.Wrapper,
*kopia.ModelStore,
*connector.GraphConnector,
func(),
) {
//revive:enable:context-as-argument //revive:enable:context-as-argument
acct := tester.NewM365Account(t) var (
acct = tester.NewM365Account(t)
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := tester.NewPrefixedS3Storage(t) st = tester.NewPrefixedS3Storage(t)
k := kopia.NewConn(st) k = kopia.NewConn(st)
)
err := k.Initialize(ctx) err := k.Initialize(ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
@ -96,9 +105,25 @@ func prepNewTestBackupOp(
ms.Close(ctx) ms.Close(ctx)
} }
bo := newTestBackupOp(t, ctx, kw, ms, acct, sel, bus, featureToggles, closer) connectorResource := connector.Users
if sel.Service == selectors.ServiceSharePoint {
connectorResource = connector.Sites
}
return bo, acct, kw, ms, closer gc, err := connector.NewGraphConnector(
ctx,
graph.HTTPClient(graph.NoTimeout()),
acct,
connectorResource,
fault.New(true))
if !assert.NoError(t, err, clues.ToCore(err)) {
closer()
t.FailNow()
}
bo := newTestBackupOp(t, ctx, kw, ms, gc, acct, sel, bus, featureToggles, closer)
return bo, acct, kw, ms, gc, closer
} }
// newTestBackupOp accepts the clients required to compose a backup operation, plus // newTestBackupOp accepts the clients required to compose a backup operation, plus
@ -112,6 +137,7 @@ func newTestBackupOp(
ctx context.Context, ctx context.Context,
kw *kopia.Wrapper, kw *kopia.Wrapper,
ms *kopia.ModelStore, ms *kopia.ModelStore,
gc *connector.GraphConnector,
acct account.Account, acct account.Account,
sel selectors.Selector, sel selectors.Selector,
bus events.Eventer, bus events.Eventer,
@ -126,7 +152,7 @@ func newTestBackupOp(
opts.ToggleFeatures = featureToggles opts.ToggleFeatures = featureToggles
bo, err := NewBackupOperation(ctx, opts, kw, sw, acct, sel, sel.DiscreteOwner, bus) bo, err := NewBackupOperation(ctx, opts, kw, sw, gc, acct, sel, sel.DiscreteOwner, bus)
if !assert.NoError(t, err, clues.ToCore(err)) { if !assert.NoError(t, err, clues.ToCore(err)) {
closer() closer()
t.FailNow() t.FailNow()
@ -141,20 +167,28 @@ func runAndCheckBackup(
ctx context.Context, ctx context.Context,
bo *BackupOperation, bo *BackupOperation,
mb *evmock.Bus, mb *evmock.Bus,
acceptNoData bool,
) { ) {
//revive:enable:context-as-argument //revive:enable:context-as-argument
err := bo.Run(ctx) err := bo.Run(ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, bo.Results, "the backup had non-zero results") require.NotEmpty(t, bo.Results, "the backup had non-zero results")
require.NotEmpty(t, bo.Results.BackupID, "the backup generated an ID") require.NotEmpty(t, bo.Results.BackupID, "the backup generated an ID")
require.Equalf(
t,
Completed,
bo.Status,
"backup status should be Completed, got %s",
bo.Status)
require.Less(t, 0, bo.Results.ItemsWritten)
expectStatus := []opStatus{Completed}
if acceptNoData {
expectStatus = append(expectStatus, NoData)
}
require.Contains(
t,
expectStatus,
bo.Status,
"backup doesn't match expectation, wanted any of %v, got %s",
expectStatus,
bo.Status)
require.Less(t, 0, bo.Results.ItemsWritten)
assert.Less(t, 0, bo.Results.ItemsRead, "count of items read") assert.Less(t, 0, bo.Results.ItemsRead, "count of items read")
assert.Less(t, int64(0), bo.Results.BytesRead, "bytes read") assert.Less(t, int64(0), bo.Results.BytesRead, "bytes read")
assert.Less(t, int64(0), bo.Results.BytesUploaded, "bytes uploaded") assert.Less(t, int64(0), bo.Results.BytesUploaded, "bytes uploaded")
@ -360,6 +394,8 @@ func generateContainerOfItems(
fault.New(true)) fault.New(true))
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
gc.AwaitStatus()
return deets return deets
} }
@ -503,6 +539,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
kw := &kopia.Wrapper{} kw := &kopia.Wrapper{}
sw := &store.Wrapper{} sw := &store.Wrapper{}
gc := &connector.GraphConnector{}
acct := tester.NewM365Account(suite.T()) acct := tester.NewM365Account(suite.T())
table := []struct { table := []struct {
@ -510,13 +547,15 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
opts control.Options opts control.Options
kw *kopia.Wrapper kw *kopia.Wrapper
sw *store.Wrapper sw *store.Wrapper
gc *connector.GraphConnector
acct account.Account acct account.Account
targets []string targets []string
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
}{ }{
{"good", control.Options{}, kw, sw, acct, nil, assert.NoError}, {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
{"missing kopia", control.Options{}, nil, sw, acct, nil, assert.Error}, {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
{"missing modelstore", control.Options{}, kw, nil, acct, nil, assert.Error}, {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
{"missing graphconnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
} }
for _, test := range table { for _, test := range table {
suite.Run(test.name, func() { suite.Run(test.name, func() {
@ -528,6 +567,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
test.opts, test.opts,
test.kw, test.kw,
test.sw, test.sw,
test.gc,
test.acct, test.acct,
selectors.Selector{DiscreteOwner: "test"}, selectors.Selector{DiscreteOwner: "test"},
"test-name", "test-name",
@ -604,14 +644,14 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
ffs = control.Toggles{} ffs = control.Toggles{}
) )
bo, acct, kw, ms, closer := prepNewTestBackupOp(t, ctx, mb, sel, ffs) bo, acct, kw, ms, gc, closer := prepNewTestBackupOp(t, ctx, mb, sel, ffs)
defer closer() defer closer()
m365, err := acct.M365Config() m365, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
// run the tests // run the tests
runAndCheckBackup(t, ctx, &bo, mb) runAndCheckBackup(t, ctx, &bo, mb, false)
checkBackupIsInManifests(t, ctx, kw, &bo, sel, test.resourceOwner, test.category) checkBackupIsInManifests(t, ctx, kw, &bo, sel, test.resourceOwner, test.category)
checkMetadataFilesExist( checkMetadataFilesExist(
t, t,
@ -622,8 +662,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
m365.AzureTenantID, m365.AzureTenantID,
test.resourceOwner, test.resourceOwner,
path.ExchangeService, path.ExchangeService,
map[path.CategoryType][]string{test.category: test.metadataFiles}, map[path.CategoryType][]string{test.category: test.metadataFiles})
)
if !test.runIncremental { if !test.runIncremental {
return return
@ -634,10 +673,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
// produces fewer results than the last backup. // produces fewer results than the last backup.
var ( var (
incMB = evmock.NewBus() incMB = evmock.NewBus()
incBO = newTestBackupOp(t, ctx, kw, ms, acct, sel, incMB, ffs, closer) incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sel, incMB, ffs, closer)
) )
runAndCheckBackup(t, ctx, &incBO, incMB) runAndCheckBackup(t, ctx, &incBO, incMB, true)
checkBackupIsInManifests(t, ctx, kw, &incBO, sel, test.resourceOwner, test.category) checkBackupIsInManifests(t, ctx, kw, &incBO, sel, test.resourceOwner, test.category)
checkMetadataFilesExist( checkMetadataFilesExist(
t, t,
@ -648,8 +687,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() {
m365.AzureTenantID, m365.AzureTenantID,
test.resourceOwner, test.resourceOwner,
path.ExchangeService, path.ExchangeService,
map[path.CategoryType][]string{test.category: test.metadataFiles}, map[path.CategoryType][]string{test.category: test.metadataFiles})
)
// do some additional checks to ensure the incremental dealt with fewer items. // do some additional checks to ensure the incremental dealt with fewer items.
assert.Greater(t, bo.Results.ItemsWritten, incBO.Results.ItemsWritten, "incremental items written") assert.Greater(t, bo.Results.ItemsWritten, incBO.Results.ItemsWritten, "incremental items written")
@ -826,11 +864,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() {
sel.ContactFolders(containers, selectors.PrefixMatch()), sel.ContactFolders(containers, selectors.PrefixMatch()),
) )
bo, _, kw, ms, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs) bo, _, kw, ms, gc, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs)
defer closer() defer closer()
// run the initial backup // run the initial backup
runAndCheckBackup(t, ctx, &bo, mb) runAndCheckBackup(t, ctx, &bo, mb, false)
// Although established as a table, these tests are no isolated from each other. // Although established as a table, these tests are no isolated from each other.
// Assume that every test's side effects cascade to all following test cases. // Assume that every test's side effects cascade to all following test cases.
@ -1057,10 +1095,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() {
} }
for _, test := range table { for _, test := range table {
suite.Run(test.name, func() { suite.Run(test.name, func() {
fmt.Printf("\n-----\ntest %+v\n-----\n", test.name)
var ( var (
t = suite.T() t = suite.T()
incMB = evmock.NewBus() incMB = evmock.NewBus()
incBO = newTestBackupOp(t, ctx, kw, ms, acct, sel.Selector, incMB, ffs, closer) incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sel.Selector, incMB, ffs, closer)
) )
test.updateUserData(t) test.updateUserData(t)
@ -1111,10 +1150,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDrive() {
sel.Include(sel.AllData()) sel.Include(sel.AllData())
bo, _, _, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{EnablePermissionsBackup: true}) bo, _, _, _, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{EnablePermissionsBackup: true})
defer closer() defer closer()
runAndCheckBackup(t, ctx, &bo, mb) runAndCheckBackup(t, ctx, &bo, mb, false)
} }
// TestBackup_Run ensures that Integration Testing works for OneDrive // TestBackup_Run ensures that Integration Testing works for OneDrive
@ -1206,11 +1245,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveIncrementals() {
sel := selectors.NewOneDriveBackup(owners) sel := selectors.NewOneDriveBackup(owners)
sel.Include(sel.Folders(containers, selectors.PrefixMatch())) sel.Include(sel.Folders(containers, selectors.PrefixMatch()))
bo, _, kw, ms, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs) bo, _, kw, ms, gc, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs)
defer closer() defer closer()
// run the initial backup // run the initial backup
runAndCheckBackup(t, ctx, &bo, mb) runAndCheckBackup(t, ctx, &bo, mb, false)
var ( var (
newFile models.DriveItemable newFile models.DriveItemable
@ -1513,7 +1552,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveIncrementals() {
var ( var (
t = suite.T() t = suite.T()
incMB = evmock.NewBus() incMB = evmock.NewBus()
incBO = newTestBackupOp(t, ctx, kw, ms, acct, sel.Selector, incMB, ffs, closer) incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sel.Selector, incMB, ffs, closer)
) )
tester.LogTimeOfTest(suite.T()) tester.LogTimeOfTest(suite.T())
@ -1566,9 +1605,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_sharePoint() {
sel.Include(sel.LibraryFolders(selectors.Any())) sel.Include(sel.LibraryFolders(selectors.Any()))
bo, _, kw, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{}) bo, _, kw, _, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{})
defer closer() defer closer()
runAndCheckBackup(t, ctx, &bo, mb) runAndCheckBackup(t, ctx, &bo, mb, false)
checkBackupIsInManifests(t, ctx, kw, &bo, sel.Selector, suite.site, path.LibrariesCategory) checkBackupIsInManifests(t, ctx, kw, &bo, sel.Selector, suite.site, path.LibrariesCategory)
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
evmock "github.com/alcionai/corso/src/internal/events/mock" evmock "github.com/alcionai/corso/src/internal/events/mock"
@ -359,6 +360,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() {
var ( var (
kw = &kopia.Wrapper{} kw = &kopia.Wrapper{}
sw = &store.Wrapper{} sw = &store.Wrapper{}
gc = &connector.GraphConnector{}
acct = account.Account{} acct = account.Account{}
now = time.Now() now = time.Now()
) )
@ -413,6 +415,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() {
control.Options{}, control.Options{},
kw, kw,
sw, sw,
gc,
acct, acct,
sel, sel,
sel.DiscreteOwner, sel.DiscreteOwner,

View File

@ -1,20 +1,15 @@
package operations package operations
import ( import (
"context"
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/observe"
"github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store" "github.com/alcionai/corso/src/pkg/store"
) )
@ -62,6 +57,7 @@ type operation struct {
bus events.Eventer bus events.Eventer
kopia *kopia.Wrapper kopia *kopia.Wrapper
store *store.Wrapper store *store.Wrapper
gc *connector.GraphConnector
} }
func newOperation( func newOperation(
@ -69,6 +65,7 @@ func newOperation(
bus events.Eventer, bus events.Eventer,
kw *kopia.Wrapper, kw *kopia.Wrapper,
sw *store.Wrapper, sw *store.Wrapper,
gc *connector.GraphConnector,
) operation { ) operation {
return operation{ return operation{
CreatedAt: time.Now(), CreatedAt: time.Now(),
@ -78,6 +75,7 @@ func newOperation(
bus: bus, bus: bus,
kopia: kw, kopia: kw,
store: sw, store: sw,
gc: gc,
Status: InProgress, Status: InProgress,
} }
@ -92,33 +90,9 @@ func (op operation) validate() error {
return clues.New("missing modelstore") return clues.New("missing modelstore")
} }
if op.gc == nil {
return clues.New("missing graph connector")
}
return nil return nil
} }
// produces a graph connector.
func connectToM365(
ctx context.Context,
sel selectors.Selector,
acct account.Account,
errs *fault.Bus,
) (*connector.GraphConnector, error) {
complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Connecting to M365"))
defer func() {
complete <- struct{}{}
close(complete)
closer()
}()
// retrieve data from the producer
resource := connector.Users
if sel.Service == selectors.ServiceSharePoint {
resource = connector.Sites
}
gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, resource, errs)
if err != nil {
return nil, err
}
return gc, nil
}

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia"
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
@ -25,27 +26,30 @@ func TestOperationSuite(t *testing.T) {
func (suite *OperationSuite) TestNewOperation() { func (suite *OperationSuite) TestNewOperation() {
t := suite.T() t := suite.T()
op := newOperation(control.Options{}, events.Bus{}, nil, nil) op := newOperation(control.Options{}, events.Bus{}, nil, nil, nil)
assert.Greater(t, op.CreatedAt, time.Time{}) assert.Greater(t, op.CreatedAt, time.Time{})
} }
func (suite *OperationSuite) TestOperation_Validate() { func (suite *OperationSuite) TestOperation_Validate() {
kwStub := &kopia.Wrapper{} kwStub := &kopia.Wrapper{}
swStub := &store.Wrapper{} swStub := &store.Wrapper{}
gcStub := &connector.GraphConnector{}
table := []struct { table := []struct {
name string name string
kw *kopia.Wrapper kw *kopia.Wrapper
sw *store.Wrapper sw *store.Wrapper
gc *connector.GraphConnector
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
}{ }{
{"good", kwStub, swStub, assert.NoError}, {"good", kwStub, swStub, gcStub, assert.NoError},
{"missing kopia wrapper", nil, swStub, assert.Error}, {"missing kopia wrapper", nil, swStub, gcStub, assert.Error},
{"missing store wrapper", kwStub, nil, assert.Error}, {"missing store wrapper", kwStub, nil, gcStub, assert.Error},
{"missing graph connector", kwStub, swStub, nil, assert.Error},
} }
for _, test := range table { for _, test := range table {
suite.Run(test.name, func() { suite.Run(test.name, func() {
err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate() err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw, test.gc).validate()
test.errCheck(suite.T(), err, clues.ToCore(err)) test.errCheck(suite.T(), err, clues.ToCore(err))
}) })
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/onedrive"
"github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/connector/support"
"github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/data"
@ -55,6 +56,7 @@ func NewRestoreOperation(
opts control.Options, opts control.Options,
kw *kopia.Wrapper, kw *kopia.Wrapper,
sw *store.Wrapper, sw *store.Wrapper,
gc *connector.GraphConnector,
acct account.Account, acct account.Account,
backupID model.StableID, backupID model.StableID,
sel selectors.Selector, sel selectors.Selector,
@ -62,7 +64,7 @@ func NewRestoreOperation(
bus events.Eventer, bus events.Eventer,
) (RestoreOperation, error) { ) (RestoreOperation, error) {
op := RestoreOperation{ op := RestoreOperation{
operation: newOperation(opts, bus, kw, sw), operation: newOperation(opts, bus, kw, sw, gc),
BackupID: backupID, BackupID: backupID,
Selectors: sel, Selectors: sel,
Destination: dest, Destination: dest,
@ -233,16 +235,11 @@ func (op *RestoreOperation) do(
opStats.resourceCount = 1 opStats.resourceCount = 1
opStats.cs = dcs opStats.cs = dcs
gc, err := connectToM365(ctx, op.Selectors, op.account, op.Errors)
if err != nil {
return nil, clues.Wrap(err, "connecting to M365")
}
restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data")) restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data"))
defer closer() defer closer()
defer close(restoreComplete) defer close(restoreComplete)
restoreDetails, err := gc.RestoreDataCollections( restoreDetails, err := op.gc.RestoreDataCollections(
ctx, ctx,
bup.Version, bup.Version,
op.account, op.account,
@ -257,9 +254,9 @@ func (op *RestoreOperation) do(
restoreComplete <- struct{}{} restoreComplete <- struct{}{}
opStats.gc = gc.AwaitStatus() opStats.gc = op.gc.AwaitStatus()
logger.Ctx(ctx).Debug(gc.PrintableStatus()) logger.Ctx(ctx).Debug(op.gc.PrintableStatus())
return restoreDetails, nil return restoreDetails, nil
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/exchange"
"github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/connector/mockconnector"
@ -25,6 +26,7 @@ import (
"github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/account"
"github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/selectors"
"github.com/alcionai/corso/src/pkg/store" "github.com/alcionai/corso/src/pkg/store"
) )
@ -48,6 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
var ( var (
kw = &kopia.Wrapper{} kw = &kopia.Wrapper{}
sw = &store.Wrapper{} sw = &store.Wrapper{}
gc = &connector.GraphConnector{}
acct = account.Account{} acct = account.Account{}
now = time.Now() now = time.Now()
dest = tester.DefaultTestRestoreDestination() dest = tester.DefaultTestRestoreDestination()
@ -108,6 +111,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
control.Options{}, control.Options{},
kw, kw,
sw, sw,
gc,
acct, acct,
"foo", "foo",
selectors.Selector{DiscreteOwner: "test"}, selectors.Selector{DiscreteOwner: "test"},
@ -137,18 +141,16 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
type bupResults struct { type bupResults struct {
selectorResourceOwners []string selectorResourceOwners []string
resourceOwner string
backupID model.StableID backupID model.StableID
items int items int
gc *connector.GraphConnector
} }
type RestoreOpIntegrationSuite struct { type RestoreOpIntegrationSuite struct {
tester.Suite tester.Suite
exchange bupResults
sharepoint bupResults
kopiaCloser func(ctx context.Context) kopiaCloser func(ctx context.Context)
acct account.Account
kw *kopia.Wrapper kw *kopia.Wrapper
sw *store.Wrapper sw *store.Wrapper
ms *kopia.ModelStore ms *kopia.ModelStore
@ -168,12 +170,12 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
var ( var (
t = suite.T() t = suite.T()
m365UserID = tester.M365UserID(t)
acct = tester.NewM365Account(t)
st = tester.NewPrefixedS3Storage(t) st = tester.NewPrefixedS3Storage(t)
k = kopia.NewConn(st) k = kopia.NewConn(st)
) )
suite.acct = tester.NewM365Account(t)
err := k.Initialize(ctx) err := k.Initialize(ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
@ -193,105 +195,6 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
sw := store.NewKopiaStore(ms) sw := store.NewKopiaStore(ms)
suite.sw = sw suite.sw = sw
suite.Run("exchange_setup", func() {
var (
t = suite.T()
users = []string{m365UserID}
bsel = selectors.NewExchangeBackup(users)
)
bsel.DiscreteOwner = m365UserID
bsel.Include(
bsel.MailFolders([]string{exchange.DefaultMailFolder}, selectors.PrefixMatch()),
bsel.ContactFolders([]string{exchange.DefaultContactFolder}, selectors.PrefixMatch()),
bsel.EventCalendars([]string{exchange.DefaultCalendar}, selectors.PrefixMatch()),
)
bo, err := NewBackupOperation(
ctx,
control.Options{},
kw,
sw,
acct,
bsel.Selector,
bsel.Selector.DiscreteOwner,
evmock.NewBus())
require.NoError(t, err, clues.ToCore(err))
err = bo.Run(ctx)
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, bo.Results.BackupID)
suite.exchange = bupResults{
selectorResourceOwners: users,
resourceOwner: m365UserID,
backupID: bo.Results.BackupID,
// Discount metadata collection files (1 delta and one prev path for each category).
// These meta files are used to aid restore, but are not themselves
// restored (ie: counted as writes).
items: bo.Results.ItemsWritten - 6,
}
})
suite.Run("sharepoint_setup", func() {
var (
t = suite.T()
siteID = tester.M365SiteID(t)
sites = []string{siteID}
spsel = selectors.NewSharePointBackup(sites)
)
spsel.DiscreteOwner = siteID
// assume a folder name "test" exists in the drive.
// this is brittle, and requires us to backfill anytime
// the site under test changes, but also prevents explosive
// growth from re-backup/restore of restored files.
spsel.Include(spsel.LibraryFolders([]string{"test"}, selectors.PrefixMatch()))
bo, err := NewBackupOperation(
ctx,
control.Options{},
kw,
sw,
acct,
spsel.Selector,
spsel.Selector.DiscreteOwner,
evmock.NewBus())
require.NoError(t, err, clues.ToCore(err))
// get the count of drives
m365, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
adpt, err := graph.CreateAdapter(
m365.AzureTenantID,
m365.AzureClientID,
m365.AzureClientSecret)
require.NoError(t, err, clues.ToCore(err))
service := graph.NewService(adpt)
spPgr := api.NewSiteDrivePager(service, siteID, []string{"id", "name"})
drives, err := api.GetAllDrives(ctx, spPgr, true, 3)
require.NoError(t, err, clues.ToCore(err))
err = bo.Run(ctx)
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, bo.Results.BackupID)
suite.sharepoint = bupResults{
selectorResourceOwners: sites,
resourceOwner: siteID,
backupID: bo.Results.BackupID,
// Discount metadata files (1 delta, 1 prev path)
// assume only one folder, and therefore 1 dirmeta per drive
// assume only one file in each folder, and therefore 1 meta per drive.
// These meta files are used to aid restore, but are not themselves
// restored (ie: counted as writes).
items: bo.Results.ItemsWritten - 2 - len(drives) - len(drives),
}
})
} }
func (suite *RestoreOpIntegrationSuite) TearDownSuite() { func (suite *RestoreOpIntegrationSuite) TearDownSuite() {
@ -314,6 +217,7 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() {
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
kw := &kopia.Wrapper{} kw := &kopia.Wrapper{}
sw := &store.Wrapper{} sw := &store.Wrapper{}
gc := &connector.GraphConnector{}
acct := tester.NewM365Account(suite.T()) acct := tester.NewM365Account(suite.T())
dest := tester.DefaultTestRestoreDestination() dest := tester.DefaultTestRestoreDestination()
@ -322,13 +226,15 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
opts control.Options opts control.Options
kw *kopia.Wrapper kw *kopia.Wrapper
sw *store.Wrapper sw *store.Wrapper
gc *connector.GraphConnector
acct account.Account acct account.Account
targets []string targets []string
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
}{ }{
{"good", control.Options{}, kw, sw, acct, nil, assert.NoError}, {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError},
{"missing kopia", control.Options{}, nil, sw, acct, nil, assert.Error}, {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error},
{"missing modelstore", control.Options{}, kw, nil, acct, nil, assert.Error}, {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error},
{"missing graphConnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error},
} }
for _, test := range table { for _, test := range table {
suite.Run(test.name, func() { suite.Run(test.name, func() {
@ -340,6 +246,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
test.opts, test.opts,
test.kw, test.kw,
test.sw, test.sw,
test.gc,
test.acct, test.acct,
"backup-id", "backup-id",
selectors.Selector{DiscreteOwner: "test"}, selectors.Selector{DiscreteOwner: "test"},
@ -350,58 +257,195 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
} }
} }
//nolint:lll func setupExchangeBackup(
t *testing.T,
kw *kopia.Wrapper,
sw *store.Wrapper,
acct account.Account,
owner string,
) bupResults {
ctx, flush := tester.NewContext()
defer flush()
var (
users = []string{owner}
bsel = selectors.NewExchangeBackup(users)
)
gc, err := connector.NewGraphConnector(
ctx,
graph.HTTPClient(graph.NoTimeout()),
acct,
connector.Users,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
bsel.DiscreteOwner = owner
bsel.Include(
bsel.MailFolders([]string{exchange.DefaultMailFolder}, selectors.PrefixMatch()),
bsel.ContactFolders([]string{exchange.DefaultContactFolder}, selectors.PrefixMatch()),
bsel.EventCalendars([]string{exchange.DefaultCalendar}, selectors.PrefixMatch()),
)
bo, err := NewBackupOperation(
ctx,
control.Options{},
kw,
sw,
gc,
acct,
bsel.Selector,
bsel.Selector.DiscreteOwner,
evmock.NewBus())
require.NoError(t, err, clues.ToCore(err))
err = bo.Run(ctx)
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, bo.Results.BackupID)
return bupResults{
selectorResourceOwners: users,
backupID: bo.Results.BackupID,
// Discount metadata collection files (1 delta and one prev path for each category).
// These meta files are used to aid restore, but are not themselves
// restored (ie: counted as writes).
items: bo.Results.ItemsWritten - 6,
gc: gc,
}
}
func setupSharePointBackup(
t *testing.T,
kw *kopia.Wrapper,
sw *store.Wrapper,
acct account.Account,
owner string,
) bupResults {
ctx, flush := tester.NewContext()
defer flush()
var (
sites = []string{owner}
spsel = selectors.NewSharePointBackup(sites)
)
gc, err := connector.NewGraphConnector(
ctx,
graph.HTTPClient(graph.NoTimeout()),
acct,
connector.Sites,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
spsel.DiscreteOwner = owner
// assume a folder name "test" exists in the drive.
// this is brittle, and requires us to backfill anytime
// the site under test changes, but also prevents explosive
// growth from re-backup/restore of restored files.
spsel.Include(spsel.LibraryFolders([]string{"test"}, selectors.PrefixMatch()))
bo, err := NewBackupOperation(
ctx,
control.Options{},
kw,
sw,
gc,
acct,
spsel.Selector,
spsel.Selector.DiscreteOwner,
evmock.NewBus())
require.NoError(t, err, clues.ToCore(err))
// get the count of drives
m365, err := acct.M365Config()
require.NoError(t, err, clues.ToCore(err))
adpt, err := graph.CreateAdapter(
m365.AzureTenantID,
m365.AzureClientID,
m365.AzureClientSecret)
require.NoError(t, err, clues.ToCore(err))
service := graph.NewService(adpt)
spPgr := api.NewSiteDrivePager(service, owner, []string{"id", "name"})
drives, err := api.GetAllDrives(ctx, spPgr, true, 3)
require.NoError(t, err, clues.ToCore(err))
err = bo.Run(ctx)
require.NoError(t, err, clues.ToCore(err))
require.NotEmpty(t, bo.Results.BackupID)
return bupResults{
selectorResourceOwners: sites,
backupID: bo.Results.BackupID,
// Discount metadata files (1 delta, 1 prev path)
// assume only one folder, and therefore 1 dirmeta per drive
// assume only one file in each folder, and therefore 1 meta per drive.
// These meta files are used to aid restore, but are not themselves
// restored (ie: counted as writes).
items: bo.Results.ItemsWritten - 2 - len(drives) - len(drives),
gc: gc,
}
}
func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
ctx, flush := tester.NewContext() ctx, flush := tester.NewContext()
defer flush() defer flush()
tables := []struct { tables := []struct {
name string name string
bID model.StableID owner string
expectedItems int
dest control.RestoreDestination dest control.RestoreDestination
getSelector func(t *testing.T) selectors.Selector getSelector func(t *testing.T, owners []string) selectors.Selector
cleanup func(t *testing.T, dest string) setup func(t *testing.T, kw *kopia.Wrapper, sw *store.Wrapper, acct account.Account, owner string) bupResults
}{ }{
{ {
name: "Exchange_Restore", name: "Exchange_Restore",
bID: suite.exchange.backupID, owner: tester.M365UserID(suite.T()),
expectedItems: suite.exchange.items,
dest: tester.DefaultTestRestoreDestination(), dest: tester.DefaultTestRestoreDestination(),
getSelector: func(t *testing.T) selectors.Selector { getSelector: func(t *testing.T, owners []string) selectors.Selector {
rsel := selectors.NewExchangeRestore(suite.exchange.selectorResourceOwners) rsel := selectors.NewExchangeRestore(owners)
rsel.Include(rsel.AllData()) rsel.Include(rsel.AllData())
return rsel.Selector return rsel.Selector
}, },
setup: setupExchangeBackup,
}, },
{ {
name: "SharePoint_Restore", name: "SharePoint_Restore",
bID: suite.sharepoint.backupID, owner: tester.M365SiteID(suite.T()),
expectedItems: suite.sharepoint.items,
dest: control.DefaultRestoreDestination(common.SimpleDateTimeOneDrive), dest: control.DefaultRestoreDestination(common.SimpleDateTimeOneDrive),
getSelector: func(t *testing.T) selectors.Selector { getSelector: func(t *testing.T, owners []string) selectors.Selector {
rsel := selectors.NewSharePointRestore(suite.sharepoint.selectorResourceOwners) rsel := selectors.NewSharePointRestore(owners)
rsel.Include(rsel.AllData()) rsel.Include(rsel.AllData())
return rsel.Selector return rsel.Selector
}, },
setup: setupSharePointBackup,
}, },
} }
for _, test := range tables { for _, test := range tables {
suite.Run(test.name, func() { suite.Run(test.name, func() {
t := suite.T() var (
mb := evmock.NewBus() t = suite.T()
mb = evmock.NewBus()
bup = test.setup(t, suite.kw, suite.sw, suite.acct, test.owner)
)
require.NotZero(t, bup.items)
require.NotEmpty(t, bup.backupID)
ro, err := NewRestoreOperation( ro, err := NewRestoreOperation(
ctx, ctx,
control.Options{FailFast: true}, control.Options{FailFast: true},
suite.kw, suite.kw,
suite.sw, suite.sw,
bup.gc,
tester.NewM365Account(t), tester.NewM365Account(t),
test.bID, bup.backupID,
test.getSelector(t), test.getSelector(t, bup.selectorResourceOwners),
test.dest, test.dest,
mb) mb)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
@ -412,38 +456,48 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
require.NotEmpty(t, ro.Results, "restoreOp results") require.NotEmpty(t, ro.Results, "restoreOp results")
require.NotNil(t, ds, "restored details") require.NotNil(t, ds, "restored details")
assert.Equal(t, ro.Status, Completed, "restoreOp status") assert.Equal(t, ro.Status, Completed, "restoreOp status")
assert.Equal(t, ro.Results.ItemsWritten, len(ds.Entries), "count of items written matches restored entries in details") assert.Equal(t, ro.Results.ItemsWritten, len(ds.Entries), "item write count matches len details")
assert.Less(t, 0, ro.Results.ItemsRead, "restore items read") assert.Less(t, 0, ro.Results.ItemsRead, "restore items read")
assert.Less(t, int64(0), ro.Results.BytesRead, "bytes read") assert.Less(t, int64(0), ro.Results.BytesRead, "bytes read")
assert.Equal(t, 1, ro.Results.ResourceOwners, "resource Owners") assert.Equal(t, 1, ro.Results.ResourceOwners, "resource Owners")
assert.NoError(t, ro.Errors.Failure(), "non-recoverable error", clues.ToCore(ro.Errors.Failure())) assert.NoError(t, ro.Errors.Failure(), "non-recoverable error", clues.ToCore(ro.Errors.Failure()))
assert.Empty(t, ro.Errors.Recovered(), "recoverable errors") assert.Empty(t, ro.Errors.Recovered(), "recoverable errors")
assert.Equal(t, test.expectedItems, ro.Results.ItemsWritten, "backup and restore wrote the same num of items") assert.Equal(t, bup.items, ro.Results.ItemsWritten, "backup and restore wrote the same num of items")
assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events") assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events")
assert.Equal(t, 1, mb.TimesCalled[events.RestoreEnd], "restore-end events") assert.Equal(t, 1, mb.TimesCalled[events.RestoreEnd], "restore-end events")
}) })
} }
} }
func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoResults() {
ctx, flush := tester.NewContext() ctx, flush := tester.NewContext()
defer flush() defer flush()
t := suite.T() var (
t = suite.T()
dest = tester.DefaultTestRestoreDestination()
mb = evmock.NewBus()
)
rsel := selectors.NewExchangeRestore(selectors.None()) rsel := selectors.NewExchangeRestore(selectors.None())
rsel.Include(rsel.AllData()) rsel.Include(rsel.AllData())
dest := tester.DefaultTestRestoreDestination() gc, err := connector.NewGraphConnector(
mb := evmock.NewBus() ctx,
graph.HTTPClient(graph.NoTimeout()),
suite.acct,
connector.Users,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
ro, err := NewRestoreOperation( ro, err := NewRestoreOperation(
ctx, ctx,
control.Options{}, control.Options{},
suite.kw, suite.kw,
suite.sw, suite.sw,
gc,
tester.NewM365Account(t), tester.NewM365Account(t),
suite.exchange.backupID, "backupID",
rsel.Selector, rsel.Selector,
dest, dest,
mb) mb)
@ -454,6 +508,6 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() {
require.Nil(t, ds, "restoreOp.Run() should not produce details") require.Nil(t, ds, "restoreOp.Run() should not produce details")
assert.Zero(t, ro.Results.ResourceOwners, "resource owners") assert.Zero(t, ro.Results.ResourceOwners, "resource owners")
assert.Zero(t, ro.Results.BytesRead, "bytes read") assert.Zero(t, ro.Results.BytesRead, "bytes read")
assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events") assert.Zero(t, mb.TimesCalled[events.RestoreStart], "restore-start events")
assert.Zero(t, mb.TimesCalled[events.RestoreEnd], "restore-end events") assert.Zero(t, mb.TimesCalled[events.RestoreEnd], "restore-end events")
} }

View File

@ -9,6 +9,8 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/common/crash" "github.com/alcionai/corso/src/internal/common/crash"
"github.com/alcionai/corso/src/internal/connector"
"github.com/alcionai/corso/src/internal/connector/graph"
"github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/onedrive"
"github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/events"
"github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia"
@ -283,16 +285,22 @@ func (r *repository) Close(ctx context.Context) error {
// NewBackup generates a BackupOperation runner. // NewBackup generates a BackupOperation runner.
func (r repository) NewBackup( func (r repository) NewBackup(
ctx context.Context, ctx context.Context,
selector selectors.Selector, sel selectors.Selector,
) (operations.BackupOperation, error) { ) (operations.BackupOperation, error) {
gc, err := connectToM365(ctx, sel, r.Account, fault.New(true))
if err != nil {
return operations.BackupOperation{}, errors.Wrap(err, "connecting to m365")
}
return operations.NewBackupOperation( return operations.NewBackupOperation(
ctx, ctx,
r.Opts, r.Opts,
r.dataLayer, r.dataLayer,
store.NewKopiaStore(r.modelStore), store.NewKopiaStore(r.modelStore),
gc,
r.Account, r.Account,
selector, sel,
selector.DiscreteOwner, sel.DiscreteOwner,
r.Bus) r.Bus)
} }
@ -303,11 +311,17 @@ func (r repository) NewRestore(
sel selectors.Selector, sel selectors.Selector,
dest control.RestoreDestination, dest control.RestoreDestination,
) (operations.RestoreOperation, error) { ) (operations.RestoreOperation, error) {
gc, err := connectToM365(ctx, sel, r.Account, fault.New(true))
if err != nil {
return operations.RestoreOperation{}, errors.Wrap(err, "connecting to m365")
}
return operations.NewRestoreOperation( return operations.NewRestoreOperation(
ctx, ctx,
r.Opts, r.Opts,
r.dataLayer, r.dataLayer,
store.NewKopiaStore(r.modelStore), store.NewKopiaStore(r.modelStore),
gc,
r.Account, r.Account,
model.StableID(backupID), model.StableID(backupID),
sel, sel,
@ -542,3 +556,35 @@ func getRepoModel(ctx context.Context, ms *kopia.ModelStore) (*repositoryModel,
func newRepoID(s storage.Storage) string { func newRepoID(s storage.Storage) string {
return uuid.NewString() return uuid.NewString()
} }
// ---------------------------------------------------------------------------
// helpers
// ---------------------------------------------------------------------------
// produces a graph connector.
func connectToM365(
ctx context.Context,
sel selectors.Selector,
acct account.Account,
errs *fault.Bus,
) (*connector.GraphConnector, error) {
complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Connecting to M365"))
defer func() {
complete <- struct{}{}
close(complete)
closer()
}()
// retrieve data from the producer
resource := connector.Users
if sel.Service == selectors.ServiceSharePoint {
resource = connector.Sites
}
gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, resource, errs)
if err != nil {
return nil, err
}
return gc, nil
}