Update kopia restore logic to take a (RepoRef, Collection Restore Path) pair (#3337)

Begin expanding the restore logic to take a pair of
paths, one denoting the precise location of the item
in kopia and the other denoting the "restore location",
i.e. the path the item should be placed at during restore.

This PR is not expected to change system functionality
at all.

This is the first of two PRs to set up all the logic for
this. Due to recent updates to the kopia wrapper restore
logic, this PR does not yet properly handle merging
multiple collections that have the same restore location
but different RepoRefs.
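
For context, the new pairing looks like this (a minimal sketch mirroring the
`toRestorePaths` test helper in the diff below; the package name is
illustrative). For now callers default the restore location to the item's
parent directory, which is why behavior is unchanged:

```go
package sketch // illustrative only

import "github.com/alcionai/corso/src/pkg/path"

// toRestorePaths pairs each storage path with a default restore location.
// Until a later PR passes a real target, RestorePath is just StoragePath's
// parent directory, so restored items land exactly where they always have.
func toRestorePaths(itemPaths []path.Path) ([]path.RestorePaths, error) {
	res := make([]path.RestorePaths, 0, len(itemPaths))

	for _, p := range itemPaths {
		dir, err := p.Dir() // the collection the item is restored into
		if err != nil {
			return nil, err
		}

		res = append(res, path.RestorePaths{StoragePath: p, RestorePath: dir})
	}

	return res, nil
}
```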

---

#### Does this PR need a docs update or release note?

- [ ] Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x] No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #3197

#### Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
ashmrtn 2023-05-09 18:50:38 -07:00 committed by GitHub
parent a162425c12
commit b508ea3b72
13 changed files with 958 additions and 94 deletions

View File

@@ -781,17 +781,29 @@ func getMetadata(metar io.ReadCloser) (metadata.Metadata, error) {
// Augment restore paths to add extra files (meta) needed for restore, as
// well as do any other ordering operations on the paths
func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, error) {
colPaths := map[string]path.Path{}
//
// Only accepts StoragePath/RestorePath pairs where the RestorePath is
// at least as long as the StoragePath. If the RestorePath is longer than the
// StoragePath then the first few (closest to the root) directories will use
// default permissions during restore.
func AugmentRestorePaths(
backupVersion int,
paths []path.RestorePaths,
) ([]path.RestorePaths, error) {
// Keyed by each value's StoragePath.String() which corresponds to the RepoRef
// of the directory.
colPaths := map[string]path.RestorePaths{}
for _, p := range paths {
first := true
for {
np, err := p.Dir()
sp, err := p.StoragePath.Dir()
if err != nil {
return nil, err
}
drivePath, err := path.ToDrivePath(np)
drivePath, err := path.ToDrivePath(sp)
if err != nil {
return nil, err
}
@@ -800,8 +812,31 @@ func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, err
break
}
colPaths[np.String()] = np
p = np
if len(p.RestorePath.Elements()) < len(sp.Elements()) {
return nil, clues.New("restorePath shorter than storagePath").
With("restore_path", p.RestorePath, "storage_path", sp)
}
rp := p.RestorePath
// Make sure the RestorePath always points to the level of the current
// collection. We need to track if it's the first iteration because the
// RestorePath starts out at the collection level to begin with.
if !first {
rp, err = p.RestorePath.Dir()
if err != nil {
return nil, err
}
}
paths := path.RestorePaths{
StoragePath: sp,
RestorePath: rp,
}
colPaths[sp.String()] = paths
p = paths
first = false
}
}
@@ -814,32 +849,45 @@ func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, err
// As of now look up metadata for parent directories from a
// collection.
for _, p := range colPaths {
el := p.Elements()
el := p.StoragePath.Elements()
if backupVersion >= version.OneDrive6NameInMeta {
mPath, err := p.Append(".dirmeta", true)
mPath, err := p.StoragePath.Append(".dirmeta", true)
if err != nil {
return nil, err
}
paths = append(paths, mPath)
paths = append(
paths,
path.RestorePaths{StoragePath: mPath, RestorePath: p.RestorePath})
} else if backupVersion >= version.OneDrive4DirIncludesPermissions {
mPath, err := p.Append(el[len(el)-1]+".dirmeta", true)
mPath, err := p.StoragePath.Append(el[len(el)-1]+".dirmeta", true)
if err != nil {
return nil, err
}
paths = append(paths, mPath)
paths = append(
paths,
path.RestorePaths{StoragePath: mPath, RestorePath: p.RestorePath})
} else if backupVersion >= version.OneDrive1DataAndMetaFiles {
pp, err := p.Dir()
pp, err := p.StoragePath.Dir()
if err != nil {
return nil, err
}
mPath, err := pp.Append(el[len(el)-1]+".dirmeta", true)
if err != nil {
return nil, err
}
paths = append(paths, mPath)
prp, err := p.RestorePath.Dir()
if err != nil {
return nil, err
}
paths = append(
paths,
path.RestorePaths{StoragePath: mPath, RestorePath: prp})
}
}
@@ -847,8 +895,11 @@ func AugmentRestorePaths(backupVersion int, paths []path.Path) ([]path.Path, err
// files. This is only a necessity for OneDrive as we are storing
// metadata for files/folders in separate meta files and we need the
// data to be restored before we can restore the metadata.
//
// This sorting assumes stuff in the same StoragePath directory end up in the
// same RestorePath collection.
sort.Slice(paths, func(i, j int) bool {
return paths[i].String() < paths[j].String()
return paths[i].StoragePath.String() < paths[j].StoragePath.String()
})
return paths, nil
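
Taken together, the new contract for AugmentRestorePaths is: reject pairs
where the restore location is shallower than the collection's storage
directory, add one `.dirmeta` entry per collection that inherits that
collection's restore location, and sort so metadata restores before the files
it describes. A dependency-free sketch of just the newest-version branch (the
`pair` type and string handling are illustrative; the real code walks every
ancestor directory and dispatches on backup version):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// pair stands in for path.RestorePaths; both fields are "/"-joined segments
// under the drive root, e.g. {storage: "folder-id/file.txt.data", restore: "folder"}.
type pair struct{ storage, restore string }

func elems(s string) []string {
	if s == "" {
		return nil
	}

	return strings.Split(s, "/")
}

// augment enforces the length invariant, adds one ".dirmeta" entry per
// collection, and sorts so metadata precedes the data files beside it.
func augment(items []pair) ([]pair, error) {
	out := append([]pair{}, items...)
	seen := map[string]bool{}

	for _, it := range items {
		se := elems(it.storage)
		dir := strings.Join(se[:len(se)-1], "/") // the collection's storage path

		if len(elems(it.restore)) < len(elems(dir)) {
			return nil, fmt.Errorf("restorePath %q shorter than storagePath %q", it.restore, dir)
		}

		if !seen[dir] {
			seen[dir] = true
			out = append(out, pair{storage: dir + "/.dirmeta", restore: it.restore})
		}
	}

	sort.Slice(out, func(i, j int) bool { return out[i].storage < out[j].storage })

	return out, nil
}

func main() {
	got, _ := augment([]pair{{"folder-id/file.txt.data", "folder"}})
	fmt.Println(got) // [{folder-id/.dirmeta folder} {folder-id/file.txt.data folder}]
}
```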

View File

@@ -172,20 +172,30 @@ func (suite *RestoreUnitSuite) TestAugmentRestorePaths() {
base := "id/onedrive/user/files/drives/driveID/root:/"
inPaths := []path.Path{}
inPaths := []path.RestorePaths{}
for _, ps := range test.input {
p, err := path.FromDataLayerPath(base+ps, true)
require.NoError(t, err, "creating path", clues.ToCore(err))
inPaths = append(inPaths, p)
pd, err := p.Dir()
require.NoError(t, err, "creating collection path", clues.ToCore(err))
inPaths = append(
inPaths,
path.RestorePaths{StoragePath: p, RestorePath: pd})
}
outPaths := []path.Path{}
outPaths := []path.RestorePaths{}
for _, ps := range test.output {
p, err := path.FromDataLayerPath(base+ps, true)
require.NoError(t, err, "creating path", clues.ToCore(err))
outPaths = append(outPaths, p)
pd, err := p.Dir()
require.NoError(t, err, "creating collection path", clues.ToCore(err))
outPaths = append(
outPaths,
path.RestorePaths{StoragePath: p, RestorePath: pd})
}
actual, err := AugmentRestorePaths(test.version, inPaths)
@@ -197,3 +207,111 @@ func (suite *RestoreUnitSuite) TestAugmentRestorePaths() {
})
}
}
// TestAugmentRestorePaths_DifferentRestorePath tests that RestorePath
// substitution works properly. Since it's only possible for future backup
// versions to need restore path substitution (i.e. due to storing folders by
// ID instead of name), this is only tested against the most recent backup
// version at the moment.
func (suite *RestoreUnitSuite) TestAugmentRestorePaths_DifferentRestorePath() {
// Adding a simple test here so that we can be sure that this
// function gets updated whenever we add a new version.
require.LessOrEqual(suite.T(), version.Backup, version.All8MigrateUserPNToID, "unsupported backup version")
type pathPair struct {
storage string
restore string
}
table := []struct {
name string
version int
input []pathPair
output []pathPair
errCheck assert.ErrorAssertionFunc
}{
{
name: "nested folders",
version: version.Backup,
input: []pathPair{
{storage: "folder-id/file.txt.data", restore: "folder"},
{storage: "folder-id/folder2-id/file.txt.data", restore: "folder/folder2"},
},
output: []pathPair{
{storage: "folder-id/.dirmeta", restore: "folder"},
{storage: "folder-id/file.txt.data", restore: "folder"},
{storage: "folder-id/folder2-id/.dirmeta", restore: "folder/folder2"},
{storage: "folder-id/folder2-id/file.txt.data", restore: "folder/folder2"},
},
errCheck: assert.NoError,
},
{
name: "restore path longer one folder",
version: version.Backup,
input: []pathPair{
{storage: "folder-id/file.txt.data", restore: "corso_restore/folder"},
},
output: []pathPair{
{storage: "folder-id/.dirmeta", restore: "corso_restore/folder"},
{storage: "folder-id/file.txt.data", restore: "corso_restore/folder"},
},
errCheck: assert.NoError,
},
{
name: "restore path shorter one folder",
version: version.Backup,
input: []pathPair{
{storage: "folder-id/file.txt.data", restore: ""},
},
errCheck: assert.Error,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
_, flush := tester.NewContext()
defer flush()
base := "id/onedrive/user/files/drives/driveID/root:/"
inPaths := []path.RestorePaths{}
for _, ps := range test.input {
p, err := path.FromDataLayerPath(base+ps.storage, true)
require.NoError(t, err, "creating path", clues.ToCore(err))
r, err := path.FromDataLayerPath(base+ps.restore, false)
require.NoError(t, err, "creating path", clues.ToCore(err))
inPaths = append(
inPaths,
path.RestorePaths{StoragePath: p, RestorePath: r})
}
outPaths := []path.RestorePaths{}
for _, ps := range test.output {
p, err := path.FromDataLayerPath(base+ps.storage, true)
require.NoError(t, err, "creating path", clues.ToCore(err))
r, err := path.FromDataLayerPath(base+ps.restore, false)
require.NoError(t, err, "creating path", clues.ToCore(err))
outPaths = append(
outPaths,
path.RestorePaths{StoragePath: p, RestorePath: r})
}
actual, err := AugmentRestorePaths(test.version, inPaths)
test.errCheck(t, err, "augmenting paths", clues.ToCore(err))
if err != nil {
return
}
// Ordering of paths matters here as we need dirmeta files
// to show up before files in the directory.
assert.Equal(t, outPaths, actual, "augmented paths")
})
}
}

View File

@@ -0,0 +1,112 @@
package kopia
import (
"context"
"errors"
"github.com/alcionai/clues"
"golang.org/x/exp/slices"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
var _ data.RestoreCollection = &mergeCollection{}
type col struct {
storagePath string
data.RestoreCollection
}
type mergeCollection struct {
cols []col
// Technically don't need to track this but it can help detect errors.
fullPath path.Path
}
func (mc *mergeCollection) addCollection(
storagePath string,
c data.RestoreCollection,
) error {
if c == nil {
return clues.New("adding nil collection").
With("current_path", mc.FullPath())
} else if mc.FullPath().String() != c.FullPath().String() {
return clues.New("attempting to merge collection with different path").
With("current_path", mc.FullPath(), "new_path", c.FullPath())
}
mc.cols = append(mc.cols, col{storagePath: storagePath, RestoreCollection: c})
// Keep a stable sorting of this merged collection set so we can say there's
// some deterministic behavior when Fetch is called. We don't expect to have
// to merge many collections.
slices.SortStableFunc(mc.cols, func(a, b col) bool {
return a.storagePath < b.storagePath
})
return nil
}
func (mc mergeCollection) FullPath() path.Path {
return mc.fullPath
}
func (mc *mergeCollection) Items(
ctx context.Context,
errs *fault.Bus,
) <-chan data.Stream {
res := make(chan data.Stream)
go func() {
defer close(res)
logger.Ctx(ctx).Infow(
"getting items for merged collection",
"merged_collection_count", len(mc.cols))
for _, c := range mc.cols {
// Unfortunately there doesn't seem to be a way right now to see if the
// iteration failed and we should exit early.
ictx := clues.Add(ctx, "merged_collection_storage_path", c.storagePath)
logger.Ctx(ictx).Debug("sending items from merged collection")
for item := range c.Items(ictx, errs) {
res <- item
}
}
}()
return res
}
// Fetch goes through all the collections in this one and returns the first
// match found or the first error that is not data.ErrNotFound. If multiple
// collections have the requested item, the instance in the collection with the
// lexicographically smallest storage path is returned.
func (mc *mergeCollection) Fetch(
ctx context.Context,
name string,
) (data.Stream, error) {
logger.Ctx(ctx).Infow(
"fetching item in merged collection",
"merged_collection_count", len(mc.cols))
for _, c := range mc.cols {
ictx := clues.Add(ctx, "merged_collection_storage_path", c.storagePath)
logger.Ctx(ictx).Debug("looking for item in merged collection")
s, err := c.Fetch(ictx, name)
if err == nil {
return s, nil
} else if err != nil && !errors.Is(err, data.ErrNotFound) {
return nil, clues.Wrap(err, "fetching from merged collection").
WithClues(ictx)
}
}
return nil, clues.Wrap(data.ErrNotFound, "merged collection fetch")
}
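
Since the Fetch precedence rule above is subtle, here's a self-contained toy
model of the merge semantics (stand-in types only, not the real
data.RestoreCollection interface or kopia wiring):

```go
package main

import (
	"fmt"
	"sort"
)

// fetcher is a toy stand-in for a data.RestoreCollection plus the storage
// path it was registered under.
type fetcher struct {
	storagePath string
	items       map[string]string
}

// merged mimics mergeCollection: adds keep the set sorted by storage path,
// and fetch returns the item from the first collection that has it.
type merged struct{ cols []fetcher }

func (m *merged) add(f fetcher) {
	m.cols = append(m.cols, f)
	sort.SliceStable(m.cols, func(i, j int) bool {
		return m.cols[i].storagePath < m.cols[j].storagePath
	})
}

func (m *merged) fetch(name string) (string, error) {
	for _, c := range m.cols {
		if v, ok := c.items[name]; ok {
			return v, nil
		}
	}

	return "", fmt.Errorf("%s: not found", name)
}

func main() {
	m := &merged{}
	m.add(fetcher{"mail/some/data/directory2", map[string]string{"file1": "zyx"}})
	m.add(fetcher{"mail/some/data/directory1", map[string]string{"file1": "abc"}})

	v, _ := m.fetch("file1")
	fmt.Println(v) // abc: directory1 sorts first, so its copy of file1 wins
}
```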

View File

@@ -0,0 +1,297 @@
package kopia
import (
"bytes"
"io"
"testing"
"github.com/alcionai/clues"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/exchange/mock"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/fault"
"github.com/alcionai/corso/src/pkg/path"
)
type MergeCollectionUnitSuite struct {
tester.Suite
}
func TestMergeCollectionUnitSuite(t *testing.T) {
suite.Run(t, &MergeCollectionUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *MergeCollectionUnitSuite) TestReturnsPath() {
t := suite.T()
pth, err := path.Build(
"a-tenant",
"a-user",
path.ExchangeService,
path.EmailCategory,
false,
"some", "path", "for", "data")
require.NoError(t, err, clues.ToCore(err))
c := mergeCollection{
fullPath: pth,
}
assert.Equal(t, pth, c.FullPath())
}
func (suite *MergeCollectionUnitSuite) TestItems() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
storagePaths := []string{
"tenant-id/exchange/user-id/mail/some/folder/path1",
"tenant-id/exchange/user-id/mail/some/folder/path2",
}
expectedItemNames := []string{"1", "2"}
pth, err := path.Build(
"a-tenant",
"a-user",
path.ExchangeService,
path.EmailCategory,
false,
"some", "path", "for", "data")
require.NoError(t, err, clues.ToCore(err))
c1 := mock.NewCollection(pth, nil, 1)
c1.Names[0] = expectedItemNames[0]
c2 := mock.NewCollection(pth, nil, 1)
c2.Names[0] = expectedItemNames[1]
// Not testing Fetch here, so it's safe to use this wrapper.
cols := []data.RestoreCollection{
data.NotFoundRestoreCollection{Collection: c1},
data.NotFoundRestoreCollection{Collection: c2},
}
dc := &mergeCollection{fullPath: pth}
for i, c := range cols {
err := dc.addCollection(storagePaths[i], c)
require.NoError(t, err, "adding collection", clues.ToCore(err))
}
gotItemNames := []string{}
for item := range dc.Items(ctx, fault.New(true)) {
gotItemNames = append(gotItemNames, item.UUID())
}
assert.ElementsMatch(t, expectedItemNames, gotItemNames)
}
func (suite *MergeCollectionUnitSuite) TestAddCollection_DifferentPathFails() {
t := suite.T()
pth1, err := path.Build(
"a-tenant",
"a-user",
path.ExchangeService,
path.EmailCategory,
false,
"some", "path", "for", "data")
require.NoError(t, err, clues.ToCore(err))
pth2, err := path.Build(
"a-tenant",
"a-user",
path.ExchangeService,
path.EmailCategory,
false,
"some", "path", "for", "data2")
require.NoError(t, err, clues.ToCore(err))
dc := mergeCollection{fullPath: pth1}
err = dc.addCollection("some/path", &kopiaDataCollection{path: pth2})
assert.Error(t, err, clues.ToCore(err))
}
func (suite *MergeCollectionUnitSuite) TestFetch() {
var (
fileData1 = []byte("abcdefghijklmnopqrstuvwxyz")
fileData2 = []byte("zyxwvutsrqponmlkjihgfedcba")
fileData3 = []byte("foo bar baz")
fileName1 = "file1"
fileName2 = "file2"
fileLookupErrName = "errLookup"
fileOpenErrName = "errOpen"
colPaths = []string{
"tenant-id/exchange/user-id/mail/some/data/directory1",
"tenant-id/exchange/user-id/mail/some/data/directory2",
}
)
pth, err := path.Build(
"a-tenant",
"a-user",
path.ExchangeService,
path.EmailCategory,
false,
"some", "path", "for", "data")
require.NoError(suite.T(), err, clues.ToCore(err))
// Needs to be a function so the readers get refreshed each time.
layouts := []func() fs.Directory{
// Has the following:
// - file1: data[0]
// - errOpen: (error opening file)
func() fs.Directory {
return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[0]), []fs.Entry{
&mockFile{
StreamingFile: virtualfs.StreamingFileFromReader(
encodeAsPath(fileName1),
nil,
),
r: newBackupStreamReader(
serializationVersion,
io.NopCloser(bytes.NewReader(fileData1)),
),
size: int64(len(fileData1) + versionSize),
},
&mockFile{
StreamingFile: virtualfs.StreamingFileFromReader(
encodeAsPath(fileOpenErrName),
nil,
),
openErr: assert.AnError,
},
})
},
// Has the following:
// - file1: data[1]
// - file2: data[0]
// - errOpen: data[2]
func() fs.Directory {
return virtualfs.NewStaticDirectory(encodeAsPath(colPaths[1]), []fs.Entry{
&mockFile{
StreamingFile: virtualfs.StreamingFileFromReader(
encodeAsPath(fileName1),
nil,
),
r: newBackupStreamReader(
serializationVersion,
io.NopCloser(bytes.NewReader(fileData2)),
),
size: int64(len(fileData2) + versionSize),
},
&mockFile{
StreamingFile: virtualfs.StreamingFileFromReader(
encodeAsPath(fileName2),
nil,
),
r: newBackupStreamReader(
serializationVersion,
io.NopCloser(bytes.NewReader(fileData1)),
),
size: int64(len(fileData1) + versionSize),
},
&mockFile{
StreamingFile: virtualfs.StreamingFileFromReader(
encodeAsPath(fileOpenErrName),
nil,
),
r: newBackupStreamReader(
serializationVersion,
io.NopCloser(bytes.NewReader(fileData3)),
),
size: int64(len(fileData3) + versionSize),
},
})
},
}
table := []struct {
name string
fileName string
expectError assert.ErrorAssertionFunc
expectData []byte
notFoundErr bool
}{
{
name: "Duplicate File, first collection",
fileName: fileName1,
expectError: assert.NoError,
expectData: fileData1,
},
{
name: "Distinct File, second collection",
fileName: fileName2,
expectError: assert.NoError,
expectData: fileData1,
},
{
name: "Error opening file",
fileName: fileOpenErrName,
expectError: assert.Error,
},
{
name: "File not found",
fileName: fileLookupErrName,
expectError: assert.Error,
notFoundErr: true,
},
}
for _, test := range table {
suite.Run(test.name, func() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
c := &i64counter{}
dc := mergeCollection{fullPath: pth}
for i, layout := range layouts {
col := &kopiaDataCollection{
path: pth,
dir: layout(),
counter: c,
expectedVersion: serializationVersion,
}
err := dc.addCollection(colPaths[i], col)
require.NoError(t, err, "adding collection", clues.ToCore(err))
}
s, err := dc.Fetch(ctx, test.fileName)
test.expectError(t, err, clues.ToCore(err))
if err != nil {
if test.notFoundErr {
assert.ErrorIs(t, err, data.ErrNotFound, clues.ToCore(err))
}
return
}
fileData, err := io.ReadAll(s.ToReader())
require.NoError(t, err, "reading file data", clues.ToCore(err))
if err != nil {
return
}
assert.Equal(t, test.expectData, fileData)
})
}
}

View File

@@ -365,6 +365,11 @@ type ByteCounter interface {
Count(numBytes int64)
}
type restoreCollection struct {
restorePath path.Path
storageDirs map[string]*dirAndItems
}
type dirAndItems struct {
dir path.Path
items []string
@@ -380,7 +385,7 @@ func loadDirsAndItems(
ctx context.Context,
snapshotRoot fs.Entry,
bcounter ByteCounter,
toLoad map[string]*dirAndItems,
toLoad map[string]*restoreCollection,
bus *fault.Bus,
) ([]data.RestoreCollection, error) {
var (
@@ -389,50 +394,67 @@ func loadDirsAndItems(
loadCount = 0
)
for _, dirItems := range toLoad {
for _, col := range toLoad {
if el.Failure() != nil {
return nil, el.Failure()
}
ictx := clues.Add(ctx, "directory_path", dirItems.dir)
ictx := clues.Add(ctx, "restore_path", col.restorePath)
dir, err := getDir(ictx, dirItems.dir, snapshotRoot)
if err != nil {
el.AddRecoverable(clues.Wrap(err, "loading directory").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
mergeCol := &mergeCollection{fullPath: col.restorePath}
res = append(res, mergeCol)
continue
}
dc := &kopiaDataCollection{
path: dirItems.dir,
dir: dir,
counter: bcounter,
expectedVersion: serializationVersion,
}
res = append(res, dc)
for _, item := range dirItems.items {
for _, dirItems := range col.storageDirs {
if el.Failure() != nil {
return nil, el.Failure()
}
err := dc.addStream(ictx, item)
ictx = clues.Add(ictx, "storage_directory_path", dirItems.dir)
dir, err := getDir(ictx, dirItems.dir, snapshotRoot)
if err != nil {
el.AddRecoverable(clues.Wrap(err, "loading item").
el.AddRecoverable(clues.Wrap(err, "loading storage directory").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
continue
}
loadCount++
if loadCount%1000 == 0 {
logger.Ctx(ctx).Infow(
"loading items from kopia",
"loaded_items", loadCount)
dc := &kopiaDataCollection{
path: col.restorePath,
dir: dir,
counter: bcounter,
expectedVersion: serializationVersion,
}
if err := mergeCol.addCollection(dirItems.dir.String(), dc); err != nil {
el.AddRecoverable(clues.Wrap(err, "adding collection to merge collection").
WithClues(ctx).
Label(fault.LabelForceNoBackupCreation))
continue
}
for _, item := range dirItems.items {
if el.Failure() != nil {
return nil, el.Failure()
}
err := dc.addStream(ictx, item)
if err != nil {
el.AddRecoverable(clues.Wrap(err, "loading item").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
continue
}
loadCount++
if loadCount%1000 == 0 {
logger.Ctx(ctx).Infow(
"loading items from kopia",
"loaded_items", loadCount)
}
}
}
}
@@ -454,7 +476,7 @@ func loadDirsAndItems(
func (w Wrapper) ProduceRestoreCollections(
ctx context.Context,
snapshotID string,
paths []path.Path,
paths []path.RestorePaths,
bcounter ByteCounter,
errs *fault.Bus,
) ([]data.RestoreCollection, error) {
@@ -474,36 +496,53 @@ func (w Wrapper) ProduceRestoreCollections(
var (
loadCount int
// Directory path -> set of items to load from the directory.
dirsToItems = map[string]*dirAndItems{}
// RestorePath -> []StoragePath directory -> set of items to load from the
// directory.
dirsToItems = map[string]*restoreCollection{}
el = errs.Local()
)
for _, itemPath := range paths {
for _, itemPaths := range paths {
if el.Failure() != nil {
return nil, el.Failure()
}
// Group things by directory so we can load all items from a single
// directory instance lower down.
ictx := clues.Add(ctx, "item_path", itemPath.String())
// Group things by RestorePath and then StoragePath so we can load multiple
// items from a single directory instance lower down.
ictx := clues.Add(
ctx,
"item_path", itemPaths.StoragePath.String(),
"restore_path", itemPaths.RestorePath.String())
parentPath, err := itemPath.Dir()
parentStoragePath, err := itemPaths.StoragePath.Dir()
if err != nil {
el.AddRecoverable(clues.Wrap(err, "making directory collection").
el.AddRecoverable(clues.Wrap(err, "getting storage directory path").
WithClues(ictx).
Label(fault.LabelForceNoBackupCreation))
continue
}
di := dirsToItems[parentPath.ShortRef()]
if di == nil {
dirsToItems[parentPath.ShortRef()] = &dirAndItems{dir: parentPath}
di = dirsToItems[parentPath.ShortRef()]
// Find the location this item is restored to.
rc := dirsToItems[itemPaths.RestorePath.ShortRef()]
if rc == nil {
dirsToItems[itemPaths.RestorePath.ShortRef()] = &restoreCollection{
restorePath: itemPaths.RestorePath,
storageDirs: map[string]*dirAndItems{},
}
rc = dirsToItems[itemPaths.RestorePath.ShortRef()]
}
di.items = append(di.items, itemPath.Item())
// Find the collection this item is sourced from.
di := rc.storageDirs[parentStoragePath.ShortRef()]
if di == nil {
rc.storageDirs[parentStoragePath.ShortRef()] = &dirAndItems{
dir: parentStoragePath,
}
di = rc.storageDirs[parentStoragePath.ShortRef()]
}
di.items = append(di.items, itemPaths.StoragePath.Item())
loadCount++
if loadCount%1000 == 0 {
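
The heart of this change is moving from one level of grouping to two: items
are bucketed first by where they restore to (one output collection each) and
then by where they live in kopia (one directory load each). A minimal,
dependency-free sketch of the new bucketing (toy types; the real code keys
these maps on path ShortRefs and stores dirAndItems values):

```go
package main

import "fmt"

// item is a toy stand-in for path.RestorePaths with the item name split out.
type item struct{ restoreDir, storageDir, name string }

// group buckets items by RestorePath, then by StoragePath directory, the way
// ProduceRestoreCollections now does before loading anything from kopia.
func group(items []item) map[string]map[string][]string {
	out := map[string]map[string][]string{}

	for _, it := range items {
		byDir := out[it.restoreDir]
		if byDir == nil {
			byDir = map[string][]string{}
			out[it.restoreDir] = byDir
		}

		byDir[it.storageDir] = append(byDir[it.storageDir], it.name)
	}

	return out
}

func main() {
	grouped := group([]item{
		{"Inbox", "dir-id-1", "msg1"},
		{"Inbox", "dir-id-2", "msg2"}, // different source, same destination
		{"Archive", "dir-id-3", "msg3"},
	})

	// 2 restore collections; "Inbox" merges items from two kopia directories.
	fmt.Println(len(grouped), len(grouped["Inbox"]))
}
```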

View File

@@ -59,14 +59,12 @@ var (
testFileData6 = testFileData
)
//revive:disable:context-as-argument
func testForFiles(
t *testing.T,
ctx context.Context,
ctx context.Context, //revive:disable-line:context-as-argument
expected map[string][]byte,
collections []data.RestoreCollection,
) {
//revive:enable:context-as-argument
t.Helper()
count := 0
@@ -107,6 +105,19 @@ func checkSnapshotTags(
assert.Equal(t, expectedTags, man.Tags)
}
func toRestorePaths(t *testing.T, paths ...path.Path) []path.RestorePaths {
res := make([]path.RestorePaths, 0, len(paths))
for _, p := range paths {
dir, err := p.Dir()
require.NoError(t, err, clues.ToCore(err))
res = append(res, path.RestorePaths{StoragePath: p, RestorePath: dir})
}
return res
}
// ---------------
// unit tests
// ---------------
@@ -705,10 +716,7 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
result, err := w.ProduceRestoreCollections(
ctx,
string(stats.SnapshotID),
[]path.Path{
fp1,
fp2,
},
toRestorePaths(t, fp1, fp2),
nil,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
@@ -838,7 +846,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
_, err = suite.w.ProduceRestoreCollections(
suite.ctx,
string(stats.SnapshotID),
[]path.Path{failedPath},
toRestorePaths(t, failedPath),
&ic,
fault.New(true))
// Files that had an error shouldn't make a dir entry in kopia. If they do we
@@ -1219,9 +1227,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
_, err = suite.w.ProduceRestoreCollections(
suite.ctx,
string(stats.SnapshotID),
[]path.Path{
suite.files[suite.testPath1.String()][0].itemPath,
},
toRestorePaths(t, suite.files[suite.testPath1.String()][0].itemPath),
&ic,
fault.New(true))
test.restoreCheck(t, err, clues.ToCore(err))
@@ -1322,7 +1328,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() {
result, err := suite.w.ProduceRestoreCollections(
suite.ctx,
string(suite.snapshotID),
test.inputPaths,
toRestorePaths(t, test.inputPaths...),
&ic,
fault.New(true))
test.expectedErr(t, err, clues.ToCore(err))
@@ -1338,6 +1344,193 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() {
}
}
// TestProduceRestoreCollections_PathChanges tests that having different
// Restore and Storage paths works properly. Having the same Restore and Storage
// paths is tested by TestProduceRestoreCollections.
func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_PathChanges() {
rp1, err := path.Build(
testTenant,
testUser,
path.ExchangeService,
path.EmailCategory,
false,
"corso_restore", "Inbox")
require.NoError(suite.T(), err)
rp2, err := path.Build(
testTenant,
testUser,
path.ExchangeService,
path.EmailCategory,
false,
"corso_restore", "Archive")
require.NoError(suite.T(), err)
// Expected items are generated during the test by looking up paths in the
// suite's map of files.
table := []struct {
name string
inputPaths []path.RestorePaths
expectedCollections int
}{
{
name: "SingleItem",
inputPaths: []path.RestorePaths{
{
StoragePath: suite.files[suite.testPath1.String()][0].itemPath,
RestorePath: rp1,
},
},
expectedCollections: 1,
},
{
name: "MultipleItemsSameCollection",
inputPaths: []path.RestorePaths{
{
StoragePath: suite.files[suite.testPath1.String()][0].itemPath,
RestorePath: rp1,
},
{
StoragePath: suite.files[suite.testPath1.String()][1].itemPath,
RestorePath: rp1,
},
},
expectedCollections: 1,
},
{
name: "MultipleItemsDifferentCollections",
inputPaths: []path.RestorePaths{
{
StoragePath: suite.files[suite.testPath1.String()][0].itemPath,
RestorePath: rp1,
},
{
StoragePath: suite.files[suite.testPath2.String()][0].itemPath,
RestorePath: rp2,
},
},
expectedCollections: 2,
},
{
name: "Multiple Items From Different Collections To Same Collection",
inputPaths: []path.RestorePaths{
{
StoragePath: suite.files[suite.testPath1.String()][0].itemPath,
RestorePath: rp1,
},
{
StoragePath: suite.files[suite.testPath2.String()][0].itemPath,
RestorePath: rp1,
},
},
expectedCollections: 1,
},
}
for _, test := range table {
suite.Run(test.name, func() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
expected := make(map[string][]byte, len(test.inputPaths))
for _, pth := range test.inputPaths {
item, ok := suite.filesByPath[pth.StoragePath.String()]
require.True(t, ok, "getting expected file data")
itemPath, err := pth.RestorePath.Append(pth.StoragePath.Item(), true)
require.NoError(t, err, "getting expected item path")
expected[itemPath.String()] = item.data
}
ic := i64counter{}
result, err := suite.w.ProduceRestoreCollections(
suite.ctx,
string(suite.snapshotID),
test.inputPaths,
&ic,
fault.New(true))
require.NoError(t, err, clues.ToCore(err))
assert.Len(t, result, test.expectedCollections)
assert.Less(t, int64(0), ic.i)
testForFiles(t, ctx, expected, result)
})
}
}
// TestProduceRestoreCollections_Fetch tests that the Fetch function still works
// properly even with different Restore and Storage paths and items from
// different kopia directories.
func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Fetch() {
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
rp1, err := path.Build(
testTenant,
testUser,
path.ExchangeService,
path.EmailCategory,
false,
"corso_restore", "Inbox")
require.NoError(suite.T(), err)
inputPaths := []path.RestorePaths{
{
StoragePath: suite.files[suite.testPath1.String()][0].itemPath,
RestorePath: rp1,
},
{
StoragePath: suite.files[suite.testPath2.String()][0].itemPath,
RestorePath: rp1,
},
}
// Really only interested in getting the collection so we can call Fetch on
// it.
ic := i64counter{}
result, err := suite.w.ProduceRestoreCollections(
suite.ctx,
string(suite.snapshotID),
inputPaths,
&ic,
fault.New(true))
require.NoError(t, err, "getting collection", clues.ToCore(err))
require.Len(t, result, 1)
// Item from first kopia directory.
f := suite.files[suite.testPath1.String()][0]
item, err := result[0].Fetch(ctx, f.itemPath.Item())
require.NoError(t, err, "fetching file", clues.ToCore(err))
r := item.ToReader()
buf, err := io.ReadAll(r)
require.NoError(t, err, "reading file data", clues.ToCore(err))
assert.Equal(t, f.data, buf)
// Item from second kopia directory.
f = suite.files[suite.testPath2.String()][0]
item, err = result[0].Fetch(ctx, f.itemPath.Item())
require.NoError(t, err, "fetching file", clues.ToCore(err))
r = item.ToReader()
buf, err = io.ReadAll(r)
require.NoError(t, err, "reading file data", clues.ToCore(err))
assert.Equal(t, f.data, buf)
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Errors() {
itemPath, err := suite.testPath1.Append(testFileName, true)
require.NoError(suite.T(), err, clues.ToCore(err))
@@ -1345,7 +1538,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Erro
table := []struct {
name string
snapshotID string
paths []path.Path
paths []path.RestorePaths
}{
{
"NilPaths",
@@ -1355,12 +1548,12 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Erro
{
"EmptyPaths",
string(suite.snapshotID),
[]path.Path{},
[]path.RestorePaths{},
},
{
"NoSnapshot",
"foo",
[]path.Path{itemPath},
toRestorePaths(suite.T(), itemPath),
},
}
@@ -1393,7 +1586,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestDeleteSnapshot() {
c, err := suite.w.ProduceRestoreCollections(
suite.ctx,
string(suite.snapshotID),
[]path.Path{itemPath},
toRestorePaths(t, itemPath),
&ic,
fault.New(true))
assert.Error(t, err, "snapshot should be deleted", clues.ToCore(err))

View File

@@ -248,10 +248,9 @@ func checkBackupIsInManifests(
}
}
//revive:disable:context-as-argument
func checkMetadataFilesExist(
t *testing.T,
ctx context.Context,
ctx context.Context, //revive:disable-line:context-as-argument
backupID model.StableID,
kw *kopia.Wrapper,
ms *kopia.ModelStore,
@@ -259,7 +258,6 @@
service path.ServiceType,
filesByCat map[path.CategoryType][]string,
) {
//revive:enable:context-as-argument
for category, files := range filesByCat {
t.Run(category.String(), func(t *testing.T) {
bup := &backup.Backup{}
@@ -269,7 +267,7 @@
return
}
paths := []path.Path{}
paths := []path.RestorePaths{}
pathsByRef := map[string][]string{}
for _, fName := range files {
@@ -285,11 +283,18 @@
continue
}
paths = append(paths, p)
paths = append(
paths,
path.RestorePaths{StoragePath: p, RestorePath: dir})
pathsByRef[dir.ShortRef()] = append(pathsByRef[dir.ShortRef()], fName)
}
cols, err := kw.ProduceRestoreCollections(ctx, bup.SnapshotID, paths, nil, fault.New(true))
cols, err := kw.ProduceRestoreCollections(
ctx,
bup.SnapshotID,
paths,
nil,
fault.New(true))
assert.NoError(t, err, clues.ToCore(err))
for _, col := range cols {

View File

@@ -46,16 +46,28 @@ type mockRestoreProducer struct {
onRestore restoreFunc
}
type restoreFunc func(id string, ps []path.Path) ([]data.RestoreCollection, error)
type restoreFunc func(
id string,
ps []path.RestorePaths,
) ([]data.RestoreCollection, error)
func (mr *mockRestoreProducer) buildRestoreFunc(
t *testing.T,
oid string,
ops []path.Path,
) {
mr.onRestore = func(id string, ps []path.Path) ([]data.RestoreCollection, error) {
mr.onRestore = func(
id string,
ps []path.RestorePaths,
) ([]data.RestoreCollection, error) {
gotPaths := make([]path.Path, 0, len(ps))
for _, rp := range ps {
gotPaths = append(gotPaths, rp.StoragePath)
}
assert.Equal(t, oid, id, "manifest id")
checkPaths(t, ops, ps)
checkPaths(t, ops, gotPaths)
return mr.colls, mr.err
}
@@ -64,11 +76,13 @@ func (mr *mockRestoreProducer) buildRestoreFunc(
func (mr *mockRestoreProducer) ProduceRestoreCollections(
ctx context.Context,
snapshotID string,
paths []path.Path,
paths []path.RestorePaths,
bc kopia.ByteCounter,
errs *fault.Bus,
) ([]data.RestoreCollection, error) {
mr.gotPaths = append(mr.gotPaths, paths...)
for _, ps := range paths {
mr.gotPaths = append(mr.gotPaths, ps.StoragePath)
}
if mr.onRestore != nil {
return mr.onRestore(snapshotID, paths)

View File

@@ -47,7 +47,7 @@ type (
ProduceRestoreCollections(
ctx context.Context,
snapshotID string,
paths []path.Path,
paths []path.RestorePaths,
bc kopia.ByteCounter,
errs *fault.Bus,
) ([]data.RestoreCollection, error)

View File

@@ -308,7 +308,7 @@ func collectMetadata(
tenantID string,
errs *fault.Bus,
) ([]data.RestoreCollection, error) {
paths := []path.Path{}
paths := []path.RestorePaths{}
for _, fn := range fileNames {
for _, reason := range man.Reasons {
@@ -326,7 +326,14 @@
With("metadata_file", fn, "category", reason.Category)
}
paths = append(paths, p)
dir, err := p.Dir()
if err != nil {
return nil, clues.
Wrap(err, "building metadata collection path").
With("metadata_file", fn, "category", reason.Category)
}
paths = append(paths, path.RestorePaths{StoragePath: p, RestorePath: dir})
}
}

View File

@@ -349,7 +349,7 @@ func formatDetailsForRestoration(
sel selectors.Selector,
deets *details.Details,
errs *fault.Bus,
) ([]path.Path, error) {
) ([]path.RestorePaths, error) {
fds, err := sel.Reduce(ctx, deets, errs)
if err != nil {
return nil, err
@@ -357,7 +357,7 @@ func formatDetailsForRestoration(
var (
fdsPaths = fds.Paths()
paths = make([]path.Path, len(fdsPaths))
paths = make([]path.RestorePaths, len(fdsPaths))
shortRefs = make([]string, len(fdsPaths))
el = errs.Local()
)
@@ -377,7 +377,18 @@ func formatDetailsForRestoration(
continue
}
paths[i] = p
dir, err := p.Dir()
if err != nil {
el.AddRecoverable(clues.
Wrap(err, "getting restore directory after reduction").
WithClues(ctx).
With("path", fdsPaths[i]))
continue
}
paths[i].StoragePath = p
paths[i].RestorePath = dir
shortRefs[i] = p.ShortRef()
}

View File

@@ -262,12 +262,22 @@ func read(
return clues.Stack(err).WithClues(ctx)
}
pd, err := p.Dir()
if err != nil {
return clues.Stack(err).WithClues(ctx)
}
ctx = clues.Add(ctx, "snapshot_id", snapshotID)
cs, err := rer.ProduceRestoreCollections(
ctx,
snapshotID,
[]path.Path{p},
[]path.RestorePaths{
{
StoragePath: p,
RestorePath: pd,
},
},
&stats.ByteCounter{},
errs)
if err != nil {

View File

@@ -130,6 +130,13 @@ var (
_ fmt.Stringer = &Builder{}
)
// RestorePaths denotes the location to find an item in kopia and the path of
// the collection to place the item in for restore.
type RestorePaths struct {
StoragePath Path
RestorePath Path
}
// Builder is a simple path representation that only tracks path elements. It
// can join, escape, and unescape elements. Higher-level packages are expected
// to wrap this struct to build resource-specific contexts (e.x. an