Allow getting data from kopia for a directory subtree (#299)

* Implement getting data for directory subtree

Return a slice of collections with data for a given directory subtree in
kopia. Traverse the full directory before creating a DataCollection,
instead of sending items as they are found, because future
implementations may block on send. Blocking would reduce parallelism,
since the code couldn't discover other directories to traverse until the
files already found were consumed. Kopia also currently loads the entire
directory at once, so there's little benefit to streaming.

The system will now continue pulling data until completion and report all
errors at the end of the run.
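The accumulate-and-continue behavior uses hashicorp/go-multierror. A minimal
sketch of the pattern, where processEntry and the entries slice are
hypothetical placeholders:

package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// processEntry is a hypothetical per-item operation that may fail.
func processEntry(name string) error {
	if name == "bad" {
		return fmt.Errorf("failed to read %q", name)
	}
	return nil
}

// processAll keeps going after individual failures and reports every error
// at the end of the run instead of stopping at the first one.
func processAll(entries []string) error {
	var errs *multierror.Error

	for _, e := range entries {
		if err := processEntry(e); err != nil {
			errs = multierror.Append(errs, err)
		}
	}

	// ErrorOrNil avoids returning a typed-nil *multierror.Error, which
	// callers would otherwise see as a non-nil error.
	return errs.ErrorOrNil()
}

func main() {
	// Prints a combined report of all failures (here just one) after the
	// whole slice has been processed.
	if err := processAll([]string{"good", "bad", "also-good"}); err != nil {
		fmt.Println(err)
	}
}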

* Tests for getting persisted subtree data, including some error cases
ashmrtn 2022-07-11 11:22:28 -07:00 committed by GitHub
parent b9652c66e3
commit 2415addd05
2 changed files with 414 additions and 18 deletions


@@ -4,6 +4,7 @@ import (
"context"
"path"
"github.com/hashicorp/go-multierror"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo"
@@ -15,6 +16,7 @@ import (
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/pkg/backup"
"github.com/alcionai/corso/pkg/logger"
)
const (
@@ -340,14 +342,23 @@ func (w Wrapper) collectItems(
// The paths passed below are the paths up to (but not including) the
// file/directory passed.
if isDirectory {
return nil, errors.New("directory restore not implemented")
dir, ok := e.(fs.Directory)
if !ok {
return nil, errors.New("requested object is not a directory")
}
c, err := restoreSubtree(ctx, dir, itemPath[:len(itemPath)-1])
// Call ErrorOrNil so a nil *multierror.Error isn't returned directly as the
// error interface; a typed-nil pointer would otherwise read as a non-nil error.
return c, err.ErrorOrNil()
}
f, ok := e.(fs.File)
if !ok {
return nil, errors.New("requested object is not a file")
}
c, err := w.restoreSingleItem(ctx, f, itemPath[:len(itemPath)-1])
c, err := restoreSingleItem(ctx, f, itemPath[:len(itemPath)-1])
if err != nil {
return nil, err
}
@@ -382,7 +393,7 @@ func (w Wrapper) RestoreSingleItem(
// does not exist in kopia or is not a file an error is returned. The UUID of
// the returned DataStreams will be the name of the kopia file the data is
// sourced from.
func (w Wrapper) restoreSingleItem(
func restoreSingleItem(
ctx context.Context,
f fs.File,
itemPath []string,
@@ -402,3 +413,120 @@ func (w Wrapper) restoreSingleItem(
path: itemPath,
}, nil
}
func walkDirectory(
ctx context.Context,
dir fs.Directory,
) ([]fs.File, []fs.Directory, *multierror.Error) {
files := []fs.File{}
dirs := []fs.Directory{}
var errs *multierror.Error
err := dir.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error {
// Early exit on context cancel.
if err := innerCtx.Err(); err != nil {
return err
}
switch e.(type) {
case fs.Directory:
d := e.(fs.Directory)
dirs = append(dirs, d)
case fs.File:
f := e.(fs.File)
files = append(files, f)
default:
errs = multierror.Append(errs, errors.Errorf("unexpected item type %T", e))
logger.Ctx(ctx).Warnf("unexpected item of type %T; skipping", e)
}
return nil
})
if err != nil {
// If the iterator itself had an error add it to the list.
errs = multierror.Append(errs, errors.Wrap(err, "getting directory data"))
}
return files, dirs, errs
}
// restoreSubtree returns DataCollections for each subdirectory (or the
// directory itself) that contains files. The FullPath of each returned
// DataCollection is the path from the root of the kopia directory structure to
// the directory. The UUID of each DataStream in each DataCollection is the name
// of the kopia file the data is sourced from.
func restoreSubtree(
ctx context.Context,
dir fs.Directory,
relativePath []string,
) ([]connector.DataCollection, *multierror.Error) {
collections := []connector.DataCollection{}
// Want a local copy of relativePath with our new element.
fullPath := append(append([]string{}, relativePath...), dir.Name())
var errs *multierror.Error
files, dirs, err := walkDirectory(ctx, dir)
if err != nil {
errs = multierror.Append(
errs, errors.Wrapf(err, "walking directory %q", path.Join(fullPath...)))
}
if len(files) > 0 {
if ctxErr := ctx.Err(); ctxErr != nil {
errs = multierror.Append(errs, errors.WithStack(ctxErr))
return nil, errs
}
streams := make([]connector.DataStream, 0, len(files))
for _, f := range files {
r, err := f.Open(ctx)
if err != nil {
fileFullPath := path.Join(append(append([]string{}, fullPath...), f.Name())...)
errs = multierror.Append(
errs, errors.Wrapf(err, "getting reader for file %q", fileFullPath))
logger.Ctx(ctx).Warnf("skipping file %q", fileFullPath)
continue
}
streams = append(streams, &kopiaDataStream{
reader: r,
uuid: f.Name(),
})
}
collections = append(collections, &kopiaDataCollection{
streams: streams,
path: fullPath,
})
}
for _, d := range dirs {
if ctxErr := ctx.Err(); ctxErr != nil {
errs = multierror.Append(errs, errors.WithStack(ctxErr))
return nil, errs
}
c, err := restoreSubtree(ctx, d, fullPath)
if err != nil {
errs = multierror.Append(errs, errors.Wrapf(
err,
"traversing subdirectory %q",
path.Join(append(append([]string{}, fullPath...), d.Name())...),
))
}
collections = append(collections, c...)
}
return collections, errs
}
func (w Wrapper) RestoreDirectory(
ctx context.Context,
snapshotID string,
basePath []string,
) ([]connector.DataCollection, error) {
return w.collectItems(ctx, snapshotID, basePath, true)
}


@@ -5,9 +5,11 @@ import (
"context"
"io"
"io/ioutil"
"path"
"testing"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo/manifest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -15,6 +17,7 @@ import (
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/mockconnector"
"github.com/alcionai/corso/internal/kopia/mockkopia"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/backup"
)
@@ -23,12 +26,27 @@ const (
testTenant = "a-tenant"
testUser = "user1"
testEmailDir = "mail"
testFileUUID = "a-file"
testInboxDir = "inbox"
testArchiveDir = "archive"
testFileName = "file1"
testFileName2 = "file2"
testFileName3 = "file3"
testFileName4 = "file4"
testFileName5 = "file5"
testFileName6 = "file6"
)
var (
testPath = []string{testTenant, testUser, testEmailDir}
testPath = []string{testTenant, testUser, testEmailDir, testInboxDir}
testPath2 = []string{testTenant, testUser, testEmailDir, testArchiveDir}
testFileData = []byte("abcdefghijklmnopqrstuvwxyz")
testFileData2 = []byte("zyxwvutsrqponmlkjihgfedcba")
testFileData3 = []byte("foo")
testFileData4 = []byte("bar")
testFileData5 = []byte("baz")
// Intentional duplicate to make sure all files are scanned during recovery
// (contrast to behavior of snapshotfs.TreeWalker).
testFileData6 = testFileData
)
func entriesToNames(entries []fs.Entry) []string {
@@ -41,6 +59,36 @@ func entriesToNames(entries []fs.Entry) []string {
return res
}
func testForFiles(
t *testing.T,
expected map[string][]byte,
collections []connector.DataCollection,
) {
count := 0
for _, c := range collections {
for s := range c.Items() {
count++
fullPath := path.Join(append(c.FullPath(), s.UUID())...)
expected, ok := expected[fullPath]
require.True(
t,
ok,
"unexpected file with path %q",
path.Join(append(c.FullPath(), fullPath)...),
)
buf, err := ioutil.ReadAll(s.ToReader())
require.NoError(t, err)
assert.Equal(t, expected, buf)
}
}
assert.Equal(t, len(expected), count)
}
// ---------------
// unit tests
// ---------------
@@ -222,6 +270,102 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
}
}
func (suite *KopiaUnitSuite) TestRestoreItem() {
ctx := context.Background()
file := &mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName2,
EntryMode: mockkopia.DefaultPermissions,
},
OpenErr: assert.AnError,
}
_, err := restoreSingleItem(ctx, file, nil)
assert.Error(suite.T(), err)
}
func (suite *KopiaUnitSuite) TestRestoreDirectory_FailGettingReader() {
ctx := context.Background()
t := suite.T()
expectedStreamData := map[string][]byte{
path.Join(testInboxDir, testFileName): testFileData,
path.Join(testInboxDir, testFileName3): testFileData3,
}
dirs := virtualfs.NewStaticDirectory(testInboxDir, []fs.Entry{
&mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName,
EntryMode: mockkopia.DefaultPermissions,
},
Data: testFileData,
},
&mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName2,
EntryMode: mockkopia.DefaultPermissions,
},
OpenErr: assert.AnError,
},
&mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName3,
EntryMode: mockkopia.DefaultPermissions,
},
Data: testFileData3,
},
})
collections, err := restoreSubtree(ctx, dirs, nil)
assert.Error(t, err)
assert.Len(t, collections, 1)
testForFiles(t, expectedStreamData, collections)
}
func (suite *KopiaUnitSuite) TestRestoreDirectory_FailWrongItemType() {
ctx := context.Background()
t := suite.T()
expectedStreamData := map[string][]byte{
path.Join(testEmailDir, testInboxDir, testFileName): testFileData,
path.Join(testEmailDir, testArchiveDir, testFileName3): testFileData3,
}
dirs := virtualfs.NewStaticDirectory(testEmailDir, []fs.Entry{
virtualfs.NewStaticDirectory(testInboxDir, []fs.Entry{
&mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName,
EntryMode: mockkopia.DefaultPermissions,
},
Data: testFileData,
},
}),
virtualfs.NewStaticDirectory("foo", []fs.Entry{
virtualfs.StreamingFileFromReader(
testFileName2, bytes.NewReader(testFileData2)),
}),
virtualfs.NewStaticDirectory(testArchiveDir, []fs.Entry{
&mockkopia.MockFile{
Entry: &mockkopia.MockEntry{
EntryName: testFileName3,
EntryMode: mockkopia.DefaultPermissions,
},
Data: testFileData3,
},
}),
})
collections, err := restoreSubtree(ctx, dirs, nil)
assert.Error(t, err)
assert.Len(t, collections, 2)
testForFiles(t, expectedStreamData, collections)
}
// ---------------
// integration tests that use kopia
// ---------------
@@ -287,6 +431,9 @@ type KopiaSimpleRepoIntegrationSuite struct {
w *Wrapper
ctx context.Context
snapshotID manifest.ID
inboxExpectedFiles map[string][]byte
archiveExpectedFiles map[string][]byte
allExpectedFiles map[string][]byte
}
func TestKopiaSimpleRepoIntegrationSuite(t *testing.T) {
@@ -318,9 +465,34 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
path: testPath,
streams: []connector.DataStream{
&mockconnector.MockExchangeData{
ID: testFileUUID,
ID: testFileName,
Reader: io.NopCloser(bytes.NewReader(testFileData)),
},
&mockconnector.MockExchangeData{
ID: testFileName2,
Reader: io.NopCloser(bytes.NewReader(testFileData2)),
},
},
},
&kopiaDataCollection{
path: testPath2,
streams: []connector.DataStream{
&mockconnector.MockExchangeData{
ID: testFileName3,
Reader: io.NopCloser(bytes.NewReader(testFileData3)),
},
&mockconnector.MockExchangeData{
ID: testFileName4,
Reader: io.NopCloser(bytes.NewReader(testFileData4)),
},
&mockconnector.MockExchangeData{
ID: testFileName5,
Reader: io.NopCloser(bytes.NewReader(testFileData5)),
},
&mockconnector.MockExchangeData{
ID: testFileName6,
Reader: io.NopCloser(bytes.NewReader(testFileData6)),
},
},
},
}
@@ -328,13 +500,33 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
stats, rp, err := suite.w.BackupCollections(suite.ctx, collections)
require.NoError(t, err)
require.Equal(t, stats.ErrorCount, 0)
require.Equal(t, stats.TotalFileCount, 1)
require.Equal(t, stats.TotalDirectoryCount, 3)
require.Equal(t, stats.TotalFileCount, 6)
require.Equal(t, stats.TotalDirectoryCount, 5)
require.Equal(t, stats.IgnoredErrorCount, 0)
require.False(t, stats.Incomplete)
assert.Len(t, rp.Entries, 1)
assert.Len(t, rp.Entries, 6)
suite.snapshotID = manifest.ID(stats.SnapshotID)
// path.Join doesn't like (testPath..., testFileName).
suite.inboxExpectedFiles = map[string][]byte{
path.Join(append(testPath, testFileName)...): testFileData,
path.Join(append(testPath, testFileName2)...): testFileData2,
}
suite.archiveExpectedFiles = map[string][]byte{
path.Join(append(testPath2, testFileName3)...): testFileData3,
path.Join(append(testPath2, testFileName4)...): testFileData4,
path.Join(append(testPath2, testFileName5)...): testFileData5,
path.Join(append(testPath2, testFileName6)...): testFileData6,
}
suite.allExpectedFiles = map[string][]byte{}
for k, v := range suite.inboxExpectedFiles {
suite.allExpectedFiles[k] = v
}
for k, v := range suite.archiveExpectedFiles {
suite.allExpectedFiles[k] = v
}
}
func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() {
@@ -347,7 +539,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem() {
c, err := suite.w.RestoreSingleItem(
suite.ctx,
string(suite.snapshotID),
append(testPath, testFileUUID),
append(testPath, testFileName),
)
require.NoError(t, err)
@@ -381,7 +573,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
{
"NoSnapshot",
"foo",
append(testPath, testFileUUID),
append(testPath, testFileName),
},
{
"TargetNotAFile",
@@ -406,3 +598,79 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
})
}
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory() {
table := []struct {
name string
dirPath []string
expectedFiles map[string][]byte
}{
{
"RecoverUser",
[]string{testTenant, testUser},
suite.allExpectedFiles,
},
{
"RecoverMail",
[]string{testTenant, testUser, testEmailDir},
suite.allExpectedFiles,
},
{
"RecoverInbox",
[]string{testTenant, testUser, testEmailDir, testInboxDir},
suite.inboxExpectedFiles,
},
{
"RecoverArchive",
[]string{testTenant, testUser, testEmailDir, testArchiveDir},
suite.archiveExpectedFiles,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
collections, err := suite.w.RestoreDirectory(
suite.ctx, string(suite.snapshotID), test.dirPath)
require.NoError(t, err)
testForFiles(t, test.expectedFiles, collections)
})
}
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory_Errors() {
table := []struct {
name string
snapshotID string
dirPath []string
}{
{
"EmptyPath",
string(suite.snapshotID),
[]string{},
},
{
"BadSnapshotID",
"foo",
[]string{testTenant, testUser, testEmailDir},
},
{
"NotADirectory",
string(suite.snapshotID),
append(testPath, testFileName),
},
{
"NonExistantDirectory",
string(suite.snapshotID),
[]string{testTenant, testUser, testEmailDir, "subdir"},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
_, err := suite.w.RestoreDirectory(
suite.ctx, test.snapshotID, test.dirPath)
assert.Error(t, err)
})
}
}