## Description

Adds the following selectors to OneDrive details/restore:

- `file-name`, `folder`, `file-created-after`, `file-created-before`, `file-modified-after`, `file-modified-before`

Also includes a change that removes the `drive/<driveID>/root:` prefix from parent path entries in details. This is to improve readability. We will add the drive back as a separate item in details if needed later.

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

* #627

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
278 lines
7.0 KiB
Go
278 lines
7.0 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"runtime/trace"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/alcionai/corso/src/internal/connector"
|
|
"github.com/alcionai/corso/src/internal/connector/support"
|
|
"github.com/alcionai/corso/src/internal/data"
|
|
"github.com/alcionai/corso/src/internal/events"
|
|
"github.com/alcionai/corso/src/internal/kopia"
|
|
"github.com/alcionai/corso/src/internal/model"
|
|
"github.com/alcionai/corso/src/internal/stats"
|
|
"github.com/alcionai/corso/src/pkg/account"
|
|
"github.com/alcionai/corso/src/pkg/backup/details"
|
|
"github.com/alcionai/corso/src/pkg/control"
|
|
"github.com/alcionai/corso/src/pkg/logger"
|
|
"github.com/alcionai/corso/src/pkg/path"
|
|
"github.com/alcionai/corso/src/pkg/selectors"
|
|
"github.com/alcionai/corso/src/pkg/store"
|
|
)
|
|
|
|
// RestoreOperation wraps an operation with restore-specific props.
|
|
type RestoreOperation struct {
|
|
operation
|
|
|
|
BackupID model.StableID `json:"backupID"`
|
|
Results RestoreResults `json:"results"`
|
|
Selectors selectors.Selector `json:"selectors"`
|
|
Destination control.RestoreDestination `json:"destination"`
|
|
Version string `json:"version"`
|
|
|
|
account account.Account
|
|
}
|
|
|
|
// RestoreResults aggregate the details of the results of the operation.
|
|
type RestoreResults struct {
|
|
stats.Errs
|
|
stats.ReadWrites
|
|
stats.StartAndEndTime
|
|
}
|
|
|
|
// NewRestoreOperation constructs and validates a restore operation.
|
|
func NewRestoreOperation(
|
|
ctx context.Context,
|
|
opts control.Options,
|
|
kw *kopia.Wrapper,
|
|
sw *store.Wrapper,
|
|
acct account.Account,
|
|
backupID model.StableID,
|
|
sel selectors.Selector,
|
|
dest control.RestoreDestination,
|
|
bus events.Eventer,
|
|
) (RestoreOperation, error) {
|
|
op := RestoreOperation{
|
|
operation: newOperation(opts, bus, kw, sw),
|
|
BackupID: backupID,
|
|
Selectors: sel,
|
|
Destination: dest,
|
|
Version: "v0",
|
|
account: acct,
|
|
}
|
|
if err := op.validate(); err != nil {
|
|
return RestoreOperation{}, err
|
|
}
|
|
|
|
return op, nil
|
|
}
|
|
|
|
func (op RestoreOperation) validate() error {
|
|
return op.operation.validate()
|
|
}
|
|
|
|
// aggregates stats from the restore.Run().
|
|
// primarily used so that the defer can take in a
|
|
// pointer wrapping the values, while those values
|
|
// get populated asynchronously.
|
|
type restoreStats struct {
|
|
cs []data.Collection
|
|
gc *support.ConnectorOperationStatus
|
|
bytesRead *stats.ByteCounter
|
|
resourceCount int
|
|
started bool
|
|
readErr, writeErr error
|
|
|
|
// a transient value only used to pair up start-end events.
|
|
restoreID string
|
|
}
|
|
|
|
// Run begins a synchronous restore operation.
|
|
func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
|
defer trace.StartRegion(ctx, "operations:restore:run").End()
|
|
|
|
startTime := time.Now()
|
|
|
|
// persist operation results to the model store on exit
|
|
opStats := restoreStats{
|
|
bytesRead: &stats.ByteCounter{},
|
|
restoreID: uuid.NewString(),
|
|
}
|
|
|
|
defer func() {
|
|
err = op.persistResults(ctx, startTime, &opStats)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}()
|
|
|
|
// retrieve the restore point details
|
|
d, b, err := op.store.GetDetailsFromBackupID(ctx, op.BackupID)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "getting backup details for restore")
|
|
opStats.readErr = err
|
|
|
|
return err
|
|
}
|
|
|
|
op.bus.Event(
|
|
ctx,
|
|
events.RestoreStart,
|
|
map[string]any{
|
|
events.StartTime: startTime,
|
|
events.BackupID: op.BackupID,
|
|
events.BackupCreateTime: b.CreationTime,
|
|
events.RestoreID: opStats.restoreID,
|
|
// TODO: restore options,
|
|
},
|
|
)
|
|
|
|
var fds *details.Details
|
|
|
|
switch op.Selectors.Service {
|
|
case selectors.ServiceExchange:
|
|
er, err := op.Selectors.ToExchangeRestore()
|
|
if err != nil {
|
|
opStats.readErr = err
|
|
return err
|
|
}
|
|
|
|
// format the details and retrieve the items from kopia
|
|
fds = er.Reduce(ctx, d)
|
|
if len(fds.Entries) == 0 {
|
|
return selectors.ErrorNoMatchingItems
|
|
}
|
|
|
|
case selectors.ServiceOneDrive:
|
|
odr, err := op.Selectors.ToOneDriveRestore()
|
|
if err != nil {
|
|
opStats.readErr = err
|
|
return err
|
|
}
|
|
|
|
// format the details and retrieve the items from kopia
|
|
fds = odr.Reduce(ctx, d)
|
|
if len(fds.Entries) == 0 {
|
|
return selectors.ErrorNoMatchingItems
|
|
}
|
|
|
|
default:
|
|
return errors.Errorf("Service %s not supported", op.Selectors.Service)
|
|
}
|
|
|
|
logger.Ctx(ctx).Infof("Discovered %d items in backup %s to restore", len(fds.Entries), op.BackupID)
|
|
|
|
fdsPaths := fds.Paths()
|
|
paths := make([]path.Path, len(fdsPaths))
|
|
|
|
var parseErrs *multierror.Error
|
|
|
|
for i := range fdsPaths {
|
|
p, err := path.FromDataLayerPath(fdsPaths[i], true)
|
|
if err != nil {
|
|
parseErrs = multierror.Append(
|
|
parseErrs,
|
|
errors.Wrap(err, "parsing details entry path"),
|
|
)
|
|
|
|
continue
|
|
}
|
|
|
|
paths[i] = p
|
|
}
|
|
|
|
dcs, err := op.kopia.RestoreMultipleItems(ctx, b.SnapshotID, paths, opStats.bytesRead)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "retrieving service data")
|
|
|
|
parseErrs = multierror.Append(parseErrs, err)
|
|
opStats.readErr = parseErrs.ErrorOrNil()
|
|
|
|
return err
|
|
}
|
|
|
|
opStats.readErr = parseErrs.ErrorOrNil()
|
|
opStats.cs = dcs
|
|
opStats.resourceCount = len(data.ResourceOwnerSet(dcs))
|
|
|
|
// restore those collections using graph
|
|
gc, err := connector.NewGraphConnector(ctx, op.account)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "connecting to graph api")
|
|
opStats.writeErr = err
|
|
|
|
return err
|
|
}
|
|
|
|
err = gc.RestoreDataCollections(ctx, op.Selectors, op.Destination, dcs)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "restoring service data")
|
|
opStats.writeErr = err
|
|
|
|
return err
|
|
}
|
|
|
|
opStats.started = true
|
|
opStats.gc = gc.AwaitStatus()
|
|
|
|
logger.Ctx(ctx).Debug(gc.PrintableStatus())
|
|
|
|
return nil
|
|
}
|
|
|
|
// writes the restoreOperation outcome to the modelStore.
|
|
func (op *RestoreOperation) persistResults(
|
|
ctx context.Context,
|
|
started time.Time,
|
|
opStats *restoreStats,
|
|
) error {
|
|
op.Results.StartedAt = started
|
|
op.Results.CompletedAt = time.Now()
|
|
|
|
op.Status = Completed
|
|
|
|
if !opStats.started {
|
|
op.Status = Failed
|
|
|
|
return multierror.Append(
|
|
errors.New("errors prevented the operation from processing"),
|
|
opStats.readErr,
|
|
opStats.writeErr)
|
|
}
|
|
|
|
op.Results.ReadErrors = opStats.readErr
|
|
op.Results.WriteErrors = opStats.writeErr
|
|
|
|
op.Results.BytesRead = opStats.bytesRead.NumBytes
|
|
op.Results.ItemsRead = len(opStats.cs) // TODO: file count, not collection count
|
|
op.Results.ItemsWritten = opStats.gc.Successful
|
|
op.Results.ResourceOwners = opStats.resourceCount
|
|
|
|
op.bus.Event(
|
|
ctx,
|
|
events.RestoreEnd,
|
|
map[string]any{
|
|
// TODO: RestoreID
|
|
events.BackupID: op.BackupID,
|
|
events.DataRetrieved: op.Results.BytesRead,
|
|
events.Duration: op.Results.CompletedAt.Sub(op.Results.StartedAt),
|
|
events.EndTime: op.Results.CompletedAt,
|
|
events.ItemsRead: op.Results.ItemsRead,
|
|
events.ItemsWritten: op.Results.ItemsWritten,
|
|
events.Resources: op.Results.ResourceOwners,
|
|
events.RestoreID: opStats.restoreID,
|
|
events.Service: op.Selectors.Service.String(),
|
|
events.StartTime: op.Results.StartedAt,
|
|
events.Status: op.Status,
|
|
// TODO: events.ExchangeDataObserved: <amount of data retrieved>,
|
|
},
|
|
)
|
|
|
|
return nil
|
|
}
|