adding clues & fault to onedrive restore (#2509)

## Does this PR need a docs update or release note?

- [x] No

## Type of change

- [x] 🧹 Tech Debt/Cleanup

## Issue(s)

* #1970

## Test Plan

- [x] Unit test
- [x] 💚 E2E
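
The gist of the change: instead of threading an `errUpdater` closure through every layer, the restore now passes a `*fault.Errors` bus down the call stack, takes a local tracker with `errs.Tracker()`, records per-item failures with `et.Add(err)`, and checks `et.Err()` to decide when to stop iterating. Below is a minimal stand-in sketch of that flow; the `errTracker` type is a simplified, hypothetical substitute for corso's actual `fault` package (which also distinguishes fail-fast from best-effort collection), not its real implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errTracker is a simplified stand-in for corso's fault.Errors/Tracker pair:
// it accumulates per-item failures and surfaces the first one via Err().
type errTracker struct {
	errs []error
}

func (t *errTracker) Add(err error) {
	if err != nil {
		t.errs = append(t.errs, err)
	}
}

// Err returns the first recorded failure, or nil if none occurred.
func (t *errTracker) Err() error {
	if len(t.errs) == 0 {
		return nil
	}

	return t.errs[0]
}

// restoreOne is a hypothetical per-item restore that can fail.
func restoreOne(item string) error {
	if item == "bad" {
		return errors.New("simulated failure")
	}

	return nil
}

func restoreAll(items []string) error {
	et := &errTracker{}

	for _, item := range items {
		// Stop iterating once a failure has been recorded, mirroring the
		// `if et.Err() != nil { break }` guard added in RestoreCollections.
		if et.Err() != nil {
			break
		}

		if err := restoreOne(item); err != nil {
			et.Add(fmt.Errorf("restoring %s: %w", item, err))
			continue
		}
	}

	return et.Err()
}

func main() {
	fmt.Println(restoreAll([]string{"a", "bad", "c"}))
}
```

In the real code the same `errs` bus is also handed to the collection iterator (`items = dc.Items(ctx, errs)`), so item streaming failures land on the same tracker as restore failures.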
Keepers 2023-02-22 18:04:50 -07:00 committed by GitHub
parent 129b96714b
commit 6c7623041c
3 changed files with 115 additions and 117 deletions

View File

@@ -269,7 +269,7 @@ func (gc *GraphConnector) RestoreDataCollections(
 	case selectors.ServiceExchange:
 		status, err = exchange.RestoreExchangeDataCollections(ctx, creds, gc.Service, dest, dcs, deets, errs)
 	case selectors.ServiceOneDrive:
-		status, err = onedrive.RestoreCollections(ctx, backupVersion, gc.Service, dest, opts, dcs, deets)
+		status, err = onedrive.RestoreCollections(ctx, backupVersion, gc.Service, dest, opts, dcs, deets, errs)
 	case selectors.ServiceSharePoint:
 		status, err = sharepoint.RestoreCollections(ctx, backupVersion, creds, gc.Service, dest, dcs, deets, errs)
 	default:

View File

@@ -3,7 +3,6 @@ package onedrive
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"io"
 	"math"
 	"runtime/trace"
@@ -15,6 +14,7 @@ import (
 	"github.com/microsoftgraph/msgraph-sdk-go/models"
 	"github.com/pkg/errors"
 
+	"github.com/alcionai/corso/src/internal/common/ptr"
 	"github.com/alcionai/corso/src/internal/connector/graph"
 	"github.com/alcionai/corso/src/internal/connector/support"
 	"github.com/alcionai/corso/src/internal/data"
@@ -22,6 +22,7 @@ import (
 	"github.com/alcionai/corso/src/internal/observe"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/control"
+	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"
 )
@@ -56,7 +57,7 @@ func getParentPermissions(
 	}
 
 	if len(onedrivePath.Folders) != 0 {
-		return nil, errors.Wrap(err, "unable to compute item permissions")
+		return nil, errors.Wrap(err, "computing item permissions")
 	}
 
 	parentPerms = []UserPermission{}
@@ -75,11 +76,7 @@ func getParentAndCollectionPermissions(
 		return nil, nil, nil
 	}
 
-	var (
-		err         error
-		parentPerms []UserPermission
-		colPerms    []UserPermission
-	)
+	var parentPerms []UserPermission
 
 	// Only get parent permissions if we're not restoring the root.
 	if len(drivePath.Folders) > 0 {
@@ -96,7 +93,7 @@ func getParentAndCollectionPermissions(
 	// TODO(ashmrtn): For versions after this pull the permissions from the
 	// current collection with Fetch().
-	colPerms, err = getParentPermissions(collectionPath, permissions)
+	colPerms, err := getParentPermissions(collectionPath, permissions)
 	if err != nil {
 		return nil, nil, clues.Wrap(err, "getting collection permissions")
 	}
@@ -113,22 +110,22 @@ func RestoreCollections(
 	opts control.Options,
 	dcs []data.RestoreCollection,
 	deets *details.Builder,
+	errs *fault.Errors,
 ) (*support.ConnectorOperationStatus, error) {
 	var (
 		restoreMetrics support.CollectionMetrics
-		restoreErrors  error
 		metrics        support.CollectionMetrics
 		folderPerms    map[string][]UserPermission
-		canceled       bool
 
 		// permissionIDMappings is used to map between old and new id
 		// of permissions as we restore them
 		permissionIDMappings = map[string]string{}
 	)
 
-	errUpdater := func(id string, err error) {
-		restoreErrors = support.WrapAndAppend(id, err, restoreErrors)
-	}
+	ctx = clues.Add(
+		ctx,
+		"backup_version", backupVersion,
+		"destination", dest.ContainerName)
 
 	// Reorder collections so that the parents directories are created
 	// before the child directories
@@ -136,12 +133,28 @@ func RestoreCollections(
 		return dcs[i].FullPath().String() < dcs[j].FullPath().String()
 	})
 
-	parentPermissions := map[string][]UserPermission{}
+	var (
+		et                = errs.Tracker()
+		parentPermissions = map[string][]UserPermission{}
+	)
 
 	// Iterate through the data collections and restore the contents of each
 	for _, dc := range dcs {
-		metrics, folderPerms, permissionIDMappings, canceled = RestoreCollection(
-			ctx,
+		if et.Err() != nil {
+			break
+		}
+
+		var (
+			err  error
+			ictx = clues.Add(
+				ctx,
+				"resource_owner", dc.FullPath().ResourceOwner(), // TODO: pii
+				"category", dc.FullPath().Category(),
+				"path", dc.FullPath()) // TODO: pii
+		)
+
+		metrics, folderPerms, permissionIDMappings, err = RestoreCollection(
+			ictx,
 			backupVersion,
 			service,
 			dc,
@@ -149,10 +162,12 @@ func RestoreCollections(
 			OneDriveSource,
 			dest.ContainerName,
 			deets,
-			errUpdater,
 			permissionIDMappings,
 			opts.RestorePermissions,
-		)
+			errs)
+		if err != nil {
+			et.Add(err)
+		}
 
 		for k, v := range folderPerms {
 			parentPermissions[k] = v
@@ -160,19 +175,20 @@ func RestoreCollections(
 		restoreMetrics.Combine(metrics)
 
-		if canceled {
+		if errors.Is(err, context.Canceled) {
 			break
 		}
 	}
 
-	return support.CreateStatus(
+	status := support.CreateStatus(
 		ctx,
 		support.Restore,
 		len(dcs),
 		restoreMetrics,
-		restoreErrors,
-		dest.ContainerName),
-		nil
+		et.Err(),
+		dest.ContainerName)
+
+	return status, et.Err()
 }
 
 // RestoreCollection handles restoration of an individual collection.
@@ -188,13 +204,10 @@ func RestoreCollection(
 	source driveSource,
 	restoreContainerName string,
 	deets *details.Builder,
-	errUpdater func(string, error),
 	permissionIDMappings map[string]string,
 	restorePerms bool,
-) (support.CollectionMetrics, map[string][]UserPermission, map[string]string, bool) {
-	ctx, end := D.Span(ctx, "gc:oneDrive:restoreCollection", D.Label("path", dc.FullPath()))
-	defer end()
-
+	errs *fault.Errors,
+) (support.CollectionMetrics, map[string][]UserPermission, map[string]string, error) {
 	var (
 		metrics    = support.CollectionMetrics{}
 		copyBuffer = make([]byte, copyBufferSize)
@@ -203,10 +216,12 @@ func RestoreCollection(
 		folderPerms = map[string][]UserPermission{}
 	)
 
+	ctx, end := D.Span(ctx, "gc:oneDrive:restoreCollection", D.Label("path", directory))
+	defer end()
+
 	drivePath, err := path.ToOneDrivePath(directory)
 	if err != nil {
-		errUpdater(directory.String(), err)
-		return metrics, folderPerms, permissionIDMappings, false
+		return metrics, folderPerms, permissionIDMappings, clues.Wrap(err, "creating drive path").WithClues(ctx)
 	}
 
 	// Assemble folder hierarchy we're going to restore into (we recreate the folder hierarchy
@@ -216,11 +231,14 @@ func RestoreCollection(
 	restoreFolderElements := []string{restoreContainerName}
 	restoreFolderElements = append(restoreFolderElements, drivePath.Folders...)
 
+	ctx = clues.Add(
+		ctx,
+		"directory", dc.FullPath().Folder(false),
+		"destination_elements", restoreFolderElements,
+		"drive_id", drivePath.DriveID)
+
 	trace.Log(ctx, "gc:oneDrive:restoreCollection", directory.String())
-	logger.Ctx(ctx).Infow(
-		"restoring to destination",
-		"origin", dc.FullPath().Folder(false),
-		"destination", restoreFolderElements)
+	logger.Ctx(ctx).Info("restoring onedrive collection")
 
 	parentPerms, colPerms, err := getParentAndCollectionPermissions(
 		drivePath,
@@ -228,8 +246,7 @@ func RestoreCollection(
 		parentPermissions,
 		restorePerms)
 	if err != nil {
-		errUpdater(directory.String(), err)
-		return metrics, folderPerms, permissionIDMappings, false
+		return metrics, folderPerms, permissionIDMappings, clues.Wrap(err, "getting permissions").WithClues(ctx)
 	}
 
 	// Create restore folders and get the folder ID of the folder the data stream will be restored in
@@ -243,35 +260,37 @@ func RestoreCollection(
 		permissionIDMappings,
 	)
 	if err != nil {
-		errUpdater(directory.String(), errors.Wrapf(err, "failed to create folders %v", restoreFolderElements))
-		return metrics, folderPerms, permissionIDMappings, false
+		return metrics, folderPerms, permissionIDMappings, clues.Wrap(err, "creating folders for restore")
 	}
 
-	// Restore items from the collection
-	items := dc.Items(ctx, nil) // TODO: fault.Errors instead of nil
+	var (
+		et    = errs.Tracker()
+		items = dc.Items(ctx, errs)
+	)
 
 	for {
+		if et.Err() != nil {
+			break
+		}
+
 		select {
 		case <-ctx.Done():
-			errUpdater("context canceled", ctx.Err())
-			return metrics, folderPerms, permissionIDMappings, true
+			return metrics, folderPerms, permissionIDMappings, err
 
 		case itemData, ok := <-items:
			if !ok {
-				return metrics, folderPerms, permissionIDMappings, false
+				return metrics, folderPerms, permissionIDMappings, nil
 			}
 
 			itemPath, err := dc.FullPath().Append(itemData.UUID(), true)
 			if err != nil {
-				logger.Ctx(ctx).DPanicw("transforming item to full path", "error", err)
-				errUpdater(itemData.UUID(), err)
+				et.Add(clues.Wrap(err, "appending item to full path").WithClues(ctx))
 				continue
 			}
 
 			if source == OneDriveSource && backupVersion >= versionWithDataAndMetaFiles {
 				name := itemData.UUID()
 				if strings.HasSuffix(name, DataFileSuffix) {
 					metrics.Objects++
 					metrics.TotalBytes += int64(len(copyBuffer))
@@ -312,7 +331,7 @@ func RestoreCollection(
 				}
 
 				if err != nil {
-					errUpdater(itemData.UUID(), err)
+					et.Add(err)
 					continue
 				}
@@ -335,28 +354,25 @@ func RestoreCollection(
 				}
 
 				metaReader := itemData.ToReader()
-				meta, err := getMetadata(metaReader)
-				metaReader.Close()
+				defer metaReader.Close()
 
+				meta, err := getMetadata(metaReader)
 				if err != nil {
-					errUpdater(itemData.UUID(), clues.Wrap(err, "folder metadata"))
+					et.Add(clues.Wrap(err, "getting directory metadata").WithClues(ctx))
 					continue
 				}
 
 				trimmedPath := strings.TrimSuffix(itemPath.String(), DirMetaFileSuffix)
 				folderPerms[trimmedPath] = meta.Permissions
-			} else {
-				if !ok {
-					errUpdater(itemData.UUID(), fmt.Errorf("invalid backup format, you might be using an old backup"))
-					continue
-				}
 			}
 		} else {
 			metrics.Objects++
 			metrics.TotalBytes += int64(len(copyBuffer))
 
 			// No permissions stored at the moment for SharePoint
-			_, itemInfo, err = restoreData(ctx,
+			_, itemInfo, err = restoreData(
+				ctx,
 				service,
 				itemData.UUID(),
 				itemData,
@@ -365,7 +381,7 @@ func RestoreCollection(
 				copyBuffer,
 				source)
 			if err != nil {
-				errUpdater(itemData.UUID(), err)
+				et.Add(err)
 				continue
 			}
@@ -380,6 +396,8 @@ func RestoreCollection(
 			}
 		}
 	}
+
+	return metrics, folderPerms, permissionIDMappings, et.Err()
 }
 
 type fileFetcher interface {
@@ -425,8 +443,7 @@ func restoreV1File(
 	meta, err := fetchAndReadMetadata(ctx, fetcher, metaName)
 	if err != nil {
-		err = clues.Wrap(err, "restoring file")
-		return details.ItemInfo{}, err
+		return details.ItemInfo{}, clues.Wrap(err, "restoring file")
 	}
 
 	err = restorePermissions(
@@ -439,8 +456,7 @@ func restoreV1File(
 		permissionIDMappings,
 	)
 	if err != nil {
-		err = clues.Wrap(err, "restoring item permissions")
-		return details.ItemInfo{}, err
+		return details.ItemInfo{}, clues.Wrap(err, "restoring item permissions")
 	}
 
 	return itemInfo, nil
@@ -466,13 +482,11 @@ func restoreV2File(
 	meta, err := fetchAndReadMetadata(ctx, fetcher, metaName)
 	if err != nil {
-		err = clues.Wrap(err, "restoring file")
-		return details.ItemInfo{}, err
+		return details.ItemInfo{}, clues.Wrap(err, "restoring file")
 	}
 
 	if err != nil {
-		err = clues.Wrap(err, "deserializing item metadata")
-		return details.ItemInfo{}, err
+		return details.ItemInfo{}, clues.Wrap(err, "deserializing item metadata")
 	}
 
 	// TODO(ashmrtn): Future versions could attempt to do the restore in a
@@ -511,8 +525,7 @@ func restoreV2File(
 		permissionIDMappings,
 	)
 	if err != nil {
-		err = clues.Wrap(err, "restoring item permissions")
-		return details.ItemInfo{}, err
+		return details.ItemInfo{}, clues.Wrap(err, "restoring item permissions")
 	}
 
 	return itemInfo, nil
@@ -558,44 +571,33 @@ func CreateRestoreFolders(
 ) (string, error) {
 	driveRoot, err := service.Client().DrivesById(driveID).Root().Get(ctx, nil)
 	if err != nil {
-		return "", errors.Wrapf(
-			err,
-			"failed to get drive root. details: %s",
-			support.ConnectorStackErrorTrace(err),
-		)
+		return "", clues.Wrap(err, "getting drive root").WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
-	logger.Ctx(ctx).Debugf("Found Root for Drive %s with ID %s", driveID, *driveRoot.GetId())
-
-	parentFolderID := *driveRoot.GetId()
+	parentFolderID := ptr.Val(driveRoot.GetId())
+	ctx = clues.Add(ctx, "drive_root_id", parentFolderID)
+
+	logger.Ctx(ctx).Debug("found drive root")
 
 	for _, folder := range restoreFolders {
 		folderItem, err := getFolder(ctx, service, driveID, parentFolderID, folder)
 		if err == nil {
-			parentFolderID = *folderItem.GetId()
-			logger.Ctx(ctx).Debugf("Found %s with ID %s", folder, parentFolderID)
+			parentFolderID = ptr.Val(folderItem.GetId())
 			continue
 		}
 
 		if !errors.Is(err, errFolderNotFound) {
-			return "", errors.Wrapf(err, "folder %s not found in drive(%s) parentFolder(%s)", folder, driveID, parentFolderID)
+			return "", clues.Wrap(err, "folder not found").With("folder_id", folder).WithClues(ctx)
 		}
 
 		folderItem, err = createItem(ctx, service, driveID, parentFolderID, newItem(folder, true))
 		if err != nil {
-			return "", errors.Wrapf(
-				err,
-				"failed to create folder %s/%s. details: %s", parentFolderID, folder,
-				support.ConnectorStackErrorTrace(err),
-			)
+			return "", clues.Wrap(err, "creating folder")
 		}
 
-		logger.Ctx(ctx).Debugw("resolved restore destination",
-			"dest_name", folder,
-			"parent", parentFolderID,
-			"dest_id", *folderItem.GetId())
+		parentFolderID = ptr.Val(folderItem.GetId())
 
-		parentFolderID = *folderItem.GetId()
+		logger.Ctx(ctx).Debugw("resolved restore destination", "dest_id", parentFolderID)
 	}
 
 	return parentFolderID, nil
@@ -614,25 +616,27 @@ func restoreData(
 	ctx, end := D.Span(ctx, "gc:oneDrive:restoreItem", D.Label("item_uuid", itemData.UUID()))
 	defer end()
 
+	ctx = clues.Add(ctx, "item_name", itemData.UUID())
+
 	itemName := itemData.UUID()
 	trace.Log(ctx, "gc:oneDrive:restoreItem", itemName)
 
 	// Get the stream size (needed to create the upload session)
 	ss, ok := itemData.(data.StreamSize)
 	if !ok {
-		return "", details.ItemInfo{}, errors.Errorf("item %q does not implement DataStreamInfo", itemName)
+		return "", details.ItemInfo{}, clues.New("item does not implement DataStreamInfo").WithClues(ctx)
 	}
 
 	// Create Item
 	newItem, err := createItem(ctx, service, driveID, parentFolderID, newItem(name, false))
 	if err != nil {
-		return "", details.ItemInfo{}, errors.Wrapf(err, "failed to create item %s", itemName)
+		return "", details.ItemInfo{}, clues.Wrap(err, "creating item")
 	}
 
 	// Get a drive item writer
 	w, err := driveItemWriter(ctx, service, driveID, *newItem.GetId(), ss.Size())
 	if err != nil {
-		return "", details.ItemInfo{}, errors.Wrapf(err, "failed to create item upload session %s", itemName)
+		return "", details.ItemInfo{}, clues.Wrap(err, "creating item writer")
 	}
 
 	iReader := itemData.ToReader()
@@ -643,7 +647,7 @@ func restoreData(
 	// Upload the stream data
 	written, err := io.CopyBuffer(w, progReader, copyBuffer)
 	if err != nil {
-		return "", details.ItemInfo{}, errors.Wrapf(err, "failed to upload data: item %s", itemName)
+		return "", details.ItemInfo{}, clues.Wrap(err, "writing item bytes").WithClues(ctx).With(graph.ErrData(err)...)
 	}
 
 	dii := details.ItemInfo{}
@@ -757,16 +761,16 @@ func restorePermissions(
 ) error {
 	permAdded, permRemoved := getChildPermissions(childPerms, parentPerms)
 
+	ctx = clues.Add(ctx, "permission_item_id", itemID)
+
 	for _, p := range permRemoved {
-		err := service.Client().DrivesById(driveID).ItemsById(itemID).
-			PermissionsById(permissionIDMappings[p.ID]).Delete(ctx, nil)
+		err := service.Client().
+			DrivesById(driveID).
+			ItemsById(itemID).
+			PermissionsById(permissionIDMappings[p.ID]).
+			Delete(ctx, nil)
 		if err != nil {
-			return errors.Wrapf(
-				err,
-				"failed to remove permission for item %s. details: %s",
-				itemID,
-				support.ConnectorStackErrorTrace(err),
-			)
+			return clues.Wrap(err, "removing permissions").WithClues(ctx).With(graph.ErrData(err)...)
 		}
 	}
@@ -791,12 +795,7 @@ func restorePermissions(
 		np, err := service.Client().DrivesById(driveID).ItemsById(itemID).Invite().Post(ctx, pbody, nil)
 		if err != nil {
-			return errors.Wrapf(
-				err,
-				"failed to set permission for item %s. details: %s",
-				itemID,
-				support.ConnectorStackErrorTrace(err),
-			)
+			return clues.Wrap(err, "setting permissions").WithClues(ctx).With(graph.ErrData(err)...)
 		}
 
 		permissionIDMappings[p.ID] = *np.GetValue()[0].GetId()

View File

@@ -56,7 +56,6 @@ func RestoreCollections(
 	// Iterate through the data collections and restore the contents of each
 	for _, dc := range dcs {
 		var (
-			canceled bool
 			category = dc.FullPath().Category()
 			metrics  support.CollectionMetrics
 			ictx     = clues.Add(ctx,
@@ -67,7 +66,7 @@ func RestoreCollections(
 		switch dc.FullPath().Category() {
 		case path.LibrariesCategory:
-			metrics, _, _, canceled = onedrive.RestoreCollection(
+			metrics, _, _, err = onedrive.RestoreCollection(
 				ictx,
 				backupVersion,
 				service,
@@ -76,9 +75,9 @@ func RestoreCollections(
 				onedrive.SharePointSource,
 				dest.ContainerName,
 				deets,
-				func(s string, err error) { errs.Add(err) },
 				map[string]string{},
-				false)
+				false,
+				errs)
 		case path.ListsCategory:
 			metrics, err = RestoreListCollection(
 				ictx,
@@ -101,7 +100,7 @@ func RestoreCollections(
 		restoreMetrics.Combine(metrics)
 
-		if canceled || err != nil {
+		if err != nil {
 			break
 		}
 	}
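
Beyond the fault plumbing, the other recurring change above is the move from `errors.Wrapf` plus `support.ConnectorStackErrorTrace` to `clues`-style annotation: values are attached to the context with `clues.Add`, and errors are wrapped with `clues.Wrap(err, "...").WithClues(ctx)` (plus `.With(graph.ErrData(err)...)` for Graph responses). Here is a minimal sketch of how those pieces compose, assuming the `github.com/alcionai/clues` module these files already import; `fetchItem`, the IDs, and the `"attempt"` key are illustrative only.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/alcionai/clues"
)

// fetchItem is a hypothetical stand-in for a Graph call that can fail.
func fetchItem(ctx context.Context, itemID string) error {
	return errors.New("404 itemNotFound")
}

func restoreItem(ctx context.Context, driveID, itemID string) error {
	// Attach restore-scoped values to the context, as the diff does with
	// backup_version, drive_id, resource_owner, and so on.
	ctx = clues.Add(ctx, "drive_id", driveID, "item_id", itemID)

	if err := fetchItem(ctx, itemID); err != nil {
		// Wrap with a short action label and fold the context values into the
		// error, mirroring clues.Wrap(err, "...").WithClues(ctx) in the diff.
		return clues.Wrap(err, "fetching item").WithClues(ctx).With("attempt", 1)
	}

	return nil
}

func main() {
	err := restoreItem(context.Background(), "drive-1", "item-42")
	fmt.Println(err) // e.g. "fetching item: 404 itemNotFound"; the key/value pairs ride along on the error
}
```

Printing the returned error yields the wrap label plus the original message, while the key/value pairs added via `clues.Add` and `.With` stay attached to the error for structured logging further up the stack.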