minor refactor for cleanup in operations (#1729)
## Description Starting off changes with a minor refactor. Hopefully will keep code cleaner as we populate the backup operation in the coming changes. ## Type of change - [x] 🐹 Trivial/Minor ## Issue(s) * #1725
This commit is contained in:
parent
a360a4c146
commit
53195017a1
@ -119,71 +119,84 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) {
|
|||||||
|
|
||||||
err = op.createBackupModels(ctx, opStats.k.SnapshotID, backupDetails)
|
err = op.createBackupModels(ctx, opStats.k.SnapshotID, backupDetails)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// todo: we're not persisting this yet, except for the error shown to the user.
|
|
||||||
opStats.writeErr = err
|
opStats.writeErr = err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
complete, closer := observe.MessageWithCompletion("Connecting to M365:")
|
gc, err := connectToM365(ctx, op.Selectors, op.account)
|
||||||
defer closer()
|
|
||||||
defer close(complete)
|
|
||||||
|
|
||||||
// retrieve data from the producer
|
|
||||||
resource := connector.Users
|
|
||||||
if op.Selectors.Service == selectors.ServiceSharePoint {
|
|
||||||
resource = connector.Sites
|
|
||||||
}
|
|
||||||
|
|
||||||
gc, err := connector.NewGraphConnector(ctx, op.account, resource)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "connecting to graph api")
|
opStats.readErr = errors.Wrap(err, "connecting to M365")
|
||||||
opStats.readErr = err
|
return opStats.readErr
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
complete <- struct{}{}
|
|
||||||
|
|
||||||
discoverCh, closer := observe.MessageWithCompletion("Discovering items to backup:")
|
cs, err := produceBackupDataCollections(ctx, gc, op.Selectors)
|
||||||
defer closer()
|
|
||||||
defer close(discoverCh)
|
|
||||||
|
|
||||||
cs, err := gc.DataCollections(ctx, op.Selectors, nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "retrieving service data")
|
opStats.readErr = errors.Wrap(err, "retrieving data to backup")
|
||||||
opStats.readErr = err
|
return opStats.readErr
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
discoverCh <- struct{}{}
|
opStats.k, backupDetails, err = consumeBackupDataCollections(ctx, op.kopia, op.Selectors, cs)
|
||||||
|
|
||||||
opStats.resourceCount = len(data.ResourceOwnerSet(cs))
|
|
||||||
|
|
||||||
backupCh, closer := observe.MessageWithCompletion("Backing up data:")
|
|
||||||
defer closer()
|
|
||||||
defer close(backupCh)
|
|
||||||
|
|
||||||
// hand the results to the consumer
|
|
||||||
opStats.k, backupDetails, err = op.kopia.BackupCollections(ctx, nil, cs, op.Selectors.PathService())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "backing up service data")
|
opStats.writeErr = errors.Wrap(err, "backing up service data")
|
||||||
opStats.writeErr = err
|
return opStats.writeErr
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
backupCh <- struct{}{}
|
|
||||||
|
|
||||||
logger.Ctx(ctx).Debugf(
|
logger.Ctx(ctx).Debugf(
|
||||||
"Backed up %d directories and %d files",
|
"Backed up %d directories and %d files",
|
||||||
opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount,
|
opStats.k.TotalDirectoryCount, opStats.k.TotalFileCount,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TODO: should always be 1, since backups are 1:1 with resourceOwners now.
|
||||||
|
opStats.resourceCount = len(data.ResourceOwnerSet(cs))
|
||||||
opStats.started = true
|
opStats.started = true
|
||||||
opStats.gc = gc.AwaitStatus()
|
opStats.gc = gc.AwaitStatus()
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// calls the producer to generate collections of data to backup
|
||||||
|
func produceBackupDataCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
gc *connector.GraphConnector,
|
||||||
|
sel selectors.Selector,
|
||||||
|
) ([]data.Collection, error) {
|
||||||
|
complete, closer := observe.MessageWithCompletion("Discovering items to backup:")
|
||||||
|
defer func() {
|
||||||
|
complete <- struct{}{}
|
||||||
|
close(complete)
|
||||||
|
closer()
|
||||||
|
}()
|
||||||
|
|
||||||
|
cs, err := gc.DataCollections(ctx, sel, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return cs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// calls kopia to backup the collections of data
|
||||||
|
func consumeBackupDataCollections(
|
||||||
|
ctx context.Context,
|
||||||
|
kw *kopia.Wrapper,
|
||||||
|
sel selectors.Selector,
|
||||||
|
cs []data.Collection,
|
||||||
|
) (*kopia.BackupStats, *details.Details, error) {
|
||||||
|
complete, closer := observe.MessageWithCompletion("Backing up data:")
|
||||||
|
defer func() {
|
||||||
|
complete <- struct{}{}
|
||||||
|
close(complete)
|
||||||
|
closer()
|
||||||
|
}()
|
||||||
|
|
||||||
|
kstats, deets, err := kw.BackupCollections(ctx, nil, cs, sel.PathService())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return kstats, deets, nil
|
||||||
|
}
|
||||||
|
|
||||||
// writes the results metrics to the operation results.
|
// writes the results metrics to the operation results.
|
||||||
// later stored in the manifest using createBackupModels.
|
// later stored in the manifest using createBackupModels.
|
||||||
func (op *BackupOperation) persistResults(
|
func (op *BackupOperation) persistResults(
|
||||||
|
|||||||
@ -1,13 +1,18 @@
|
|||||||
package operations
|
package operations
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/src/internal/connector"
|
||||||
"github.com/alcionai/corso/src/internal/events"
|
"github.com/alcionai/corso/src/internal/events"
|
||||||
"github.com/alcionai/corso/src/internal/kopia"
|
"github.com/alcionai/corso/src/internal/kopia"
|
||||||
|
"github.com/alcionai/corso/src/internal/observe"
|
||||||
|
"github.com/alcionai/corso/src/pkg/account"
|
||||||
"github.com/alcionai/corso/src/pkg/control"
|
"github.com/alcionai/corso/src/pkg/control"
|
||||||
|
"github.com/alcionai/corso/src/pkg/selectors"
|
||||||
"github.com/alcionai/corso/src/pkg/store"
|
"github.com/alcionai/corso/src/pkg/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -82,3 +87,30 @@ func (op operation) validate() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// produces a graph connector.
|
||||||
|
func connectToM365(
|
||||||
|
ctx context.Context,
|
||||||
|
sel selectors.Selector,
|
||||||
|
acct account.Account,
|
||||||
|
) (*connector.GraphConnector, error) {
|
||||||
|
complete, closer := observe.MessageWithCompletion("Connecting to M365:")
|
||||||
|
defer func() {
|
||||||
|
complete <- struct{}{}
|
||||||
|
close(complete)
|
||||||
|
closer()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// retrieve data from the producer
|
||||||
|
resource := connector.Users
|
||||||
|
if sel.Service == selectors.ServiceSharePoint {
|
||||||
|
resource = connector.Sites
|
||||||
|
}
|
||||||
|
|
||||||
|
gc, err := connector.NewGraphConnector(ctx, acct, resource)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return gc, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import (
|
|||||||
multierror "github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/src/internal/connector"
|
|
||||||
"github.com/alcionai/corso/src/internal/connector/support"
|
"github.com/alcionai/corso/src/internal/connector/support"
|
||||||
"github.com/alcionai/corso/src/internal/data"
|
"github.com/alcionai/corso/src/internal/data"
|
||||||
D "github.com/alcionai/corso/src/internal/diagnostics"
|
D "github.com/alcionai/corso/src/internal/diagnostics"
|
||||||
@ -161,24 +160,11 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
|||||||
opStats.cs = dcs
|
opStats.cs = dcs
|
||||||
opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
|
opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
|
||||||
|
|
||||||
gcComplete, closer := observe.MessageWithCompletion("Connecting to M365:")
|
gc, err := connectToM365(ctx, op.Selectors, op.account)
|
||||||
defer closer()
|
|
||||||
defer close(gcComplete)
|
|
||||||
|
|
||||||
// restore those collections using graph
|
|
||||||
resource := connector.Users
|
|
||||||
if op.Selectors.Service == selectors.ServiceSharePoint {
|
|
||||||
resource = connector.Sites
|
|
||||||
}
|
|
||||||
|
|
||||||
gc, err := connector.NewGraphConnector(ctx, op.account, resource)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "connecting to microsoft servers")
|
opStats.readErr = errors.Wrap(err, "connecting to M365")
|
||||||
opStats.writeErr = err
|
return nil, opStats.readErr
|
||||||
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
gcComplete <- struct{}{}
|
|
||||||
|
|
||||||
restoreComplete, closer := observe.MessageWithCompletion("Restoring data:")
|
restoreComplete, closer := observe.MessageWithCompletion("Restoring data:")
|
||||||
defer closer()
|
defer closer()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user