New restore path for items with name in metadata file (#2495)

## Description

Add code to restore items that have their file name stored in the corso metadata file instead of as the name of the kopia file.

This code is not executed as the conditional for it will not trigger (comparison to max int)

## Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No 

## Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

## Issue(s)

* #1535 

## Test Plan

- [x] 💪 Manual
- [ ]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2023-02-20 20:26:25 -08:00 committed by GitHub
parent 4565c9f33b
commit ed6335e590
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"runtime/trace"
"sort"
"strings"
@ -35,6 +36,12 @@ const (
// in which we split from storing just the data to storing both
// the data and metadata in two files.
versionWithDataAndMetaFiles = 1
// versionWithNameInMeta points to the backup format version where we begin
// storing files in kopia with their item ID instead of their OneDrive file
// name.
// TODO(ashmrtn): Update this to a real value when we merge the file name
// change. Set to MAXINT for now to keep the if-check using it working.
versionWithNameInMeta = math.MaxInt
)
func getParentPermissions(
@ -193,7 +200,6 @@ func RestoreCollection(
copyBuffer = make([]byte, copyBufferSize)
directory = dc.FullPath()
itemInfo details.ItemInfo
itemID string
folderPerms = map[string][]UserPermission{}
)
@ -227,7 +233,7 @@ func RestoreCollection(
}
// Create restore folders and get the folder ID of the folder the data stream will be restored in
restoreFolderID, permissionIDMappings, err := createRestoreFoldersWithPermissions(
restoreFolderID, err := createRestoreFoldersWithPermissions(
ctx,
service,
drivePath.DriveID,
@ -269,17 +275,42 @@ func RestoreCollection(
if strings.HasSuffix(name, DataFileSuffix) {
metrics.Objects++
metrics.TotalBytes += int64(len(copyBuffer))
trimmedName := strings.TrimSuffix(name, DataFileSuffix)
itemID, itemInfo, err = restoreData(
ctx,
service,
trimmedName,
itemData,
drivePath.DriveID,
restoreFolderID,
copyBuffer,
source)
var (
itemInfo details.ItemInfo
err error
)
if backupVersion < versionWithNameInMeta {
itemInfo, err = restoreV1File(
ctx,
source,
service,
drivePath,
dc,
restoreFolderID,
copyBuffer,
colPerms,
permissionIDMappings,
restorePerms,
itemData,
)
} else {
itemInfo, err = restoreV2File(
ctx,
source,
service,
drivePath,
dc,
restoreFolderID,
copyBuffer,
colPerms,
permissionIDMappings,
restorePerms,
itemData,
)
}
if err != nil {
errUpdater(itemData.UUID(), err)
continue
@ -292,46 +323,6 @@ func RestoreCollection(
"", // TODO: implement locationRef
true,
itemInfo)
// Mark it as success without processing .meta
// file if we are not restoring permissions
if !restorePerms {
metrics.Successes++
continue
}
// Fetch item permissions from the collection and restore them.
metaName := trimmedName + MetaFileSuffix
permsFile, err := dc.Fetch(ctx, metaName)
if err != nil {
errUpdater(metaName, clues.Wrap(err, "getting item metadata"))
continue
}
metaReader := permsFile.ToReader()
meta, err := getMetadata(metaReader)
metaReader.Close()
if err != nil {
errUpdater(metaName, clues.Wrap(err, "deserializing item metadata"))
continue
}
permissionIDMappings, err = restorePermissions(
ctx,
service,
drivePath.DriveID,
itemID,
colPerms,
meta.Permissions,
permissionIDMappings,
)
if err != nil {
errUpdater(trimmedName, clues.Wrap(err, "restoring item permissions"))
continue
}
metrics.Successes++
} else if strings.HasSuffix(name, MetaFileSuffix) {
// Just skip this for the moment since we moved the code to the above
@ -391,6 +382,142 @@ func RestoreCollection(
}
}
type fileFetcher interface {
Fetch(ctx context.Context, name string) (data.Stream, error)
}
func restoreV1File(
ctx context.Context,
source driveSource,
service graph.Servicer,
drivePath *path.DrivePath,
fetcher fileFetcher,
restoreFolderID string,
copyBuffer []byte,
parentPerms []UserPermission,
permissionIDMappings map[string]string,
restorePerms bool,
itemData data.Stream,
) (details.ItemInfo, error) {
trimmedName := strings.TrimSuffix(itemData.UUID(), DataFileSuffix)
itemID, itemInfo, err := restoreData(
ctx,
service,
trimmedName,
itemData,
drivePath.DriveID,
restoreFolderID,
copyBuffer,
source)
if err != nil {
return details.ItemInfo{}, err
}
// Mark it as success without processing .meta
// file if we are not restoring permissions
if !restorePerms {
return itemInfo, nil
}
// Fetch item permissions from the collection and restore them.
metaName := trimmedName + MetaFileSuffix
meta, err := fetchAndReadMetadata(ctx, fetcher, metaName)
if err != nil {
err = clues.Wrap(err, "restoring file")
return details.ItemInfo{}, err
}
err = restorePermissions(
ctx,
service,
drivePath.DriveID,
itemID,
parentPerms,
meta.Permissions,
permissionIDMappings,
)
if err != nil {
err = clues.Wrap(err, "restoring item permissions")
return details.ItemInfo{}, err
}
return itemInfo, nil
}
func restoreV2File(
ctx context.Context,
source driveSource,
service graph.Servicer,
drivePath *path.DrivePath,
fetcher fileFetcher,
restoreFolderID string,
copyBuffer []byte,
parentPerms []UserPermission,
permissionIDMappings map[string]string,
restorePerms bool,
itemData data.Stream,
) (details.ItemInfo, error) {
trimmedName := strings.TrimSuffix(itemData.UUID(), DataFileSuffix)
// Get metadata file so we can determine the file name.
metaName := trimmedName + MetaFileSuffix
meta, err := fetchAndReadMetadata(ctx, fetcher, metaName)
if err != nil {
err = clues.Wrap(err, "restoring file")
return details.ItemInfo{}, err
}
if err != nil {
err = clues.Wrap(err, "deserializing item metadata")
return details.ItemInfo{}, err
}
// TODO(ashmrtn): Future versions could attempt to do the restore in a
// different location like "lost+found" and use the item ID if we want to do
// as much as possible to restore the data.
if len(meta.FileName) == 0 {
return details.ItemInfo{}, clues.New("item with empty name")
}
itemID, itemInfo, err := restoreData(
ctx,
service,
meta.FileName,
itemData,
drivePath.DriveID,
restoreFolderID,
copyBuffer,
source)
if err != nil {
return details.ItemInfo{}, err
}
// Mark it as success without processing .meta
// file if we are not restoring permissions
if !restorePerms {
return itemInfo, nil
}
err = restorePermissions(
ctx,
service,
drivePath.DriveID,
itemID,
parentPerms,
meta.Permissions,
permissionIDMappings,
)
if err != nil {
err = clues.Wrap(err, "restoring item permissions")
return details.ItemInfo{}, err
}
return itemInfo, nil
}
// createRestoreFoldersWithPermissions creates the restore folder hierarchy in
// the specified drive and returns the folder ID of the last folder entry in the
// hierarchy. Permissions are only applied to the last folder in the hierarchy.
@ -403,13 +530,13 @@ func createRestoreFoldersWithPermissions(
parentPermissions []UserPermission,
folderPermissions []UserPermission,
permissionIDMappings map[string]string,
) (string, map[string]string, error) {
) (string, error) {
id, err := CreateRestoreFolders(ctx, service, driveID, restoreFolders)
if err != nil {
return "", permissionIDMappings, err
return "", err
}
permissionIDMappings, err = restorePermissions(
err = restorePermissions(
ctx,
service,
driveID,
@ -418,7 +545,7 @@ func createRestoreFoldersWithPermissions(
folderPermissions,
permissionIDMappings)
return id, permissionIDMappings, err
return id, err
}
// CreateRestoreFolders creates the restore folder hierarchy in the specified
@ -531,6 +658,29 @@ func restoreData(
return *newItem.GetId(), dii, nil
}
func fetchAndReadMetadata(
ctx context.Context,
fetcher fileFetcher,
metaName string,
) (Metadata, error) {
metaFile, err := fetcher.Fetch(ctx, metaName)
if err != nil {
err = clues.Wrap(err, "getting item metadata").With("meta_file_name", metaName)
return Metadata{}, err
}
metaReader := metaFile.ToReader()
defer metaReader.Close()
meta, err := getMetadata(metaReader)
if err != nil {
err = clues.Wrap(err, "deserializing item metadata").With("meta_file_name", metaName)
return Metadata{}, err
}
return meta, nil
}
// getMetadata read and parses the metadata info for an item
func getMetadata(metar io.ReadCloser) (Metadata, error) {
var meta Metadata
@ -604,14 +754,14 @@ func restorePermissions(
parentPerms []UserPermission,
childPerms []UserPermission,
permissionIDMappings map[string]string,
) (map[string]string, error) {
) error {
permAdded, permRemoved := getChildPermissions(childPerms, parentPerms)
for _, p := range permRemoved {
err := service.Client().DrivesById(driveID).ItemsById(itemID).
PermissionsById(permissionIDMappings[p.ID]).Delete(ctx, nil)
if err != nil {
return permissionIDMappings, errors.Wrapf(
return errors.Wrapf(
err,
"failed to remove permission for item %s. details: %s",
itemID,
@ -641,7 +791,7 @@ func restorePermissions(
np, err := service.Client().DrivesById(driveID).ItemsById(itemID).Invite().Post(ctx, pbody, nil)
if err != nil {
return permissionIDMappings, errors.Wrapf(
return errors.Wrapf(
err,
"failed to set permission for item %s. details: %s",
itemID,
@ -652,5 +802,5 @@ func restorePermissions(
permissionIDMappings[p.ID] = *np.GetValue()[0].GetId()
}
return permissionIDMappings, nil
return nil
}