diff --git a/src/internal/common/time.go b/src/internal/common/time.go
index 7a03f8ae9..3213ce583 100644
--- a/src/internal/common/time.go
+++ b/src/internal/common/time.go
@@ -12,8 +12,12 @@ const (
 	ClippedSimpleTimeFormat = "02-Jan-2006_15:04"
 	LegacyTimeFormat        = time.RFC3339
 	SimpleDateTimeFormat    = "02-Jan-2006_15:04:05"
-	StandardTimeFormat      = time.RFC3339Nano
-	TabularOutputTimeFormat = "2006-01-02T15:04:05Z"
+	// SimpleDateTimeFormatOneDrive is similar to `SimpleDateTimeFormat`
+	// but uses `-` instead of `:` which is a reserved character in
+	// OneDrive
+	SimpleDateTimeFormatOneDrive = "02-Jan-2006_15-04-05"
+	StandardTimeFormat           = time.RFC3339Nano
+	TabularOutputTimeFormat      = "2006-01-02T15:04:05Z"
 )
 
 var (
diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go
index 6d202672c..3c7880add 100644
--- a/src/internal/connector/graph_connector.go
+++ b/src/internal/connector/graph_connector.go
@@ -18,7 +18,6 @@ import (
 	"github.com/alcionai/corso/src/internal/connector/onedrive"
 	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/data"
-	"github.com/alcionai/corso/src/internal/path"
 	"github.com/alcionai/corso/src/pkg/account"
 	"github.com/alcionai/corso/src/pkg/control"
 	"github.com/alcionai/corso/src/pkg/logger"
@@ -244,6 +243,32 @@ func (gc *GraphConnector) ExchangeDataCollection(
 // RestoreDataCollections restores data from the specified collections
 // into M365
 func (gc *GraphConnector) RestoreDataCollections(
+	ctx context.Context,
+	selector selectors.Selector,
+	dcs []data.Collection,
+) error {
+	switch selector.Service {
+	case selectors.ServiceExchange:
+		return gc.RestoreExchangeDataCollections(ctx, dcs)
+	case selectors.ServiceOneDrive:
+		status, err := onedrive.RestoreCollections(ctx, gc, dcs)
+		if err != nil {
+			return err
+		}
+
+		gc.incrementAwaitingMessages()
+
+		gc.UpdateStatus(status)
+
+		return nil
+	default:
+		return errors.Errorf("restore data from service %s not supported", selector.Service.String())
+	}
+}
+
+// RestoreExchangeDataCollections restores Exchange data from the specified
+// collections into M365
+func (gc *GraphConnector) RestoreExchangeDataCollections(
 	ctx context.Context,
 	dcs []data.Collection,
 ) error {
@@ -266,21 +291,10 @@ func (gc *GraphConnector) RestoreDataCollections(
 		exit bool
 	)
 
-	// Check whether restoring data into the specified service is supported
-	switch service {
-	case path.ExchangeService:
-		// Supported
-	default:
-		return errors.Errorf("restore data from service %s not supported", service.String())
-	}
-
 	if _, ok := pathCounter[directory.String()]; !ok {
 		pathCounter[directory.String()] = true
 
-		switch service {
-		case path.ExchangeService:
-			folderID, errs = exchange.GetRestoreContainer(ctx, &gc.graphService, user, category)
-		}
+		folderID, errs = exchange.GetRestoreContainer(ctx, &gc.graphService, user, category)
 
 		if errs != nil {
 			fmt.Println("RestoreContainer Failed")
@@ -312,10 +326,7 @@ func (gc *GraphConnector) RestoreDataCollections(
 				continue
 			}
 
-			switch service {
-			case path.ExchangeService:
-				err = exchange.RestoreExchangeObject(ctx, buf.Bytes(), category, policy, &gc.graphService, folderID, user)
-			}
+			err = exchange.RestoreExchangeObject(ctx, buf.Bytes(), category, policy, &gc.graphService, folderID, user)
 
 			if err != nil {
 				// More information to be here
diff --git a/src/internal/connector/onedrive/collection.go b/src/internal/connector/onedrive/collection.go
index 6799feb34..b57a80beb 100644
--- a/src/internal/connector/onedrive/collection.go
+++ b/src/internal/connector/onedrive/collection.go
@@ -120,7 +120,7 @@ func (oc *Collection) populateItems(ctx context.Context) {
 		// Item read successfully, add to collection
 		itemsRead++
 		oc.data <- &Item{
-			id:   itemID,
+			id:   itemName,
 			data: itemData,
 			info: &details.OneDriveInfo{
 				ItemType: details.OneDriveItem,
diff --git a/src/internal/connector/onedrive/collection_test.go b/src/internal/connector/onedrive/collection_test.go
index add89f295..ff9eb6138 100644
--- a/src/internal/connector/onedrive/collection_test.go
+++ b/src/internal/connector/onedrive/collection_test.go
@@ -92,7 +92,7 @@ func (suite *OneDriveCollectionSuite) TestOneDriveCollection() {
 	readItem := readItems[0]
 	readItemInfo := readItem.(data.StreamInfo)
 
-	assert.Equal(t, testItemID, readItem.UUID())
+	assert.Equal(t, testItemName, readItem.UUID())
 	readData, err := io.ReadAll(readItem.ToReader())
 	require.NoError(t, err)
 
diff --git a/src/internal/connector/onedrive/drive.go b/src/internal/connector/onedrive/drive.go
index 340872af3..d4b41316a 100644
--- a/src/internal/connector/onedrive/drive.go
+++ b/src/internal/connector/onedrive/drive.go
@@ -14,6 +14,8 @@ import (
 	"github.com/alcionai/corso/src/pkg/logger"
 )
 
+var errFolderNotFound = errors.New("folder not found")
+
 const (
 	// nextLinkKey is used to find the next link in a paged
 	// graph response
@@ -95,7 +97,7 @@ func getFolder(ctx context.Context, service graph.Service, driveID string, paren
 		return item, nil
 	}
 
-	return nil, errors.Errorf("folder %s not found in drive(%s) parentFolder(%s)", folderName, driveID, parentFolderID)
+	return nil, errors.WithStack(errFolderNotFound)
 }
 
 // Create a new item in the specified folder
@@ -112,7 +114,7 @@ func createItem(ctx context.Context, service graph.Service, driveID string, pare
 	if err != nil {
 		return nil, errors.Wrapf(
 			err,
-			"failed to create folder. details: %s",
+			"failed to create item. details: %s",
 			support.ConnectorStackErrorTrace(err),
 		)
 	}
diff --git a/src/internal/connector/onedrive/restore.go b/src/internal/connector/onedrive/restore.go
new file mode 100644
index 000000000..475129f7d
--- /dev/null
+++ b/src/internal/connector/onedrive/restore.go
@@ -0,0 +1,188 @@
+package onedrive
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/pkg/errors"
+
+	"github.com/alcionai/corso/src/internal/common"
+	"github.com/alcionai/corso/src/internal/connector/graph"
+	"github.com/alcionai/corso/src/internal/connector/support"
+	"github.com/alcionai/corso/src/internal/data"
+	"github.com/alcionai/corso/src/internal/path"
+	"github.com/alcionai/corso/src/pkg/logger"
+)
+
+const (
+	// copyBufferSize is used for chunked upload
+	// Microsoft recommends 5-10MB buffers
+	// https://docs.microsoft.com/en-us/graph/api/driveitem-createuploadsession?view=graph-rest-1.0#best-practices
+	copyBufferSize = 5 * 1024 * 1024
+)
+
+// drivePath is used to represent path components
+// of an item within the drive i.e.
+// Given `drives/b!X_8Z2zuXpkKkXZsr7gThk9oJpuj0yXVGnK5_VjRRPK-q725SX_8ZQJgFDK8PlFxA/root:/Folder1/Folder2/file`
+//
+// driveID is `b!X_8Z2zuXpkKkXZsr7gThk9oJpuj0yXVGnK5_VjRRPK-q725SX_8ZQJgFDK8PlFxA` and
+// folders[] is []{"Folder1", "Folder2"}
+type drivePath struct {
+	driveID string
+	folders []string
+}
+
+func toOneDrivePath(p path.Path) (*drivePath, error) {
+	folders := strings.Split(p.Folder(), "/")
+
+	// Must be at least `drives/<driveID>/root:`
+	if len(folders) < 3 {
+		return nil, errors.Errorf("folder path doesn't match expected format for OneDrive items: %s", p.Folder())
+	}
+
+	return &drivePath{driveID: folders[1], folders: folders[3:]}, nil
+}
+
+// RestoreCollections will restore the specified data collections into OneDrive
+func RestoreCollections(ctx context.Context, service graph.Service, dcs []data.Collection,
+) (*support.ConnectorOperationStatus, error) {
+	var (
+		total, restored      int
+		restoreErrors        error
+		copyBuffer           = make([]byte, copyBufferSize)
+		restoreContainerName = fmt.Sprintf("Corso_Restore_%s", common.FormatNow(common.SimpleDateTimeFormatOneDrive))
+	)
+
+	// Iterate through the data collections and restore the contents of each
+	for _, dc := range dcs {
+
+		directory := dc.FullPath()
+
+		drivePath, err := toOneDrivePath(directory)
+		if err != nil {
+			restoreErrors = support.WrapAndAppend(directory.String(), err, restoreErrors)
+			continue
+		}
+
+		// Assemble the folder hierarchy we're going to restore into (we recreate the folder hierarchy
+		// from the backup under the restore folder instead of the root)
+		// i.e. Restore into `<driveID>/root:/<restoreContainerName>/<original folder hierarchy>`
+
+		restoreFolderElements := []string{restoreContainerName}
+
+		restoreFolderElements = append(restoreFolderElements, drivePath.folders...)
+
+		logger.Ctx(ctx).Debugf("Restore target for %s is %v", dc.FullPath(), restoreFolderElements)
+
+		// Create restore folders and get the folder ID of the folder the data stream will be restored in
+		restoreFolderID, err := createRestoreFolders(ctx, service, drivePath.driveID, restoreFolderElements)
+		if err != nil {
+			restoreErrors = support.WrapAndAppend(directory.String(), errors.Wrapf(err, "failed to create folders %v",
+				restoreFolderElements), restoreErrors)
+			continue
+		}
+
+		// Restore items from the collection
+		exit := false
+		items := dc.Items()
+		for !exit {
+			select {
+			case <-ctx.Done():
+				return nil, support.WrapAndAppend("context cancelled", ctx.Err(), restoreErrors)
+			case itemData, ok := <-items:
+				if !ok {
+					exit = true
+					break
+				}
+				total++
+
+				err := restoreItem(ctx, service, itemData, drivePath.driveID, restoreFolderID, copyBuffer)
+				if err != nil {
+					restoreErrors = support.WrapAndAppend(itemData.UUID(), err, restoreErrors)
+					continue
+				}
+
+				restored++
+			}
+		}
+	}
+
+	return support.CreateStatus(ctx, support.Restore, total, restored, 0, restoreErrors), nil
+}
+
+// createRestoreFolders creates the restore folder hierarchy in the specified drive and returns the folder ID
+// of the last folder entry in the hierarchy
+func createRestoreFolders(ctx context.Context, service graph.Service, driveID string, restoreFolders []string,
+) (string, error) {
+	driveRoot, err := service.Client().DrivesById(driveID).Root().Get()
+	if err != nil {
+		return "", errors.Wrapf(
+			err,
+			"failed to get drive root. details: %s",
+			support.ConnectorStackErrorTrace(err),
+		)
+	}
+	logger.Ctx(ctx).Debugf("Found Root for Drive %s with ID %s", driveID, *driveRoot.GetId())
+
+	parentFolderID := *driveRoot.GetId()
+	for _, folder := range restoreFolders {
+		folderItem, err := getFolder(ctx, service, driveID, parentFolderID, folder)
+		if err == nil {
+			parentFolderID = *folderItem.GetId()
+			logger.Ctx(ctx).Debugf("Found %s with ID %s", folder, parentFolderID)
+			continue
+		}
+		if !errors.Is(err, errFolderNotFound) {
+			return "", errors.Wrapf(err, "failed to get folder %s in drive(%s) parentFolder(%s)", folder, driveID, parentFolderID)
+		}
+
+		folderItem, err = createItem(ctx, service, driveID, parentFolderID, newItem(folder, true))
+		if err != nil {
+			return "", errors.Wrapf(
+				err,
+				"failed to create folder %s/%s. details: %s", parentFolderID, folder,
+				support.ConnectorStackErrorTrace(err),
+			)
+		}
+
+		logger.Ctx(ctx).Debugf("Resolved %s in %s to %s", folder, parentFolderID, *folderItem.GetId())
+		parentFolderID = *folderItem.GetId()
+	}
+
+	return parentFolderID, nil
+}
+
+// restoreItem will create a new item in the specified `parentFolderID` and upload the data.Stream
+func restoreItem(ctx context.Context, service graph.Service, itemData data.Stream, driveID, parentFolderID string,
+	copyBuffer []byte,
+) error {
+	itemName := itemData.UUID()
+
+	// Get the stream size (needed to create the upload session)
+	ss, ok := itemData.(data.StreamSize)
+	if !ok {
+		return errors.Errorf("item %q does not implement data.StreamSize", itemName)
+	}
+
+	// Create Item
+	newItem, err := createItem(ctx, service, driveID, parentFolderID, newItem(itemData.UUID(), false))
+	if err != nil {
+		return errors.Wrapf(err, "failed to create item %s", itemName)
+	}
+
+	// Get a drive item writer
+	w, err := driveItemWriter(ctx, service, driveID, *newItem.GetId(), ss.Size())
+	if err != nil {
+		return errors.Wrapf(err, "failed to create item upload session %s", itemName)
+	}
+
+	// Upload the stream data
+	_, err = io.CopyBuffer(w, itemData.ToReader(), copyBuffer)
+	if err != nil {
+		return errors.Wrapf(err, "failed to upload data: item %s", itemName)
+	}
+
+	return nil
+}
diff --git a/src/internal/connector/onedrive/restore_test.go b/src/internal/connector/onedrive/restore_test.go
new file mode 100644
index 000000000..7b775bd0d
--- /dev/null
+++ b/src/internal/connector/onedrive/restore_test.go
@@ -0,0 +1,59 @@
+package onedrive
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/alcionai/corso/src/internal/path"
+)
+
+type OneDriveRestoreSuite struct {
+	suite.Suite
+}
+
+func TestOneDriveRestoreSuite(t *testing.T) {
+	suite.Run(t, new(OneDriveRestoreSuite))
+}
+
+func (suite *OneDriveRestoreSuite) Test_toOneDrivePath() {
+	tests := []struct {
+		name         string
+		pathElements []string
+		expected     *drivePath
+		errCheck     assert.ErrorAssertionFunc
+	}{
+		{
+			name:         "Not enough path elements",
+			pathElements: []string{"drive", "driveID"},
+			errCheck:     assert.Error,
+		},
+		{
+			name:         "Root path",
+			pathElements: []string{"drive", "driveID", "root:"},
+			expected:     &drivePath{driveID: "driveID", folders: []string{}},
+			errCheck:     assert.NoError,
+		},
+		{
+			name:         "Deeper path",
+			pathElements: []string{"drive", "driveID", "root:", "folder1", "folder2"},
+			expected:     &drivePath{driveID: "driveID", folders: []string{"folder1", "folder2"}},
+			errCheck:     assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		suite.T().Run(tt.name, func(t *testing.T) {
+			p, err := path.Builder{}.Append(tt.pathElements...).ToDataLayerOneDrivePath("tenant", "user", false)
+			require.NoError(suite.T(), err)
+
+			got, err := toOneDrivePath(p)
+			tt.errCheck(t, err)
+			if err != nil {
+				return
+			}
+			assert.Equal(suite.T(), tt.expected, got)
+		})
+	}
+}
diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go
index 4f88cb908..201c500e7 100644
--- a/src/internal/operations/restore.go
+++ b/src/internal/operations/restore.go
@@ -183,7 +183,7 @@ func (op *RestoreOperation) Run(ctx context.Context) (err error) {
 		return err
 	}
 
-	err = gc.RestoreDataCollections(ctx, dcs)
+	err = gc.RestoreDataCollections(ctx, op.Selectors, dcs)
 	if err != nil {
 		err = errors.Wrap(err, "restoring service data")
 		opStats.writeErr = err