diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 149ab0ba8..2f8652f81 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -227,11 +227,18 @@ func (gc *GraphConnector) AwaitStatus() *support.ConnectorOperationStatus { defer func() { if gc.region != nil { gc.region.End() + gc.region = nil } }() gc.wg.Wait() - return &gc.status + // clean up and reset statefulness + status := gc.status + + gc.wg = &sync.WaitGroup{} + gc.status = support.ConnectorOperationStatus{} + + return &status } // UpdateStatus is used by gc initiated tasks to indicate completion diff --git a/src/internal/connector/graph_connector_disconnected_test.go b/src/internal/connector/graph_connector_disconnected_test.go index 0d4c6ca30..d92a018e4 100644 --- a/src/internal/connector/graph_connector_disconnected_test.go +++ b/src/internal/connector/graph_connector_disconnected_test.go @@ -111,17 +111,17 @@ func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() { go statusTestTask(&gc, 4, 1, 1) go statusTestTask(&gc, 4, 1, 1) - gc.AwaitStatus() + status := gc.AwaitStatus() t := suite.T() assert.NotEmpty(t, gc.PrintableStatus()) // Expect 8 objects - assert.Equal(t, 8, gc.Status().Metrics.Objects) + assert.Equal(t, 8, status.Metrics.Objects) // Expect 2 success - assert.Equal(t, 2, gc.Status().Metrics.Successes) + assert.Equal(t, 2, status.Metrics.Successes) // Expect 2 folders - assert.Equal(t, 2, gc.Status().Folders) + assert.Equal(t, 2, status.Folders) } func (suite *DisconnectedGraphConnectorSuite) TestVerifyBackupInputs_allServices() { diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 2ee84257c..dae0ed4cc 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -2,6 +2,8 @@ package connector import ( "context" + "runtime/trace" + "sync" "testing" "time" @@ -13,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/mockconnector" + "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/internal/version" @@ -132,6 +135,37 @@ func (suite *GraphConnectorUnitSuite) TestUnionSiteIDsAndWebURLs() { } } +func (suite *GraphConnectorUnitSuite) TestGraphConnector_AwaitStatus() { + ctx, flush := tester.NewContext() + defer flush() + + var ( + t = suite.T() + gc = &GraphConnector{ + wg: &sync.WaitGroup{}, + region: &trace.Region{}, + } + metrics = support.CollectionMetrics{ + Objects: 2, + Successes: 3, + Bytes: 4, + } + status = support.CreateStatus(ctx, support.Backup, 1, metrics, "details") + ) + + gc.wg.Add(1) + gc.UpdateStatus(status) + + result := gc.AwaitStatus() + require.NotNil(t, result) + assert.Nil(t, gc.region, "region") + assert.Empty(t, gc.status, "status") + assert.Equal(t, 1, result.Folders) + assert.Equal(t, 2, result.Metrics.Objects) + assert.Equal(t, 3, result.Metrics.Successes) + assert.Equal(t, int64(4), result.Metrics.Bytes) +} + // --------------------------------------------------------------------------- // Integration tests // --------------------------------------------------------------------------- diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 314db3fa3..89b223cd2 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -60,13 +60,14 @@ func NewBackupOperation( opts control.Options, kw *kopia.Wrapper, sw *store.Wrapper, + gc *connector.GraphConnector, acct account.Account, selector selectors.Selector, ownerName string, bus events.Eventer, ) (BackupOperation, error) { op := BackupOperation{ - operation: newOperation(opts, bus, kw, sw), + operation: newOperation(opts, bus, kw, sw, gc), ResourceOwner: selector.DiscreteOwner, ResourceOwnerName: ownerName, Selectors: selector, @@ -74,6 +75,11 @@ func NewBackupOperation( account: acct, incremental: useIncrementalBackup(selector, opts), } + + if len(ownerName) == 0 { + op.ResourceOwnerName = op.ResourceOwner + } + if err := op.validate(); err != nil { return BackupOperation{}, err } @@ -237,12 +243,7 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "producing manifests and metadata") } - gc, err := connectToM365(ctx, op.Selectors, op.account, op.Errors) - if err != nil { - return nil, clues.Wrap(err, "connectng to m365") - } - - cs, excludes, err := produceBackupDataCollections(ctx, gc, op.Selectors, mdColls, op.Options, op.Errors) + cs, excludes, err := produceBackupDataCollections(ctx, op.gc, op.Selectors, mdColls, op.Options, op.Errors) if err != nil { return nil, clues.Wrap(err, "producing backup data collections") } @@ -278,9 +279,9 @@ func (op *BackupOperation) do( return nil, clues.Wrap(err, "merging details") } - opStats.gc = gc.AwaitStatus() + opStats.gc = op.gc.AwaitStatus() - logger.Ctx(ctx).Debug(gc.PrintableStatus()) + logger.Ctx(ctx).Debug(op.gc.PrintableStatus()) return deets, nil } diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 4a92ccf3c..d7604b008 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -59,12 +59,21 @@ func prepNewTestBackupOp( bus events.Eventer, sel selectors.Selector, featureToggles control.Toggles, -) (BackupOperation, account.Account, *kopia.Wrapper, *kopia.ModelStore, func()) { +) ( + BackupOperation, + account.Account, + *kopia.Wrapper, + *kopia.ModelStore, + *connector.GraphConnector, + func(), +) { //revive:enable:context-as-argument - acct := tester.NewM365Account(t) - // need to initialize the repository before we can test connecting to it. - st := tester.NewPrefixedS3Storage(t) - k := kopia.NewConn(st) + var ( + acct = tester.NewM365Account(t) + // need to initialize the repository before we can test connecting to it. + st = tester.NewPrefixedS3Storage(t) + k = kopia.NewConn(st) + ) err := k.Initialize(ctx) require.NoError(t, err, clues.ToCore(err)) @@ -96,9 +105,25 @@ func prepNewTestBackupOp( ms.Close(ctx) } - bo := newTestBackupOp(t, ctx, kw, ms, acct, sel, bus, featureToggles, closer) + connectorResource := connector.Users + if sel.Service == selectors.ServiceSharePoint { + connectorResource = connector.Sites + } - return bo, acct, kw, ms, closer + gc, err := connector.NewGraphConnector( + ctx, + graph.HTTPClient(graph.NoTimeout()), + acct, + connectorResource, + fault.New(true)) + if !assert.NoError(t, err, clues.ToCore(err)) { + closer() + t.FailNow() + } + + bo := newTestBackupOp(t, ctx, kw, ms, gc, acct, sel, bus, featureToggles, closer) + + return bo, acct, kw, ms, gc, closer } // newTestBackupOp accepts the clients required to compose a backup operation, plus @@ -112,6 +137,7 @@ func newTestBackupOp( ctx context.Context, kw *kopia.Wrapper, ms *kopia.ModelStore, + gc *connector.GraphConnector, acct account.Account, sel selectors.Selector, bus events.Eventer, @@ -126,7 +152,7 @@ func newTestBackupOp( opts.ToggleFeatures = featureToggles - bo, err := NewBackupOperation(ctx, opts, kw, sw, acct, sel, sel.DiscreteOwner, bus) + bo, err := NewBackupOperation(ctx, opts, kw, sw, gc, acct, sel, sel.DiscreteOwner, bus) if !assert.NoError(t, err, clues.ToCore(err)) { closer() t.FailNow() @@ -141,20 +167,28 @@ func runAndCheckBackup( ctx context.Context, bo *BackupOperation, mb *evmock.Bus, + acceptNoData bool, ) { //revive:enable:context-as-argument err := bo.Run(ctx) require.NoError(t, err, clues.ToCore(err)) require.NotEmpty(t, bo.Results, "the backup had non-zero results") require.NotEmpty(t, bo.Results.BackupID, "the backup generated an ID") - require.Equalf( - t, - Completed, - bo.Status, - "backup status should be Completed, got %s", - bo.Status) - require.Less(t, 0, bo.Results.ItemsWritten) + expectStatus := []opStatus{Completed} + if acceptNoData { + expectStatus = append(expectStatus, NoData) + } + + require.Contains( + t, + expectStatus, + bo.Status, + "backup doesn't match expectation, wanted any of %v, got %s", + expectStatus, + bo.Status) + + require.Less(t, 0, bo.Results.ItemsWritten) assert.Less(t, 0, bo.Results.ItemsRead, "count of items read") assert.Less(t, int64(0), bo.Results.BytesRead, "bytes read") assert.Less(t, int64(0), bo.Results.BytesUploaded, "bytes uploaded") @@ -360,6 +394,8 @@ func generateContainerOfItems( fault.New(true)) require.NoError(t, err, clues.ToCore(err)) + gc.AwaitStatus() + return deets } @@ -503,6 +539,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() { func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { kw := &kopia.Wrapper{} sw := &store.Wrapper{} + gc := &connector.GraphConnector{} acct := tester.NewM365Account(suite.T()) table := []struct { @@ -510,13 +547,15 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { opts control.Options kw *kopia.Wrapper sw *store.Wrapper + gc *connector.GraphConnector acct account.Account targets []string errCheck assert.ErrorAssertionFunc }{ - {"good", control.Options{}, kw, sw, acct, nil, assert.NoError}, - {"missing kopia", control.Options{}, nil, sw, acct, nil, assert.Error}, - {"missing modelstore", control.Options{}, kw, nil, acct, nil, assert.Error}, + {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, + {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, + {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, + {"missing graphconnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -528,6 +567,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { test.opts, test.kw, test.sw, + test.gc, test.acct, selectors.Selector{DiscreteOwner: "test"}, "test-name", @@ -604,14 +644,14 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { ffs = control.Toggles{} ) - bo, acct, kw, ms, closer := prepNewTestBackupOp(t, ctx, mb, sel, ffs) + bo, acct, kw, ms, gc, closer := prepNewTestBackupOp(t, ctx, mb, sel, ffs) defer closer() m365, err := acct.M365Config() require.NoError(t, err, clues.ToCore(err)) // run the tests - runAndCheckBackup(t, ctx, &bo, mb) + runAndCheckBackup(t, ctx, &bo, mb, false) checkBackupIsInManifests(t, ctx, kw, &bo, sel, test.resourceOwner, test.category) checkMetadataFilesExist( t, @@ -622,8 +662,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { m365.AzureTenantID, test.resourceOwner, path.ExchangeService, - map[path.CategoryType][]string{test.category: test.metadataFiles}, - ) + map[path.CategoryType][]string{test.category: test.metadataFiles}) if !test.runIncremental { return @@ -634,10 +673,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { // produces fewer results than the last backup. var ( incMB = evmock.NewBus() - incBO = newTestBackupOp(t, ctx, kw, ms, acct, sel, incMB, ffs, closer) + incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sel, incMB, ffs, closer) ) - runAndCheckBackup(t, ctx, &incBO, incMB) + runAndCheckBackup(t, ctx, &incBO, incMB, true) checkBackupIsInManifests(t, ctx, kw, &incBO, sel, test.resourceOwner, test.category) checkMetadataFilesExist( t, @@ -648,8 +687,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchange() { m365.AzureTenantID, test.resourceOwner, path.ExchangeService, - map[path.CategoryType][]string{test.category: test.metadataFiles}, - ) + map[path.CategoryType][]string{test.category: test.metadataFiles}) // do some additional checks to ensure the incremental dealt with fewer items. assert.Greater(t, bo.Results.ItemsWritten, incBO.Results.ItemsWritten, "incremental items written") @@ -826,11 +864,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() { sel.ContactFolders(containers, selectors.PrefixMatch()), ) - bo, _, kw, ms, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs) + bo, _, kw, ms, gc, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs) defer closer() // run the initial backup - runAndCheckBackup(t, ctx, &bo, mb) + runAndCheckBackup(t, ctx, &bo, mb, false) // Although established as a table, these tests are no isolated from each other. // Assume that every test's side effects cascade to all following test cases. @@ -1057,10 +1095,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_exchangeIncrementals() { } for _, test := range table { suite.Run(test.name, func() { + fmt.Printf("\n-----\ntest %+v\n-----\n", test.name) var ( t = suite.T() incMB = evmock.NewBus() - incBO = newTestBackupOp(t, ctx, kw, ms, acct, sel.Selector, incMB, ffs, closer) + incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sel.Selector, incMB, ffs, closer) ) test.updateUserData(t) @@ -1111,10 +1150,10 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDrive() { sel.Include(sel.AllData()) - bo, _, _, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{EnablePermissionsBackup: true}) + bo, _, _, _, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{EnablePermissionsBackup: true}) defer closer() - runAndCheckBackup(t, ctx, &bo, mb) + runAndCheckBackup(t, ctx, &bo, mb, false) } // TestBackup_Run ensures that Integration Testing works for OneDrive @@ -1206,11 +1245,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveIncrementals() { sel := selectors.NewOneDriveBackup(owners) sel.Include(sel.Folders(containers, selectors.PrefixMatch())) - bo, _, kw, ms, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs) + bo, _, kw, ms, gc, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, ffs) defer closer() // run the initial backup - runAndCheckBackup(t, ctx, &bo, mb) + runAndCheckBackup(t, ctx, &bo, mb, false) var ( newFile models.DriveItemable @@ -1513,7 +1552,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_oneDriveIncrementals() { var ( t = suite.T() incMB = evmock.NewBus() - incBO = newTestBackupOp(t, ctx, kw, ms, acct, sel.Selector, incMB, ffs, closer) + incBO = newTestBackupOp(t, ctx, kw, ms, gc, acct, sel.Selector, incMB, ffs, closer) ) tester.LogTimeOfTest(suite.T()) @@ -1566,9 +1605,9 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run_sharePoint() { sel.Include(sel.LibraryFolders(selectors.Any())) - bo, _, kw, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{}) + bo, _, kw, _, _, closer := prepNewTestBackupOp(t, ctx, mb, sel.Selector, control.Toggles{}) defer closer() - runAndCheckBackup(t, ctx, &bo, mb) + runAndCheckBackup(t, ctx, &bo, mb, false) checkBackupIsInManifests(t, ctx, kw, &bo, sel.Selector, suite.site, path.LibrariesCategory) } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index c18c2403d..74641704a 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" evmock "github.com/alcionai/corso/src/internal/events/mock" @@ -359,6 +360,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { var ( kw = &kopia.Wrapper{} sw = &store.Wrapper{} + gc = &connector.GraphConnector{} acct = account.Account{} now = time.Now() ) @@ -413,6 +415,7 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { control.Options{}, kw, sw, + gc, acct, sel, sel.DiscreteOwner, diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 8136c0905..8dc27c87a 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -1,20 +1,15 @@ package operations import ( - "context" "time" "github.com/alcionai/clues" "github.com/alcionai/corso/src/internal/connector" - "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" - "github.com/alcionai/corso/src/internal/observe" - "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/fault" - "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" ) @@ -62,6 +57,7 @@ type operation struct { bus events.Eventer kopia *kopia.Wrapper store *store.Wrapper + gc *connector.GraphConnector } func newOperation( @@ -69,6 +65,7 @@ func newOperation( bus events.Eventer, kw *kopia.Wrapper, sw *store.Wrapper, + gc *connector.GraphConnector, ) operation { return operation{ CreatedAt: time.Now(), @@ -78,6 +75,7 @@ func newOperation( bus: bus, kopia: kw, store: sw, + gc: gc, Status: InProgress, } @@ -92,33 +90,9 @@ func (op operation) validate() error { return clues.New("missing modelstore") } + if op.gc == nil { + return clues.New("missing graph connector") + } + return nil } - -// produces a graph connector. -func connectToM365( - ctx context.Context, - sel selectors.Selector, - acct account.Account, - errs *fault.Bus, -) (*connector.GraphConnector, error) { - complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Connecting to M365")) - defer func() { - complete <- struct{}{} - close(complete) - closer() - }() - - // retrieve data from the producer - resource := connector.Users - if sel.Service == selectors.ServiceSharePoint { - resource = connector.Sites - } - - gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, resource, errs) - if err != nil { - return nil, err - } - - return gc, nil -} diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index 27cf6185f..5d0425022 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/tester" @@ -25,27 +26,30 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(control.Options{}, events.Bus{}, nil, nil) + op := newOperation(control.Options{}, events.Bus{}, nil, nil, nil) assert.Greater(t, op.CreatedAt, time.Time{}) } func (suite *OperationSuite) TestOperation_Validate() { kwStub := &kopia.Wrapper{} swStub := &store.Wrapper{} + gcStub := &connector.GraphConnector{} table := []struct { name string kw *kopia.Wrapper sw *store.Wrapper + gc *connector.GraphConnector errCheck assert.ErrorAssertionFunc }{ - {"good", kwStub, swStub, assert.NoError}, - {"missing kopia wrapper", nil, swStub, assert.Error}, - {"missing store wrapper", kwStub, nil, assert.Error}, + {"good", kwStub, swStub, gcStub, assert.NoError}, + {"missing kopia wrapper", nil, swStub, gcStub, assert.Error}, + {"missing store wrapper", kwStub, nil, gcStub, assert.Error}, + {"missing graph connector", kwStub, swStub, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { - err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw).validate() + err := newOperation(control.Options{}, events.Bus{}, test.kw, test.sw, test.gc).validate() test.errCheck(suite.T(), err, clues.ToCore(err)) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 2254c0d3e..0365765af 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/internal/common/crash" + "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/connector/support" "github.com/alcionai/corso/src/internal/data" @@ -55,6 +56,7 @@ func NewRestoreOperation( opts control.Options, kw *kopia.Wrapper, sw *store.Wrapper, + gc *connector.GraphConnector, acct account.Account, backupID model.StableID, sel selectors.Selector, @@ -62,7 +64,7 @@ func NewRestoreOperation( bus events.Eventer, ) (RestoreOperation, error) { op := RestoreOperation{ - operation: newOperation(opts, bus, kw, sw), + operation: newOperation(opts, bus, kw, sw, gc), BackupID: backupID, Selectors: sel, Destination: dest, @@ -233,16 +235,11 @@ func (op *RestoreOperation) do( opStats.resourceCount = 1 opStats.cs = dcs - gc, err := connectToM365(ctx, op.Selectors, op.account, op.Errors) - if err != nil { - return nil, clues.Wrap(err, "connecting to M365") - } - restoreComplete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Restoring data")) defer closer() defer close(restoreComplete) - restoreDetails, err := gc.RestoreDataCollections( + restoreDetails, err := op.gc.RestoreDataCollections( ctx, bup.Version, op.account, @@ -257,9 +254,9 @@ func (op *RestoreOperation) do( restoreComplete <- struct{}{} - opStats.gc = gc.AwaitStatus() + opStats.gc = op.gc.AwaitStatus() - logger.Ctx(ctx).Debug(gc.PrintableStatus()) + logger.Ctx(ctx).Debug(op.gc.PrintableStatus()) return restoreDetails, nil } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index d0d10b104..0ffbf587e 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/alcionai/corso/src/internal/common" + "github.com/alcionai/corso/src/internal/connector" "github.com/alcionai/corso/src/internal/connector/exchange" "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/mockconnector" @@ -25,6 +26,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/account" "github.com/alcionai/corso/src/pkg/control" + "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/selectors" "github.com/alcionai/corso/src/pkg/store" ) @@ -48,6 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { var ( kw = &kopia.Wrapper{} sw = &store.Wrapper{} + gc = &connector.GraphConnector{} acct = account.Account{} now = time.Now() dest = tester.DefaultTestRestoreDestination() @@ -108,6 +111,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { control.Options{}, kw, sw, + gc, acct, "foo", selectors.Selector{DiscreteOwner: "test"}, @@ -137,18 +141,16 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { type bupResults struct { selectorResourceOwners []string - resourceOwner string backupID model.StableID items int + gc *connector.GraphConnector } type RestoreOpIntegrationSuite struct { tester.Suite - exchange bupResults - sharepoint bupResults - kopiaCloser func(ctx context.Context) + acct account.Account kw *kopia.Wrapper sw *store.Wrapper ms *kopia.ModelStore @@ -167,13 +169,13 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { defer flush() var ( - t = suite.T() - m365UserID = tester.M365UserID(t) - acct = tester.NewM365Account(t) - st = tester.NewPrefixedS3Storage(t) - k = kopia.NewConn(st) + t = suite.T() + st = tester.NewPrefixedS3Storage(t) + k = kopia.NewConn(st) ) + suite.acct = tester.NewM365Account(t) + err := k.Initialize(ctx) require.NoError(t, err, clues.ToCore(err)) @@ -193,105 +195,6 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { sw := store.NewKopiaStore(ms) suite.sw = sw - - suite.Run("exchange_setup", func() { - var ( - t = suite.T() - users = []string{m365UserID} - bsel = selectors.NewExchangeBackup(users) - ) - - bsel.DiscreteOwner = m365UserID - bsel.Include( - bsel.MailFolders([]string{exchange.DefaultMailFolder}, selectors.PrefixMatch()), - bsel.ContactFolders([]string{exchange.DefaultContactFolder}, selectors.PrefixMatch()), - bsel.EventCalendars([]string{exchange.DefaultCalendar}, selectors.PrefixMatch()), - ) - - bo, err := NewBackupOperation( - ctx, - control.Options{}, - kw, - sw, - acct, - bsel.Selector, - bsel.Selector.DiscreteOwner, - evmock.NewBus()) - require.NoError(t, err, clues.ToCore(err)) - - err = bo.Run(ctx) - require.NoError(t, err, clues.ToCore(err)) - require.NotEmpty(t, bo.Results.BackupID) - - suite.exchange = bupResults{ - selectorResourceOwners: users, - resourceOwner: m365UserID, - backupID: bo.Results.BackupID, - // Discount metadata collection files (1 delta and one prev path for each category). - // These meta files are used to aid restore, but are not themselves - // restored (ie: counted as writes). - items: bo.Results.ItemsWritten - 6, - } - }) - - suite.Run("sharepoint_setup", func() { - var ( - t = suite.T() - siteID = tester.M365SiteID(t) - sites = []string{siteID} - spsel = selectors.NewSharePointBackup(sites) - ) - - spsel.DiscreteOwner = siteID - // assume a folder name "test" exists in the drive. - // this is brittle, and requires us to backfill anytime - // the site under test changes, but also prevents explosive - // growth from re-backup/restore of restored files. - spsel.Include(spsel.LibraryFolders([]string{"test"}, selectors.PrefixMatch())) - - bo, err := NewBackupOperation( - ctx, - control.Options{}, - kw, - sw, - acct, - spsel.Selector, - spsel.Selector.DiscreteOwner, - evmock.NewBus()) - require.NoError(t, err, clues.ToCore(err)) - - // get the count of drives - m365, err := acct.M365Config() - require.NoError(t, err, clues.ToCore(err)) - - adpt, err := graph.CreateAdapter( - m365.AzureTenantID, - m365.AzureClientID, - m365.AzureClientSecret) - require.NoError(t, err, clues.ToCore(err)) - - service := graph.NewService(adpt) - spPgr := api.NewSiteDrivePager(service, siteID, []string{"id", "name"}) - - drives, err := api.GetAllDrives(ctx, spPgr, true, 3) - require.NoError(t, err, clues.ToCore(err)) - - err = bo.Run(ctx) - require.NoError(t, err, clues.ToCore(err)) - require.NotEmpty(t, bo.Results.BackupID) - - suite.sharepoint = bupResults{ - selectorResourceOwners: sites, - resourceOwner: siteID, - backupID: bo.Results.BackupID, - // Discount metadata files (1 delta, 1 prev path) - // assume only one folder, and therefore 1 dirmeta per drive - // assume only one file in each folder, and therefore 1 meta per drive. - // These meta files are used to aid restore, but are not themselves - // restored (ie: counted as writes). - items: bo.Results.ItemsWritten - 2 - len(drives) - len(drives), - } - }) } func (suite *RestoreOpIntegrationSuite) TearDownSuite() { @@ -314,6 +217,7 @@ func (suite *RestoreOpIntegrationSuite) TearDownSuite() { func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { kw := &kopia.Wrapper{} sw := &store.Wrapper{} + gc := &connector.GraphConnector{} acct := tester.NewM365Account(suite.T()) dest := tester.DefaultTestRestoreDestination() @@ -322,13 +226,15 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { opts control.Options kw *kopia.Wrapper sw *store.Wrapper + gc *connector.GraphConnector acct account.Account targets []string errCheck assert.ErrorAssertionFunc }{ - {"good", control.Options{}, kw, sw, acct, nil, assert.NoError}, - {"missing kopia", control.Options{}, nil, sw, acct, nil, assert.Error}, - {"missing modelstore", control.Options{}, kw, nil, acct, nil, assert.Error}, + {"good", control.Options{}, kw, sw, gc, acct, nil, assert.NoError}, + {"missing kopia", control.Options{}, nil, sw, gc, acct, nil, assert.Error}, + {"missing modelstore", control.Options{}, kw, nil, gc, acct, nil, assert.Error}, + {"missing graphConnector", control.Options{}, kw, sw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.Run(test.name, func() { @@ -340,6 +246,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.opts, test.kw, test.sw, + test.gc, test.acct, "backup-id", selectors.Selector{DiscreteOwner: "test"}, @@ -350,58 +257,195 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { } } -//nolint:lll +func setupExchangeBackup( + t *testing.T, + kw *kopia.Wrapper, + sw *store.Wrapper, + acct account.Account, + owner string, +) bupResults { + ctx, flush := tester.NewContext() + defer flush() + + var ( + users = []string{owner} + bsel = selectors.NewExchangeBackup(users) + ) + + gc, err := connector.NewGraphConnector( + ctx, + graph.HTTPClient(graph.NoTimeout()), + acct, + connector.Users, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + + bsel.DiscreteOwner = owner + bsel.Include( + bsel.MailFolders([]string{exchange.DefaultMailFolder}, selectors.PrefixMatch()), + bsel.ContactFolders([]string{exchange.DefaultContactFolder}, selectors.PrefixMatch()), + bsel.EventCalendars([]string{exchange.DefaultCalendar}, selectors.PrefixMatch()), + ) + + bo, err := NewBackupOperation( + ctx, + control.Options{}, + kw, + sw, + gc, + acct, + bsel.Selector, + bsel.Selector.DiscreteOwner, + evmock.NewBus()) + require.NoError(t, err, clues.ToCore(err)) + + err = bo.Run(ctx) + require.NoError(t, err, clues.ToCore(err)) + require.NotEmpty(t, bo.Results.BackupID) + + return bupResults{ + selectorResourceOwners: users, + backupID: bo.Results.BackupID, + // Discount metadata collection files (1 delta and one prev path for each category). + // These meta files are used to aid restore, but are not themselves + // restored (ie: counted as writes). + items: bo.Results.ItemsWritten - 6, + gc: gc, + } +} + +func setupSharePointBackup( + t *testing.T, + kw *kopia.Wrapper, + sw *store.Wrapper, + acct account.Account, + owner string, +) bupResults { + ctx, flush := tester.NewContext() + defer flush() + + var ( + sites = []string{owner} + spsel = selectors.NewSharePointBackup(sites) + ) + + gc, err := connector.NewGraphConnector( + ctx, + graph.HTTPClient(graph.NoTimeout()), + acct, + connector.Sites, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) + + spsel.DiscreteOwner = owner + // assume a folder name "test" exists in the drive. + // this is brittle, and requires us to backfill anytime + // the site under test changes, but also prevents explosive + // growth from re-backup/restore of restored files. + spsel.Include(spsel.LibraryFolders([]string{"test"}, selectors.PrefixMatch())) + + bo, err := NewBackupOperation( + ctx, + control.Options{}, + kw, + sw, + gc, + acct, + spsel.Selector, + spsel.Selector.DiscreteOwner, + evmock.NewBus()) + require.NoError(t, err, clues.ToCore(err)) + + // get the count of drives + m365, err := acct.M365Config() + require.NoError(t, err, clues.ToCore(err)) + + adpt, err := graph.CreateAdapter( + m365.AzureTenantID, + m365.AzureClientID, + m365.AzureClientSecret) + require.NoError(t, err, clues.ToCore(err)) + + service := graph.NewService(adpt) + spPgr := api.NewSiteDrivePager(service, owner, []string{"id", "name"}) + + drives, err := api.GetAllDrives(ctx, spPgr, true, 3) + require.NoError(t, err, clues.ToCore(err)) + + err = bo.Run(ctx) + require.NoError(t, err, clues.ToCore(err)) + require.NotEmpty(t, bo.Results.BackupID) + + return bupResults{ + selectorResourceOwners: sites, + backupID: bo.Results.BackupID, + // Discount metadata files (1 delta, 1 prev path) + // assume only one folder, and therefore 1 dirmeta per drive + // assume only one file in each folder, and therefore 1 meta per drive. + // These meta files are used to aid restore, but are not themselves + // restored (ie: counted as writes). + items: bo.Results.ItemsWritten - 2 - len(drives) - len(drives), + gc: gc, + } +} + func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { ctx, flush := tester.NewContext() defer flush() tables := []struct { - name string - bID model.StableID - expectedItems int - dest control.RestoreDestination - getSelector func(t *testing.T) selectors.Selector - cleanup func(t *testing.T, dest string) + name string + owner string + dest control.RestoreDestination + getSelector func(t *testing.T, owners []string) selectors.Selector + setup func(t *testing.T, kw *kopia.Wrapper, sw *store.Wrapper, acct account.Account, owner string) bupResults }{ { - name: "Exchange_Restore", - bID: suite.exchange.backupID, - expectedItems: suite.exchange.items, - dest: tester.DefaultTestRestoreDestination(), - getSelector: func(t *testing.T) selectors.Selector { - rsel := selectors.NewExchangeRestore(suite.exchange.selectorResourceOwners) + name: "Exchange_Restore", + owner: tester.M365UserID(suite.T()), + dest: tester.DefaultTestRestoreDestination(), + getSelector: func(t *testing.T, owners []string) selectors.Selector { + rsel := selectors.NewExchangeRestore(owners) rsel.Include(rsel.AllData()) return rsel.Selector }, + setup: setupExchangeBackup, }, { - name: "SharePoint_Restore", - bID: suite.sharepoint.backupID, - expectedItems: suite.sharepoint.items, - dest: control.DefaultRestoreDestination(common.SimpleDateTimeOneDrive), - getSelector: func(t *testing.T) selectors.Selector { - rsel := selectors.NewSharePointRestore(suite.sharepoint.selectorResourceOwners) + name: "SharePoint_Restore", + owner: tester.M365SiteID(suite.T()), + dest: control.DefaultRestoreDestination(common.SimpleDateTimeOneDrive), + getSelector: func(t *testing.T, owners []string) selectors.Selector { + rsel := selectors.NewSharePointRestore(owners) rsel.Include(rsel.AllData()) return rsel.Selector }, + setup: setupSharePointBackup, }, } for _, test := range tables { suite.Run(test.name, func() { - t := suite.T() - mb := evmock.NewBus() + var ( + t = suite.T() + mb = evmock.NewBus() + bup = test.setup(t, suite.kw, suite.sw, suite.acct, test.owner) + ) + + require.NotZero(t, bup.items) + require.NotEmpty(t, bup.backupID) ro, err := NewRestoreOperation( ctx, control.Options{FailFast: true}, suite.kw, suite.sw, + bup.gc, tester.NewM365Account(t), - test.bID, - test.getSelector(t), + bup.backupID, + test.getSelector(t, bup.selectorResourceOwners), test.dest, mb) require.NoError(t, err, clues.ToCore(err)) @@ -412,38 +456,48 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { require.NotEmpty(t, ro.Results, "restoreOp results") require.NotNil(t, ds, "restored details") assert.Equal(t, ro.Status, Completed, "restoreOp status") - assert.Equal(t, ro.Results.ItemsWritten, len(ds.Entries), "count of items written matches restored entries in details") + assert.Equal(t, ro.Results.ItemsWritten, len(ds.Entries), "item write count matches len details") assert.Less(t, 0, ro.Results.ItemsRead, "restore items read") assert.Less(t, int64(0), ro.Results.BytesRead, "bytes read") assert.Equal(t, 1, ro.Results.ResourceOwners, "resource Owners") assert.NoError(t, ro.Errors.Failure(), "non-recoverable error", clues.ToCore(ro.Errors.Failure())) assert.Empty(t, ro.Errors.Recovered(), "recoverable errors") - assert.Equal(t, test.expectedItems, ro.Results.ItemsWritten, "backup and restore wrote the same num of items") + assert.Equal(t, bup.items, ro.Results.ItemsWritten, "backup and restore wrote the same num of items") assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events") assert.Equal(t, 1, mb.TimesCalled[events.RestoreEnd], "restore-end events") }) } } -func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { +func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoResults() { ctx, flush := tester.NewContext() defer flush() - t := suite.T() + var ( + t = suite.T() + dest = tester.DefaultTestRestoreDestination() + mb = evmock.NewBus() + ) rsel := selectors.NewExchangeRestore(selectors.None()) rsel.Include(rsel.AllData()) - dest := tester.DefaultTestRestoreDestination() - mb := evmock.NewBus() + gc, err := connector.NewGraphConnector( + ctx, + graph.HTTPClient(graph.NoTimeout()), + suite.acct, + connector.Users, + fault.New(true)) + require.NoError(t, err, clues.ToCore(err)) ro, err := NewRestoreOperation( ctx, control.Options{}, suite.kw, suite.sw, + gc, tester.NewM365Account(t), - suite.exchange.backupID, + "backupID", rsel.Selector, dest, mb) @@ -454,6 +508,6 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_ErrorNoResults() { require.Nil(t, ds, "restoreOp.Run() should not produce details") assert.Zero(t, ro.Results.ResourceOwners, "resource owners") assert.Zero(t, ro.Results.BytesRead, "bytes read") - assert.Equal(t, 1, mb.TimesCalled[events.RestoreStart], "restore-start events") + assert.Zero(t, mb.TimesCalled[events.RestoreStart], "restore-start events") assert.Zero(t, mb.TimesCalled[events.RestoreEnd], "restore-end events") } diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index fab25266f..d2c6e3aaa 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -9,6 +9,8 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/src/internal/common/crash" + "github.com/alcionai/corso/src/internal/connector" + "github.com/alcionai/corso/src/internal/connector/graph" "github.com/alcionai/corso/src/internal/connector/onedrive" "github.com/alcionai/corso/src/internal/events" "github.com/alcionai/corso/src/internal/kopia" @@ -283,16 +285,22 @@ func (r *repository) Close(ctx context.Context) error { // NewBackup generates a BackupOperation runner. func (r repository) NewBackup( ctx context.Context, - selector selectors.Selector, + sel selectors.Selector, ) (operations.BackupOperation, error) { + gc, err := connectToM365(ctx, sel, r.Account, fault.New(true)) + if err != nil { + return operations.BackupOperation{}, errors.Wrap(err, "connecting to m365") + } + return operations.NewBackupOperation( ctx, r.Opts, r.dataLayer, store.NewKopiaStore(r.modelStore), + gc, r.Account, - selector, - selector.DiscreteOwner, + sel, + sel.DiscreteOwner, r.Bus) } @@ -303,11 +311,17 @@ func (r repository) NewRestore( sel selectors.Selector, dest control.RestoreDestination, ) (operations.RestoreOperation, error) { + gc, err := connectToM365(ctx, sel, r.Account, fault.New(true)) + if err != nil { + return operations.RestoreOperation{}, errors.Wrap(err, "connecting to m365") + } + return operations.NewRestoreOperation( ctx, r.Opts, r.dataLayer, store.NewKopiaStore(r.modelStore), + gc, r.Account, model.StableID(backupID), sel, @@ -542,3 +556,35 @@ func getRepoModel(ctx context.Context, ms *kopia.ModelStore) (*repositoryModel, func newRepoID(s storage.Storage) string { return uuid.NewString() } + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +// produces a graph connector. +func connectToM365( + ctx context.Context, + sel selectors.Selector, + acct account.Account, + errs *fault.Bus, +) (*connector.GraphConnector, error) { + complete, closer := observe.MessageWithCompletion(ctx, observe.Safe("Connecting to M365")) + defer func() { + complete <- struct{}{} + close(complete) + closer() + }() + + // retrieve data from the producer + resource := connector.Users + if sel.Service == selectors.ServiceSharePoint { + resource = connector.Sites + } + + gc, err := connector.NewGraphConnector(ctx, graph.HTTPClient(graph.NoTimeout()), acct, resource, errs) + if err != nil { + return nil, err + } + + return gc, nil +}