Implement OneDrive DataCollection restore (#877)
## Description Implements `data.Collection` restore for OneDrive and wires it up to the restore operation Leverages previously added `onedrive` helpers that have integration tests. An op/CLI level integration test will be added in a follow-up PR. ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) * #668 ## Test Plan <!-- How will this be tested prior to merging.--> - [x] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E ``` $ ./corso restore onedrive --backup 7de4d68f-67c2-4e81-95a2-81cf0d1bd11d Restored OneDrive in S3 for user ```
This commit is contained in:
parent
1203fd2b6a
commit
921d77cc05
@ -12,8 +12,12 @@ const (
|
||||
ClippedSimpleTimeFormat = "02-Jan-2006_15:04"
|
||||
LegacyTimeFormat = time.RFC3339
|
||||
SimpleDateTimeFormat = "02-Jan-2006_15:04:05"
|
||||
StandardTimeFormat = time.RFC3339Nano
|
||||
TabularOutputTimeFormat = "2006-01-02T15:04:05Z"
|
||||
// SimpleDateTimeFormatOneDrive is similar to `SimpleDateTimeFormat`
|
||||
// but uses `-` instead of `:` which is a reserved character in
|
||||
// OneDrive
|
||||
SimpleDateTimeFormatOneDrive = "02-Jan-2006_15-04-05"
|
||||
StandardTimeFormat = time.RFC3339Nano
|
||||
TabularOutputTimeFormat = "2006-01-02T15:04:05Z"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@ -18,7 +18,6 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/connector/onedrive"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/path"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
@ -244,6 +243,32 @@ func (gc *GraphConnector) ExchangeDataCollection(
|
||||
// RestoreDataCollections restores data from the specified collections
|
||||
// into M365
|
||||
func (gc *GraphConnector) RestoreDataCollections(
|
||||
ctx context.Context,
|
||||
selector selectors.Selector,
|
||||
dcs []data.Collection,
|
||||
) error {
|
||||
switch selector.Service {
|
||||
case selectors.ServiceExchange:
|
||||
return gc.RestoreExchangeDataCollections(ctx, dcs)
|
||||
case selectors.ServiceOneDrive:
|
||||
status, err := onedrive.RestoreCollections(ctx, gc, dcs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gc.incrementAwaitingMessages()
|
||||
|
||||
gc.UpdateStatus(status)
|
||||
|
||||
return nil
|
||||
default:
|
||||
return errors.Errorf("restore data from service %s not supported", selector.Service.String())
|
||||
}
|
||||
}
|
||||
|
||||
// RestoreExchangeDataCollections restores Exchange data from the specified collections
|
||||
// into M365
|
||||
func (gc *GraphConnector) RestoreExchangeDataCollections(
|
||||
ctx context.Context,
|
||||
dcs []data.Collection,
|
||||
) error {
|
||||
@ -266,21 +291,10 @@ func (gc *GraphConnector) RestoreDataCollections(
|
||||
exit bool
|
||||
)
|
||||
|
||||
// Check whether restoring data into the specified service is supported
|
||||
switch service {
|
||||
case path.ExchangeService:
|
||||
// Supported
|
||||
default:
|
||||
return errors.Errorf("restore data from service %s not supported", service.String())
|
||||
}
|
||||
|
||||
if _, ok := pathCounter[directory.String()]; !ok {
|
||||
pathCounter[directory.String()] = true
|
||||
|
||||
switch service {
|
||||
case path.ExchangeService:
|
||||
folderID, errs = exchange.GetRestoreContainer(ctx, &gc.graphService, user, category)
|
||||
}
|
||||
folderID, errs = exchange.GetRestoreContainer(ctx, &gc.graphService, user, category)
|
||||
|
||||
if errs != nil {
|
||||
fmt.Println("RestoreContainer Failed")
|
||||
@ -312,10 +326,7 @@ func (gc *GraphConnector) RestoreDataCollections(
|
||||
continue
|
||||
}
|
||||
|
||||
switch service {
|
||||
case path.ExchangeService:
|
||||
err = exchange.RestoreExchangeObject(ctx, buf.Bytes(), category, policy, &gc.graphService, folderID, user)
|
||||
}
|
||||
err = exchange.RestoreExchangeObject(ctx, buf.Bytes(), category, policy, &gc.graphService, folderID, user)
|
||||
|
||||
if err != nil {
|
||||
// More information to be here
|
||||
|
||||
@ -120,7 +120,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
|
||||
// Item read successfully, add to collection
|
||||
itemsRead++
|
||||
oc.data <- &Item{
|
||||
id: itemID,
|
||||
id: itemName,
|
||||
data: itemData,
|
||||
info: &details.OneDriveInfo{
|
||||
ItemType: details.OneDriveItem,
|
||||
|
||||
@ -92,7 +92,7 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
|
||||
readItem := readItems[0]
|
||||
readItemInfo := readItem.(data.StreamInfo)
|
||||
|
||||
assert.Equal(t, testItemID, readItem.UUID())
|
||||
assert.Equal(t, testItemName, readItem.UUID())
|
||||
readData, err := io.ReadAll(readItem.ToReader())
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
@ -14,6 +14,8 @@ import (
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
var errFolderNotFound = errors.New("folder not found")
|
||||
|
||||
const (
|
||||
// nextLinkKey is used to find the next link in a paged
|
||||
// graph response
|
||||
@ -95,7 +97,7 @@ func getFolder(ctx context.Context, service graph.Service, driveID string, paren
|
||||
return item, nil
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("folder %s not found in drive(%s) parentFolder(%s)", folderName, driveID, parentFolderID)
|
||||
return nil, errors.WithStack(errFolderNotFound)
|
||||
}
|
||||
|
||||
// Create a new item in the specified folder
|
||||
@ -112,7 +114,7 @@ func createItem(ctx context.Context, service graph.Service, driveID string, pare
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(
|
||||
err,
|
||||
"failed to create folder. details: %s",
|
||||
"failed to create item. details: %s",
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
|
||||
188
src/internal/connector/onedrive/restore.go
Normal file
188
src/internal/connector/onedrive/restore.go
Normal file
@ -0,0 +1,188 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common"
|
||||
"github.com/alcionai/corso/src/internal/connector/graph"
|
||||
"github.com/alcionai/corso/src/internal/connector/support"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/path"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
)
|
||||
|
||||
// copyBufferSize is the size in bytes (5 MiB) of the buffer used for chunked upload.
// Microsoft recommends 5-10MB buffers:
// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession?view=graph-rest-1.0#best-practices
const copyBufferSize = 5 * (1 << 20)
|
||||
|
||||
// drivePath holds the path components of an item within a drive.
//
// For example, given
// `drives/b!X_8Z2zuXpkKkXZsr7gThk9oJpuj0yXVGnK5_VjRRPK-q725SX_8ZQJgFDK8PlFxA/root:/Folder1/Folder2/file`
// driveID is `b!X_8Z2zuXpkKkXZsr7gThk9oJpuj0yXVGnK5_VjRRPK-q725SX_8ZQJgFDK8PlFxA`
// and folders is []string{"Folder1", "Folder2"}.
type drivePath struct {
	// driveID is the path element that follows `drives`.
	driveID string
	// folders are the folder names that follow `root:`, in path order.
	folders []string
}
|
||||
|
||||
func toOneDrivePath(p path.Path) (*drivePath, error) {
|
||||
folders := strings.Split(p.Folder(), "/")
|
||||
|
||||
// Must be at least `drives/<driveID>/root:`
|
||||
if len(folders) < 3 {
|
||||
return nil, errors.Errorf("folder path doesn't match expected format for OneDrive items: %s", p.Folder())
|
||||
}
|
||||
|
||||
return &drivePath{driveID: folders[1], folders: folders[3:]}, nil
|
||||
}
|
||||
|
||||
// RestoreCollections will restore the specified data collections into OneDrive
|
||||
func RestoreCollections(ctx context.Context, service graph.Service, dcs []data.Collection,
|
||||
) (*support.ConnectorOperationStatus, error) {
|
||||
var (
|
||||
total, restored int
|
||||
restoreErrors error
|
||||
copyBuffer = make([]byte, copyBufferSize)
|
||||
restoreContainerName = fmt.Sprintf("Corso_Restore_%s", common.FormatNow(common.SimpleDateTimeFormatOneDrive))
|
||||
)
|
||||
|
||||
// Iterate through the data collections and restore the contents of each
|
||||
for _, dc := range dcs {
|
||||
|
||||
directory := dc.FullPath()
|
||||
|
||||
drivePath, err := toOneDrivePath(directory)
|
||||
if err != nil {
|
||||
restoreErrors = support.WrapAndAppend(directory.String(), err, restoreErrors)
|
||||
continue
|
||||
}
|
||||
|
||||
// Assemble folder hierarchy we're going to restore into (we recreate the folder hierarchy
|
||||
// from the backup under this the restore folder instead of root)
|
||||
// i.e. Restore into `<drive>/root:/<restoreContainerName>/<original folder path>`
|
||||
|
||||
restoreFolderElements := []string{restoreContainerName}
|
||||
|
||||
restoreFolderElements = append(restoreFolderElements, drivePath.folders...)
|
||||
|
||||
logger.Ctx(ctx).Debugf("Restore target for %s is %v", dc.FullPath(), restoreFolderElements)
|
||||
|
||||
// Create restore folders and get the folder ID of the folder the data stream will be restored in
|
||||
restoreFolderID, err := createRestoreFolders(ctx, service, drivePath.driveID, restoreFolderElements)
|
||||
if err != nil {
|
||||
restoreErrors = support.WrapAndAppend(directory.String(), errors.Wrapf(err, "failed to create folders %v",
|
||||
restoreFolderElements), restoreErrors)
|
||||
continue
|
||||
}
|
||||
|
||||
// Restore items from the collection
|
||||
exit := false
|
||||
items := dc.Items()
|
||||
for !exit {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, support.WrapAndAppend("context cancelled", ctx.Err(), restoreErrors)
|
||||
case itemData, ok := <-items:
|
||||
if !ok {
|
||||
exit = true
|
||||
break
|
||||
}
|
||||
total++
|
||||
|
||||
err := restoreItem(ctx, service, itemData, drivePath.driveID, restoreFolderID, copyBuffer)
|
||||
if err != nil {
|
||||
restoreErrors = support.WrapAndAppend(itemData.UUID(), err, restoreErrors)
|
||||
continue
|
||||
}
|
||||
|
||||
restored++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return support.CreateStatus(ctx, support.Restore, total, restored, 0, restoreErrors), nil
|
||||
}
|
||||
|
||||
// createRestoreFolders creates the restore folder hieararchy in the specified drive and returns the folder ID
|
||||
// of the last folder entry in the hiearchy
|
||||
func createRestoreFolders(ctx context.Context, service graph.Service, driveID string, restoreFolders []string,
|
||||
) (string, error) {
|
||||
driveRoot, err := service.Client().DrivesById(driveID).Root().Get()
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(
|
||||
err,
|
||||
"failed to get drive root. details: %s",
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
logger.Ctx(ctx).Debugf("Found Root for Drive %s with ID %s", driveID, *driveRoot.GetId())
|
||||
|
||||
parentFolderID := *driveRoot.GetId()
|
||||
for _, folder := range restoreFolders {
|
||||
folderItem, err := getFolder(ctx, service, driveID, parentFolderID, folder)
|
||||
if err == nil {
|
||||
parentFolderID = *folderItem.GetId()
|
||||
logger.Ctx(ctx).Debugf("Found %s with ID %s", folder, parentFolderID)
|
||||
continue
|
||||
}
|
||||
if err != errFolderNotFound {
|
||||
return "", errors.Wrapf(err, "folder %s not found in drive(%s) parentFolder(%s)", folder, driveID, parentFolderID)
|
||||
}
|
||||
|
||||
folderItem, err = createItem(ctx, service, driveID, parentFolderID, newItem(folder, true))
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(
|
||||
err,
|
||||
"failed to create folder %s/%s. details: %s", parentFolderID, folder,
|
||||
support.ConnectorStackErrorTrace(err),
|
||||
)
|
||||
}
|
||||
|
||||
logger.Ctx(ctx).Debugf("Resolved %s in %s to %s", folder, parentFolderID, *folderItem.GetId())
|
||||
parentFolderID = *folderItem.GetId()
|
||||
}
|
||||
|
||||
return parentFolderID, nil
|
||||
}
|
||||
|
||||
// restoreItem will create a new item in the specified `parentFolderID` and upload the data.Stream
|
||||
func restoreItem(ctx context.Context, service graph.Service, itemData data.Stream, driveID, parentFolderID string,
|
||||
copyBuffer []byte,
|
||||
) error {
|
||||
itemName := itemData.UUID()
|
||||
|
||||
// Get the stream size (needed to create the upload session)
|
||||
ss, ok := itemData.(data.StreamSize)
|
||||
if !ok {
|
||||
return errors.Errorf("item %q does not implement DataStreamInfo", itemName)
|
||||
}
|
||||
|
||||
// Create Item
|
||||
newItem, err := createItem(ctx, service, driveID, parentFolderID, newItem(itemData.UUID(), false))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to create item %s", itemName)
|
||||
}
|
||||
|
||||
// Get a drive item writer
|
||||
w, err := driveItemWriter(ctx, service, driveID, *newItem.GetId(), ss.Size())
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to create item upload session %s", itemName)
|
||||
}
|
||||
|
||||
// Upload the stream data
|
||||
_, err = io.CopyBuffer(w, itemData.ToReader(), copyBuffer)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to upload data: item %s", itemName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
59
src/internal/connector/onedrive/restore_test.go
Normal file
59
src/internal/connector/onedrive/restore_test.go
Normal file
@ -0,0 +1,59 @@
|
||||
package onedrive
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/path"
|
||||
)
|
||||
|
||||
type OneDriveRestoreSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestOneDriveRestoreSuite(t *testing.T) {
|
||||
suite.Run(t, new(OneDriveRestoreSuite))
|
||||
}
|
||||
|
||||
func (suite *OneDriveRestoreSuite) Test_toOneDrivePath() {
|
||||
tests := []struct {
|
||||
name string
|
||||
pathElements []string
|
||||
expected *drivePath
|
||||
errCheck assert.ErrorAssertionFunc
|
||||
}{
|
||||
{
|
||||
name: "Not enough path elements",
|
||||
pathElements: []string{"drive", "driveID"},
|
||||
errCheck: assert.Error,
|
||||
},
|
||||
{
|
||||
name: "Root path",
|
||||
pathElements: []string{"drive", "driveID", "root:"},
|
||||
expected: &drivePath{driveID: "driveID", folders: []string{}},
|
||||
errCheck: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "Deeper path",
|
||||
pathElements: []string{"drive", "driveID", "root:", "folder1", "folder2"},
|
||||
expected: &drivePath{driveID: "driveID", folders: []string{"folder1", "folder2"}},
|
||||
errCheck: assert.NoError,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
suite.T().Run(tt.name, func(t *testing.T) {
|
||||
p, err := path.Builder{}.Append(tt.pathElements...).ToDataLayerOneDrivePath("tenant", "user", false)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
got, err := toOneDrivePath(p)
|
||||
tt.errCheck(t, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(suite.T(), tt.expected, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -183,7 +183,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = gc.RestoreDataCollections(ctx, dcs)
|
||||
err = gc.RestoreDataCollections(ctx, op.Selectors, dcs)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "restoring service data")
|
||||
opStats.writeErr = err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user