Coalesce items into collections in kopia/wrapper.go during restore (#888)

## Description

The wrapper.go file was not properly grouping items during restore, instead returning a bunch of single item data.Collections. Update this to properly group items by folder. Also update the tests to be more strict about checking for this.

Remove dead code around restoring a single item as it was unused. Lose some visibility into what happens if the code is unable to open the reader for a given item.

Generally moves some logic around in wrapper.go

## Type of change

<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor

## Issue(s)

<!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. -->
* closes #879 

## Test Plan

<!-- How will this be tested prior to merging.-->
- [ ] 💪 Manual
- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
ashmrtn 2022-09-16 15:12:30 -07:00 committed by GitHub
parent 609bbc3512
commit 09343c0220
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 187 deletions

View File

@ -448,130 +448,61 @@ func (w Wrapper) makeSnapshotWithRoot(
return &res, nil
}
// getEntry returns the item that the restore operation is rooted at. For
// single-item restores, this is the kopia file the data is sourced from. For
// restores of directories or subtrees it is the directory at the root of the
// subtree.
func (w Wrapper) getEntry(
func (w Wrapper) getSnapshotRoot(
ctx context.Context,
snapshotID string,
itemPath path.Path,
) (fs.Entry, error) {
if itemPath == nil {
return nil, errors.WithStack(errNoRestorePath)
}
man, err := snapshot.LoadSnapshot(ctx, w.c, manifest.ID(snapshotID))
if err != nil {
return nil, errors.Wrap(err, "getting snapshot handle")
}
rootDirEntry, err := snapshotfs.SnapshotRoot(w.c, man)
if err != nil {
return nil, errors.Wrap(err, "getting root directory")
return rootDirEntry, errors.Wrap(err, "getting root directory")
}
// getItemStream looks up the item at the given path starting from snapshotRoot.
// If the item is a file in kopia then it returns a data.Stream of the item. If
// the item does not exist in kopia or is not a file an error is returned. The
// UUID of the returned data.Stream will be the name of the kopia file the data
// is sourced from.
func getItemStream(
ctx context.Context,
itemPath path.Path,
snapshotRoot fs.Entry,
) (data.Stream, error) {
if itemPath == nil {
return nil, errors.WithStack(errNoRestorePath)
}
// GetNestedEntry handles nil properly.
e, err := snapshotfs.GetNestedEntry(
ctx,
rootDirEntry,
snapshotRoot,
itemPath.PopFront().Elements(),
)
if err != nil {
return nil, errors.Wrap(err, "getting nested object handle")
}
return e, nil
}
// CollectItems pulls data from kopia for the given items in the snapshot with
// ID snapshotID. If isDirectory is true, it returns a slice of DataCollections
// with data from directories in the subtree rooted at itemPath. If isDirectory
// is false it returns a DataCollection (in a slice) with a single item for each
// requested item. If the item does not exist or a file is found when a directory
// is expected (or the opposite) it returns an error.
func (w Wrapper) collectItems(
ctx context.Context,
snapshotID string,
itemPath path.Path,
) ([]data.Collection, error) {
if itemPath == nil {
return nil, errors.WithStack(errNoRestorePath)
}
parentDir, err := itemPath.Dir()
if err != nil {
return nil, errors.Wrap(err, "getting parent directory from path")
}
e, err := w.getEntry(ctx, snapshotID, itemPath)
if err != nil {
return nil, err
}
f, ok := e.(fs.File)
if !ok {
return nil, errors.New("requested object is not a file")
}
c, err := restoreSingleItem(ctx, f, parentDir)
if err != nil {
return nil, err
}
return []data.Collection{c}, nil
}
// RestoreSingleItem looks up the item at the given path in the snapshot with id
// snapshotID. The path should be the full path of the item from the root.
// If the item is a file in kopia then it returns a DataCollection with the item
// as its sole element and DataCollection.FullPath() set to
// split(dirname(itemPath), "/"). If the item does not exist in kopia or is not
// a file an error is returned. The UUID of the returned DataStreams will be the
// name of the kopia file the data is sourced from.
func (w Wrapper) RestoreSingleItem(
ctx context.Context,
snapshotID string,
itemPath path.Path,
) (data.Collection, error) {
c, err := w.collectItems(ctx, snapshotID, itemPath)
if err != nil {
return nil, err
}
return c[0], nil
}
// restoreSingleItem looks up the item at the given path starting from rootDir
// where rootDir is the root of a snapshot. If the item is a file in kopia then
// it returns a DataCollection with the item as its sole element and
// DataCollection.FullPath() set to split(dirname(itemPath), "/"). If the item
// does not exist in kopia or is not a file an error is returned. The UUID of
// the returned DataStreams will be the name of the kopia file the data is
// sourced from.
func restoreSingleItem(
ctx context.Context,
f fs.File,
itemDir path.Path,
) (data.Collection, error) {
r, err := f.Open(ctx)
if err != nil {
return nil, errors.Wrap(err, "opening file")
}
return &kopiaDataCollection{
streams: []data.Stream{
&kopiaDataStream{
uuid: f.Name(),
reader: r,
size: f.Size(),
},
},
path: itemDir,
return &kopiaDataStream{
uuid: f.Name(),
reader: r,
}, nil
}
// RestoreSingleItem looks up all paths- assuming each is an item declaration,
// RestoreMultipleItems looks up all paths- assuming each is an item declaration,
// not a directory- in the snapshot with id snapshotID. The path should be the
// full path of the item from the root. Returns the results as a slice of single-
// item DataCollections, where the DataCollection.FullPath() matches the path.
@ -587,21 +518,45 @@ func (w Wrapper) RestoreMultipleItems(
return nil, errors.WithStack(errNoRestorePath)
}
snapshotRoot, err := w.getSnapshotRoot(ctx, snapshotID)
if err != nil {
return nil, err
}
var (
dcs = []data.Collection{}
errs *multierror.Error
// Maps short ID of parent path to data collection for that folder.
cols = map[string]*kopiaDataCollection{}
)
for _, itemPath := range paths {
dc, err := w.RestoreSingleItem(ctx, snapshotID, itemPath)
ds, err := getItemStream(ctx, itemPath, snapshotRoot)
if err != nil {
errs = multierror.Append(errs, err)
} else {
dcs = append(dcs, dc)
continue
}
parentPath, err := itemPath.Dir()
if err != nil {
errs = multierror.Append(errs, errors.Wrap(err, "making directory collection"))
continue
}
c, ok := cols[parentPath.ShortRef()]
if !ok {
cols[parentPath.ShortRef()] = &kopiaDataCollection{path: parentPath}
c = cols[parentPath.ShortRef()]
}
c.streams = append(c.streams, ds)
}
return dcs, errs.ErrorOrNil()
res := make([]data.Collection, 0, len(cols))
for _, c := range cols {
res = append(res, c)
}
return res, errs.ErrorOrNil()
}
// DeleteSnapshot removes the provided manifest from kopia.

View File

@ -18,7 +18,6 @@ import (
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/kopia/mockkopia"
"github.com/alcionai/corso/src/internal/path"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
@ -548,21 +547,6 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
}
}
func (suite *KopiaUnitSuite) TestRestoreItem() {
ctx := context.Background()
file := &mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName2,
EntryMode: mockkopia.DefaultPermissions,
},
OpenErr: assert.AnError,
}
_, err := restoreSingleItem(ctx, file, nil)
assert.Error(suite.T(), err)
}
// ---------------
// integration tests that use kopia
// ---------------
@ -903,99 +887,61 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() {
assert.NoError(suite.T(), suite.w.Close(suite.ctx))
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem() {
t := suite.T()
item := suite.files[suite.testPath1.String()][0]
expected := map[string][]byte{
item.itemPath.String(): item.data,
}
c, err := suite.w.RestoreSingleItem(
suite.ctx,
string(suite.snapshotID),
item.itemPath,
)
require.NoError(t, err)
assert.Equal(t, suite.testPath1, c.FullPath())
testForFiles(t, expected, []data.Collection{c})
}
// TestBackupAndRestoreSingleItem_Errors exercises the public RestoreSingleItem
// function.
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Errors() {
itemPath := suite.files[suite.testPath1.String()][0].itemPath
doesntExist, err := path.Builder{}.Append("subdir", "foo").ToDataLayerExchangePathForCategory(
testTenant,
testUser,
path.EmailCategory,
true,
)
require.NoError(suite.T(), err)
func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() {
table := []struct {
name string
snapshotID string
path path.Path
name string
// This is both input and can be used to lookup expected output information.
items []*backedupFile
expectedCollections int
}{
{
"EmptyPath",
string(suite.snapshotID),
nil,
name: "SingleItem",
items: []*backedupFile{
suite.files[suite.testPath1.String()][0],
},
expectedCollections: 1,
},
{
"NoSnapshot",
"foo",
itemPath,
name: "MultipleItemsSameCollection",
items: []*backedupFile{
suite.files[suite.testPath1.String()][0],
suite.files[suite.testPath1.String()][1],
},
expectedCollections: 1,
},
{
"TargetNotAFile",
string(suite.snapshotID),
suite.testPath1,
},
{
"NonExistentFile",
string(suite.snapshotID),
doesntExist,
name: "MultipleItemsDifferentCollections",
items: []*backedupFile{
suite.files[suite.testPath1.String()][0],
suite.files[suite.testPath2.String()][0],
},
expectedCollections: 2,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
_, err := suite.w.RestoreSingleItem(
inputPaths := make([]path.Path, 0, len(test.items))
expected := make(map[string][]byte, len(test.items))
for _, item := range test.items {
inputPaths = append(inputPaths, item.itemPath)
expected[item.itemPath.String()] = suite.filesByPath[item.itemPath.String()].data
}
result, err := suite.w.RestoreMultipleItems(
suite.ctx,
test.snapshotID,
test.path,
string(suite.snapshotID),
inputPaths,
)
require.Error(t, err)
require.NoError(t, err)
assert.Len(t, result, test.expectedCollections)
testForFiles(t, expected, result)
})
}
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() {
t := suite.T()
inputPaths := []path.Path{}
expected := map[string][]byte{}
for _, parent := range []path.Path{suite.testPath1, suite.testPath2} {
item := suite.files[parent.String()][0]
inputPaths = append(inputPaths, item.itemPath)
expected[item.itemPath.String()] = item.data
}
result, err := suite.w.RestoreMultipleItems(
suite.ctx,
string(suite.snapshotID),
inputPaths,
)
require.NoError(t, err)
assert.Equal(t, 2, len(result))
testForFiles(t, expected, result)
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems_Errors() {
itemPath, err := suite.testPath1.Append(testFileName, true)
require.NoError(suite.T(), err)