Use service-handler for restore operations (#4714)

Switch restore code path to use service-level handlers. Does a few
things:
* switches existing service-level functions to be methods on
  service-level handlers
* update interfaces as necessary
* moves some logic from old controller-level restore function to either
  the new handlers or the operation-level function
* removes old code

May be easiest to review by commit

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #4254

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
ashmrtn 2023-11-28 08:47:45 -08:00 committed by GitHub
parent 1be4c4dbe7
commit d36636285a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 208 additions and 163 deletions

View File

@ -111,7 +111,12 @@ func generateAndRestoreItems(
Selector: sel,
}
deets, _, err := ctrl.ConsumeRestoreCollections(
handler, err := ctrl.NewServiceHandler(opts, service)
if err != nil {
return nil, clues.Stack(err)
}
deets, _, err := handler.ConsumeRestoreCollections(
ctx,
rcc,
dataColls,
@ -428,13 +433,6 @@ func generateAndRestoreDriveItems(
return nil, err
}
// collections := getCollections(
// service,
// tenantID,
// []string{resourceOwner},
// input,
// version.Backup)
opts := control.DefaultOptions()
restoreCfg.IncludePermissions = true
@ -462,7 +460,12 @@ func generateAndRestoreDriveItems(
Selector: sel,
}
deets, _, err := ctrl.ConsumeRestoreCollections(
handler, err := ctrl.NewServiceHandler(opts, service)
if err != nil {
return nil, clues.Stack(err)
}
deets, _, err := handler.ConsumeRestoreCollections(
ctx,
rcc,
collections,

View File

@ -11,7 +11,10 @@ import (
"github.com/alcionai/corso/src/pkg/path"
)
var ErrNotFound = clues.New("not found")
var (
ErrNotFound = clues.New("not found")
ErrNoData = clues.New("no data")
)
type CollectionState int

View File

@ -26,7 +26,6 @@ var ErrNoResourceLookup = clues.New("missing resource lookup client")
// must comply with BackupProducer and RestoreConsumer
var (
_ inject.BackupProducer = &Controller{}
_ inject.RestoreConsumer = &Controller{}
_ inject.ToServiceHandler = &Controller{}
)

View File

@ -520,7 +520,12 @@ func (suite *ControllerIntegrationSuite) TestEmptyCollections() {
Selector: test.sel,
}
deets, _, err := suite.ctrl.ConsumeRestoreCollections(
handler, err := suite.ctrl.NewServiceHandler(
control.DefaultOptions(),
test.sel.PathService())
require.NoError(t, err, clues.ToCore(err))
deets, _, err := handler.ConsumeRestoreCollections(
ctx,
rcc,
test.col,
@ -562,7 +567,12 @@ func runRestore(
Selector: restoreSel,
}
deets, status, err := restoreCtrl.ConsumeRestoreCollections(
handler, err := restoreCtrl.NewServiceHandler(
control.DefaultOptions(),
sci.Service)
require.NoError(t, err, clues.ToCore(err))
deets, status, err := handler.ConsumeRestoreCollections(
ctx,
rcc,
collections,
@ -1194,7 +1204,12 @@ func (suite *ControllerIntegrationSuite) TestMultiFolderBackupDifferentNames() {
Selector: restoreSel,
}
deets, status, err := restoreCtrl.ConsumeRestoreCollections(
handler, err := restoreCtrl.NewServiceHandler(
control.DefaultOptions(),
test.service)
require.NoError(t, err, clues.ToCore(err))
deets, status, err := handler.ConsumeRestoreCollections(
ctx,
rcc,
collections,

View File

@ -109,3 +109,45 @@ func (ctrl Controller) SetRateLimiter(
) context.Context {
return ctx
}
var _ inject.RestoreConsumer = &RestoreConsumer{}
type RestoreConsumer struct {
Deets *details.Details
Err error
Stats data.CollectionStats
ProtectedResourceID string
ProtectedResourceName string
ProtectedResourceErr error
}
func (rc RestoreConsumer) IsServiceEnabled(
context.Context,
string,
) (bool, error) {
return true, rc.Err
}
func (rc RestoreConsumer) PopulateProtectedResourceIDAndName(
ctx context.Context,
protectedResource string, // input value, can be either id or name
ins idname.Cacher,
) (idname.Provider, error) {
return idname.NewProvider(rc.ProtectedResourceID, rc.ProtectedResourceName),
rc.ProtectedResourceErr
}
func (rc RestoreConsumer) CacheItemInfo(dii details.ItemInfo) {}
func (rc RestoreConsumer) ConsumeRestoreCollections(
ctx context.Context,
rcc inject.RestoreConsumerConfig,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
return rc.Deets, &rc.Stats, rc.Err
}

View File

@ -1,91 +0,0 @@
package m365
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/m365/collection/drive"
"github.com/alcionai/corso/src/internal/m365/service/exchange"
"github.com/alcionai/corso/src/internal/m365/service/groups"
"github.com/alcionai/corso/src/internal/m365/service/onedrive"
"github.com/alcionai/corso/src/internal/m365/service/sharepoint"
"github.com/alcionai/corso/src/internal/operations/inject"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// ConsumeRestoreCollections restores data from the specified collections
// into M365 using the GraphAPI.
func (ctrl *Controller) ConsumeRestoreCollections(
ctx context.Context,
rcc inject.RestoreConsumerConfig,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
ctx, end := diagnostics.Span(ctx, "m365:restore")
defer end()
ctx = graph.BindRateLimiterConfig(ctx, graph.LimiterCfg{Service: rcc.Selector.PathService()})
ctx = clues.Add(ctx, "restore_config", rcc.RestoreConfig)
if len(dcs) == 0 {
return nil, nil, clues.New("no data collections to restore")
}
var (
service = rcc.Selector.PathService()
stats *data.CollectionStats
deets *details.Details
err error
)
switch service {
case path.ExchangeService:
deets, stats, err = exchange.ConsumeRestoreCollections(
ctx,
ctrl.AC,
rcc,
dcs,
errs,
ctr)
case path.OneDriveService:
deets, stats, err = onedrive.ConsumeRestoreCollections(
ctx,
drive.NewUserDriveRestoreHandler(ctrl.AC),
rcc,
ctrl.backupDriveIDNames,
dcs,
errs,
ctr)
case path.SharePointService:
deets, stats, err = sharepoint.ConsumeRestoreCollections(
ctx,
rcc,
ctrl.AC,
ctrl.backupDriveIDNames,
dcs,
errs,
ctr)
case path.GroupsService:
deets, stats, err = groups.ConsumeRestoreCollections(
ctx,
rcc,
ctrl.AC,
ctrl.backupDriveIDNames,
ctrl.backupSiteIDWebURL,
dcs,
errs,
ctr)
default:
err = clues.Wrap(clues.New(service.String()), "service not supported")
}
return deets, stats, err
}

View File

@ -19,7 +19,7 @@ import (
"github.com/alcionai/corso/src/pkg/services/m365/api"
)
var _ inject.ServiceHandler = &baseExchangeHandler{}
var _ inject.ServiceHandler = &exchangeHandler{}
func NewExchangeHandler(
opts control.Options,

View File

@ -13,29 +13,38 @@ import (
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// ConsumeRestoreCollections restores M365 objects in data.RestoreCollection to MSFT
// store through GraphAPI.
func ConsumeRestoreCollections(
func (h *exchangeHandler) ConsumeRestoreCollections(
ctx context.Context,
ac api.Client,
rcc inject.RestoreConsumerConfig,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
if len(dcs) == 0 {
return nil, nil, clues.New("no data collections to restore")
return nil, nil, clues.WrapWC(ctx, data.ErrNoData, "performing restore")
}
// TODO(ashmrtn): We should stop relying on the context for rate limiter stuff
// and instead configure this when we make the handler instance. We can't
// initialize it in the NewHandler call right now because those functions
// aren't (and shouldn't be) returning a context along with the handler. Since
// that call isn't directly calling into this function even if we did
// initialize the rate limiter there it would be lost because it wouldn't get
// stored in an ancestor of the context passed to this function.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{Service: path.ExchangeService})
var (
deets = &details.Builder{}
resourceID = rcc.ProtectedResource.ID()
directoryCache = make(map[path.CategoryType]graph.ContainerResolver)
handlers = exchange.RestoreHandlers(ac)
handlers = exchange.RestoreHandlers(h.apiClient)
metrics support.CollectionMetrics
el = errs.Local()
)

View File

@ -25,21 +25,35 @@ import (
)
// ConsumeRestoreCollections will restore the specified data collections into OneDrive
func ConsumeRestoreCollections(
func (h *groupsHandler) ConsumeRestoreCollections(
ctx context.Context,
rcc inject.RestoreConsumerConfig,
ac api.Client,
backupDriveIDNames idname.Cacher,
backupSiteIDWebURL idname.Cacher,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
if len(dcs) == 0 {
return nil, nil, clues.WrapWC(ctx, data.ErrNoData, "performing restore")
}
// TODO(ashmrtn): We should stop relying on the context for rate limiter stuff
// and instead configure this when we make the handler instance. We can't
// initialize it in the NewHandler call right now because those functions
// aren't (and shouldn't be) returning a context along with the handler. Since
// that call isn't directly calling into this function even if we did
// initialize the rate limiter there it would be lost because it wouldn't get
// stored in an ancestor of the context passed to this function.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{Service: path.GroupsService})
var (
deets = &details.Builder{}
restoreMetrics support.CollectionMetrics
caches = drive.NewRestoreCaches(backupDriveIDNames)
lrh = drive.NewSiteRestoreHandler(ac, rcc.Selector.PathService())
deets = &details.Builder{}
restoreMetrics support.CollectionMetrics
caches = drive.NewRestoreCaches(h.backupDriveIDNames)
lrh = drive.NewSiteRestoreHandler(
h.apiClient,
rcc.Selector.PathService())
el = errs.Local()
webURLToSiteNames = map[string]string{}
)
@ -70,13 +84,13 @@ func ConsumeRestoreCollections(
case path.LibrariesCategory:
siteID := dc.FullPath().Folders()[1]
webURL, ok := backupSiteIDWebURL.NameOf(siteID)
webURL, ok := h.backupSiteIDWebURL.NameOf(siteID)
if !ok {
// This should not happen, but just in case
logger.Ctx(ictx).With("site_id", siteID).Info("site weburl not found, using site id")
}
siteName, err = getSiteName(ictx, siteID, webURL, ac.Sites(), webURLToSiteNames)
siteName, err = getSiteName(ictx, siteID, webURL, h.apiClient.Sites(), webURLToSiteNames)
if err != nil {
el.AddRecoverable(ictx, clues.Wrap(err, "getting site").
With("web_url", webURL, "site_id", siteID))

View File

@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/data/mock"
@ -53,15 +52,13 @@ func (suite *GroupsUnitSuite) TestConsumeRestoreCollections_noErrorOnGroups() {
mock.Collection{Path: pth},
}
_, _, err = ConsumeRestoreCollections(
ctx,
rcc,
api.Client{},
idname.NewCache(map[string]string{}),
idname.NewCache(map[string]string{}),
dcs,
fault.New(false),
nil)
_, _, err = NewGroupsHandler(control.DefaultOptions(), api.Client{}, nil).
ConsumeRestoreCollections(
ctx,
rcc,
dcs,
fault.New(false),
nil)
assert.NoError(t, err, "Groups Channels restore")
}

View File

@ -7,7 +7,6 @@ import (
"github.com/alcionai/clues"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/drive"
"github.com/alcionai/corso/src/internal/m365/support"
@ -17,24 +16,39 @@ import (
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// ConsumeRestoreCollections will restore the specified data collections into OneDrive
func ConsumeRestoreCollections(
func (h *onedriveHandler) ConsumeRestoreCollections(
ctx context.Context,
rh drive.RestoreHandler,
rcc inject.RestoreConsumerConfig,
backupDriveIDNames idname.Cacher,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
if len(dcs) == 0 {
return nil, nil, clues.WrapWC(ctx, data.ErrNoData, "performing restore")
}
// TODO(ashmrtn): We should stop relying on the context for rate limiter stuff
// and instead configure this when we make the handler instance. We can't
// initialize it in the NewHandler call right now because those functions
// aren't (and shouldn't be) returning a context along with the handler. Since
// that call isn't directly calling into this function even if we did
// initialize the rate limiter there it would be lost because it wouldn't get
// stored in an ancestor of the context passed to this function.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{Service: path.OneDriveService})
var (
deets = &details.Builder{}
restoreMetrics support.CollectionMetrics
el = errs.Local()
caches = drive.NewRestoreCaches(backupDriveIDNames)
caches = drive.NewRestoreCaches(h.backupDriveIDNames)
fallbackDriveName = rcc.RestoreConfig.Location
rh = drive.NewUserDriveRestoreHandler(h.apiClient)
)
ctx = clues.Add(ctx, "backup_version", rcc.BackupVersion)

View File

@ -7,7 +7,6 @@ import (
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/common/dttm"
"github.com/alcionai/corso/src/internal/common/idname"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/m365/collection/drive"
"github.com/alcionai/corso/src/internal/m365/collection/site"
@ -18,24 +17,39 @@ import (
"github.com/alcionai/corso/src/pkg/count"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
"github.com/alcionai/corso/src/pkg/services/m365/api"
"github.com/alcionai/corso/src/pkg/services/m365/api/graph"
)
// ConsumeRestoreCollections will restore the specified data collections into OneDrive
func ConsumeRestoreCollections(
func (h *sharepointHandler) ConsumeRestoreCollections(
ctx context.Context,
rcc inject.RestoreConsumerConfig,
ac api.Client,
backupDriveIDNames idname.Cacher,
dcs []data.RestoreCollection,
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
if len(dcs) == 0 {
return nil, nil, clues.WrapWC(ctx, data.ErrNoData, "performing restore")
}
// TODO(ashmrtn): We should stop relying on the context for rate limiter stuff
// and instead configure this when we make the handler instance. We can't
// initialize it in the NewHandler call right now because those functions
// aren't (and shouldn't be) returning a context along with the handler. Since
// that call isn't directly calling into this function even if we did
// initialize the rate limiter there it would be lost because it wouldn't get
// stored in an ancestor of the context passed to this function.
ctx = graph.BindRateLimiterConfig(
ctx,
graph.LimiterCfg{Service: path.SharePointService})
var (
deets = &details.Builder{}
lrh = drive.NewSiteRestoreHandler(ac, rcc.Selector.PathService())
deets = &details.Builder{}
lrh = drive.NewSiteRestoreHandler(
h.apiClient,
rcc.Selector.PathService())
restoreMetrics support.CollectionMetrics
caches = drive.NewRestoreCaches(backupDriveIDNames)
caches = drive.NewRestoreCaches(h.backupDriveIDNames)
el = errs.Local()
)
@ -81,7 +95,7 @@ func ConsumeRestoreCollections(
case path.ListsCategory:
metrics, err = site.RestoreListCollection(
ictx,
ac.Stable,
h.apiClient.Stable,
dc,
rcc.RestoreConfig.Location,
deets,
@ -90,7 +104,7 @@ func ConsumeRestoreCollections(
case path.PagesCategory:
metrics, err = site.RestorePageCollection(
ictx,
ac.Stable,
h.apiClient.Stable,
dc,
rcc.RestoreConfig.Location,
deets,

View File

@ -63,9 +63,9 @@ type (
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error)
IsServiceEnableder
Wait() *data.CollectionStats
// TODO(ashmrtn): Update the IsServiceEnableder interface once
// BackupProducer is also switched to service handlers.
IsServiceEnabled(ctx context.Context, resourceOwner string) (bool, error)
CacheItemInfoer
PopulateProtectedResourceIDAndNamer
@ -127,6 +127,7 @@ type (
// service-specific functionality for backups, restores, and exports.
ServiceHandler interface {
ExportConsumer
RestoreConsumer
}
ToServiceHandler interface {

View File

@ -240,10 +240,7 @@ func (op *RestoreOperation) do(
"restore_protected_resource_name", clues.Hide(restoreToProtectedResource.Name()))
// Check if the resource has the service enabled to be able to restore.
enabled, err := op.rc.IsServiceEnabled(
ctx,
op.Selectors.PathService(),
restoreToProtectedResource.ID())
enabled, err := op.rc.IsServiceEnabled(ctx, restoreToProtectedResource.ID())
if err != nil {
return nil, clues.Wrap(err, "verifying service restore is enabled")
}
@ -393,9 +390,16 @@ func consumeRestoreCollections(
errs *fault.Bus,
ctr *count.Bus,
) (*details.Details, *data.CollectionStats, error) {
if len(dcs) == 0 {
return nil, nil, clues.New("no data collections to restore")
}
progressBar := observe.MessageWithCompletion(ctx, observe.ProgressCfg{}, "Restoring data")
defer close(progressBar)
ctx, end := diagnostics.Span(ctx, "operations:restore")
defer end()
rcc := inject.RestoreConsumerConfig{
BackupVersion: backupVersion,
Options: opts,
@ -404,6 +408,8 @@ func consumeRestoreCollections(
Selector: sel,
}
ctx = clues.Add(ctx, "restore_config", rcc.RestoreConfig)
deets, status, err := rc.ConsumeRestoreCollections(ctx, rcc, dcs, errs, ctr)
return deets, status, clues.Wrap(err, "restoring collections").OrNil()

View File

@ -51,7 +51,7 @@ func (suite *RestoreOpUnitSuite) TestRestoreOperation_PersistResults() {
var (
kw = &kopia.Wrapper{}
sw = store.NewWrapper(&kopia.ModelStore{})
ctrl = &mock.Controller{}
ctrl = &mock.RestoreConsumer{}
now = time.Now()
restoreCfg = testdata.DefaultRestoreConfig("")
)
@ -286,7 +286,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
var (
kw = &kopia.Wrapper{}
sw = store.NewWrapper(&kopia.ModelStore{})
ctrl = &mock.Controller{}
rc = &mock.RestoreConsumer{}
restoreCfg = testdata.DefaultRestoreConfig("")
opts = control.DefaultOptions()
)
@ -299,9 +299,9 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
targets []string
errCheck assert.ErrorAssertionFunc
}{
{"good", kw, sw, ctrl, nil, assert.NoError},
{"missing kopia", nil, sw, ctrl, nil, assert.Error},
{"missing modelstore", kw, nil, ctrl, nil, assert.Error},
{"good", kw, sw, rc, nil, assert.NoError},
{"missing kopia", nil, sw, rc, nil, assert.Error},
{"missing modelstore", kw, nil, rc, nil, assert.Error},
{"missing restore consumer", kw, sw, nil, nil, assert.Error},
}
for _, test := range table {
@ -350,12 +350,17 @@ func (suite *RestoreOpIntegrationSuite) TestRestore_Run_errorNoBackup() {
count.New())
require.NoError(t, err, clues.ToCore(err))
rc, err := ctrl.NewServiceHandler(
control.DefaultOptions(),
rsel.PathService())
require.NoError(t, err, clues.ToCore(err))
ro, err := NewRestoreOperation(
ctx,
control.DefaultOptions(),
suite.kw,
suite.sw,
ctrl,
rc,
tconfig.NewM365Account(t),
"backupID",
rsel.Selector,

View File

@ -590,7 +590,10 @@ func generateContainerOfItems(
Selector: sel,
}
deets, _, err := ctrl.ConsumeRestoreCollections(
handler, err := ctrl.NewServiceHandler(opts, service)
require.NoError(t, err, clues.ToCore(err))
deets, _, err := handler.ConsumeRestoreCollections(
ctx,
rcc,
dataColls,

View File

@ -151,12 +151,15 @@ func newTestRestoreOp(
) operations.RestoreOperation {
rod.ctrl.IDNameLookup = idname.NewCache(map[string]string{rod.sel.ID(): rod.sel.Name()})
handler, err := rod.ctrl.NewServiceHandler(opts, rod.sel.PathService())
require.NoError(t, err, clues.ToCore(err))
ro, err := operations.NewRestoreOperation(
ctx,
opts,
rod.kw,
rod.sw,
rod.ctrl,
handler,
rod.acct,
backupID,
rod.sel,

View File

@ -17,7 +17,8 @@ import (
type DataProvider interface {
inject.BackupProducer
inject.RestoreConsumer
// Required for backups right now.
inject.PopulateProtectedResourceIDAndNamer
inject.ToServiceHandler

View File

@ -3,6 +3,8 @@ package repository
import (
"context"
"github.com/alcionai/clues"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/operations"
"github.com/alcionai/corso/src/pkg/control"
@ -27,12 +29,17 @@ func (r repository) NewRestore(
sel selectors.Selector,
restoreCfg control.RestoreConfig,
) (operations.RestoreOperation, error) {
handler, err := r.Provider.NewServiceHandler(r.Opts, sel.PathService())
if err != nil {
return operations.RestoreOperation{}, clues.Stack(err)
}
return operations.NewRestoreOperation(
ctx,
r.Opts,
r.dataLayer,
store.NewWrapper(r.modelStore),
r.Provider,
handler,
r.Account,
model.StableID(backupID),
sel,