kopia code to translate single item to DataCollection (#216)
* DataCollection and DataStream structs for kopia * Helper function to get a single item of data Returns a DataCollection with a single item. * Test for backing up and restoring a single item
This commit is contained in:
parent
0e261fb96a
commit
622e8eab95
44
src/internal/kopia/data_collection.go
Normal file
44
src/internal/kopia/data_collection.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package kopia
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/internal/connector"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ connector.DataCollection = &singleItemCollection{}
|
||||||
|
var _ connector.DataStream = &kopiaDataStream{}
|
||||||
|
|
||||||
|
// singleItemCollection implements DataCollection but only returns a single
|
||||||
|
// DataStream. It is not safe for concurrent use.
|
||||||
|
type singleItemCollection struct {
|
||||||
|
path []string
|
||||||
|
stream connector.DataStream
|
||||||
|
used bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sic *singleItemCollection) NextItem() (connector.DataStream, error) {
|
||||||
|
if sic.used {
|
||||||
|
return nil, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
sic.used = true
|
||||||
|
return sic.stream, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sic singleItemCollection) FullPath() []string {
|
||||||
|
return append([]string{}, sic.path...)
|
||||||
|
}
|
||||||
|
|
||||||
|
type kopiaDataStream struct {
|
||||||
|
reader io.ReadCloser
|
||||||
|
uuid string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kds kopiaDataStream) ToReader() io.ReadCloser {
|
||||||
|
return kds.reader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kds kopiaDataStream) UUID() string {
|
||||||
|
return kds.uuid
|
||||||
|
}
|
||||||
64
src/internal/kopia/data_collection_test.go
Normal file
64
src/internal/kopia/data_collection_test.go
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
package kopia
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ---------------
|
||||||
|
// unit tests
|
||||||
|
// ---------------
|
||||||
|
type SingleItemCollectionUnitSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSingleItemCollectionUnitSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(SingleItemCollectionUnitSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *SingleItemCollectionUnitSuite) TestReturnsPath() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
path := []string{"some", "path", "for", "data"}
|
||||||
|
|
||||||
|
c := singleItemCollection{
|
||||||
|
stream: kopiaDataStream{},
|
||||||
|
path: path,
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, c.FullPath(), path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *SingleItemCollectionUnitSuite) TestReturnsOnlyOneItem() {
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
data := []byte("abcdefghijklmnopqrstuvwxyz")
|
||||||
|
uuid := "a-file"
|
||||||
|
stream := &kopiaDataStream{
|
||||||
|
reader: io.NopCloser(bytes.NewReader(data)),
|
||||||
|
uuid: uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
c := singleItemCollection{
|
||||||
|
stream: stream,
|
||||||
|
path: []string{},
|
||||||
|
}
|
||||||
|
|
||||||
|
returnedStream, err := c.NextItem()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, returnedStream.UUID(), uuid)
|
||||||
|
|
||||||
|
_, err = c.NextItem()
|
||||||
|
assert.ErrorIs(t, err, io.EOF)
|
||||||
|
|
||||||
|
buf, err := ioutil.ReadAll(returnedStream.ToReader())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, buf, data)
|
||||||
|
}
|
||||||
@ -362,3 +362,42 @@ func (kw KopiaWrapper) makeSnapshotWithRoot(
|
|||||||
res := manifestToStats(man)
|
res := manifestToStats(man)
|
||||||
return &res, nil
|
return &res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// restoreSingleItem looks up the item at the given path starting from rootDir
|
||||||
|
// where rootDir is the root of a snapshot. If the item is a file in kopia then
|
||||||
|
// it returns a DataCollection with the item as its sole element and
|
||||||
|
// DataCollection.FullPath() set to split(dirname(itemPath), "/"). If the item
|
||||||
|
// does not exist in kopia or is not a file an error is returned. The UUID of
|
||||||
|
// the returned DataStreams will be the name of the kopia file the data is
|
||||||
|
// sourced from.
|
||||||
|
func (kw KopiaWrapper) restoreSingleItem(
|
||||||
|
ctx context.Context,
|
||||||
|
rootDir fs.Entry,
|
||||||
|
itemPath []string,
|
||||||
|
) (connector.DataCollection, error) {
|
||||||
|
e, err := snapshotfs.GetNestedEntry(ctx, rootDir, itemPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "getting object handle")
|
||||||
|
}
|
||||||
|
|
||||||
|
f, ok := e.(fs.File)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("not a file")
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := f.Open(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "opening file")
|
||||||
|
}
|
||||||
|
|
||||||
|
pathWithRoot := []string{rootDir.Name()}
|
||||||
|
pathWithRoot = append(pathWithRoot, itemPath[:len(itemPath)-1]...)
|
||||||
|
|
||||||
|
return &singleItemCollection{
|
||||||
|
stream: kopiaDataStream{
|
||||||
|
uuid: itemPath[len(itemPath)-1],
|
||||||
|
reader: r,
|
||||||
|
},
|
||||||
|
path: pathWithRoot,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|||||||
@ -1,10 +1,18 @@
|
|||||||
package kopia
|
package kopia
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/kopia/kopia/fs"
|
"github.com/kopia/kopia/fs"
|
||||||
|
"github.com/kopia/kopia/fs/virtualfs"
|
||||||
|
"github.com/kopia/kopia/repo"
|
||||||
|
"github.com/kopia/kopia/snapshot"
|
||||||
|
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
@ -14,6 +22,18 @@ import (
|
|||||||
ctesting "github.com/alcionai/corso/internal/testing"
|
ctesting "github.com/alcionai/corso/internal/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testTenant = "a-tenant"
|
||||||
|
testUser = "user1"
|
||||||
|
testEmailDir = "mail"
|
||||||
|
testFileUUID = "a-file"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
testPath = []string{testTenant, testUser, testEmailDir}
|
||||||
|
testFileData = []byte("abcdefghijklmnopqrstuvwxyz")
|
||||||
|
)
|
||||||
|
|
||||||
func openKopiaRepo(ctx context.Context, prefix string) (*KopiaWrapper, error) {
|
func openKopiaRepo(ctx context.Context, prefix string) (*KopiaWrapper, error) {
|
||||||
storage, err := ctesting.NewS3Storage(prefix)
|
storage, err := ctesting.NewS3Storage(prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -274,3 +294,140 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
|||||||
assert.Equal(suite.T(), stats.ErrorCount, 0)
|
assert.Equal(suite.T(), stats.ErrorCount, 0)
|
||||||
assert.False(suite.T(), stats.Incomplete)
|
assert.False(suite.T(), stats.Incomplete)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(ashmrtn): Update this once we have a helper for getting the snapshot
|
||||||
|
// root.
|
||||||
|
func getSnapshotRoot(
|
||||||
|
t *testing.T,
|
||||||
|
ctx context.Context,
|
||||||
|
rep repo.Repository,
|
||||||
|
rootName string,
|
||||||
|
) fs.Entry {
|
||||||
|
si := snapshot.SourceInfo{
|
||||||
|
Host: kTestHost,
|
||||||
|
UserName: kTestUser,
|
||||||
|
Path: rootName,
|
||||||
|
}
|
||||||
|
|
||||||
|
manifests, err := snapshot.ListSnapshots(ctx, rep, si)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, manifests, 1)
|
||||||
|
|
||||||
|
rootDirEntry, err := snapshotfs.SnapshotRoot(rep, manifests[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
rootDir, ok := rootDirEntry.(fs.Directory)
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
return rootDir
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupSimpleRepo(t *testing.T, ctx context.Context, k *KopiaWrapper) {
|
||||||
|
collections := []connector.DataCollection{
|
||||||
|
&singleItemCollection{
|
||||||
|
path: testPath,
|
||||||
|
stream: &kopiaDataStream{
|
||||||
|
uuid: testFileUUID,
|
||||||
|
reader: io.NopCloser(bytes.NewReader(testFileData)),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
stats, err := k.BackupCollections(ctx, collections)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, stats.TotalFileCount, 1)
|
||||||
|
require.Equal(t, stats.TotalDirectoryCount, 3)
|
||||||
|
require.Equal(t, stats.IgnoredErrorCount, 0)
|
||||||
|
require.Equal(t, stats.ErrorCount, 0)
|
||||||
|
require.False(t, stats.Incomplete)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *KopiaIntegrationSuite) TestBackupAndRestoreSingleItem() {
|
||||||
|
ctx := context.Background()
|
||||||
|
timeOfTest := ctesting.LogTimeOfTest(suite.T())
|
||||||
|
t := suite.T()
|
||||||
|
|
||||||
|
k, err := openKopiaRepo(ctx, "backup-restore-single-item-"+timeOfTest)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
assert.NoError(t, k.Close(ctx))
|
||||||
|
}()
|
||||||
|
|
||||||
|
setupSimpleRepo(t, ctx, k)
|
||||||
|
|
||||||
|
rootDir := getSnapshotRoot(t, ctx, k.rep, testTenant)
|
||||||
|
|
||||||
|
c, err := k.restoreSingleItem(ctx, rootDir, append(testPath[1:], testFileUUID))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, c.FullPath(), testPath)
|
||||||
|
|
||||||
|
resultStream, err := c.NextItem()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = c.NextItem()
|
||||||
|
assert.ErrorIs(t, err, io.EOF)
|
||||||
|
|
||||||
|
buf, err := ioutil.ReadAll(resultStream.ToReader())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, buf, testFileData)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *KopiaIntegrationSuite) TestBackupAndRestoreSingleItem_Errors() {
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
rootDirFunc func(*testing.T, context.Context, *KopiaWrapper) fs.Entry
|
||||||
|
path []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"FileAsRoot",
|
||||||
|
func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
|
||||||
|
return virtualfs.StreamingFileFromReader(testFileUUID, bytes.NewReader(testFileData))
|
||||||
|
},
|
||||||
|
append(testPath[1:], testFileUUID),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NoRootDir",
|
||||||
|
func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
append(testPath[1:], testFileUUID),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"TargetNotAFile",
|
||||||
|
func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
|
||||||
|
return getSnapshotRoot(t, ctx, k.rep, testPath[0])
|
||||||
|
},
|
||||||
|
[]string{testPath[1]},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NonExistentFile",
|
||||||
|
func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
|
||||||
|
return getSnapshotRoot(t, ctx, k.rep, testPath[0])
|
||||||
|
},
|
||||||
|
append(testPath[1:], "foo"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
timeOfTest := ctesting.LogTimeOfTest(t)
|
||||||
|
|
||||||
|
k, err := openKopiaRepo(ctx, "backup-restore-single-item-error-"+test.name+"-"+timeOfTest)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
assert.NoError(t, k.Close(ctx))
|
||||||
|
}()
|
||||||
|
|
||||||
|
setupSimpleRepo(t, ctx, k)
|
||||||
|
|
||||||
|
_, err = k.restoreSingleItem(
|
||||||
|
ctx,
|
||||||
|
test.rootDirFunc(t, ctx, k),
|
||||||
|
test.path,
|
||||||
|
)
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user