diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go
new file mode 100644
index 000000000..a682a93af
--- /dev/null
+++ b/src/internal/kopia/data_collection.go
@@ -0,0 +1,52 @@
+package kopia
+
+import (
+	"io"
+
+	"github.com/alcionai/corso/internal/connector"
+)
+
+// Compile-time checks that the types below satisfy the connector interfaces.
+var _ connector.DataCollection = &singleItemCollection{}
+var _ connector.DataStream = &kopiaDataStream{}
+
+// singleItemCollection implements DataCollection but only returns a single
+// DataStream. It is not safe for concurrent use.
+type singleItemCollection struct {
+	path   []string
+	stream connector.DataStream
+	used   bool
+}
+
+// NextItem returns the wrapped DataStream on the first call and io.EOF on
+// every call after that.
+func (sic *singleItemCollection) NextItem() (connector.DataStream, error) {
+	if sic.used {
+		return nil, io.EOF
+	}
+
+	sic.used = true
+	return sic.stream, nil
+}
+
+// FullPath returns a copy of the collection's path so callers cannot modify
+// the collection's own slice.
+func (sic *singleItemCollection) FullPath() []string {
+	return append([]string{}, sic.path...)
+}
+
+// kopiaDataStream implements DataStream by pairing a reader with a UUID.
+type kopiaDataStream struct {
+	reader io.ReadCloser
+	uuid   string
+}
+
+// ToReader returns the reader the stream was created with.
+func (kds kopiaDataStream) ToReader() io.ReadCloser {
+	return kds.reader
+}
+
+// UUID returns the identifier the stream was created with.
+func (kds kopiaDataStream) UUID() string {
+	return kds.uuid
+}
diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go
new file mode 100644
index 000000000..ce112df74
--- /dev/null
+++ b/src/internal/kopia/data_collection_test.go
@@ -0,0 +1,67 @@
+package kopia
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+)
+
+// ---------------
+// unit tests
+// ---------------
+type SingleItemCollectionUnitSuite struct {
+	suite.Suite
+}
+
+func TestSingleItemCollectionUnitSuite(t *testing.T) {
+	suite.Run(t, new(SingleItemCollectionUnitSuite))
+}
+
+// TestReturnsPath checks that FullPath reports the path the collection was
+// created with.
+func (suite *SingleItemCollectionUnitSuite) TestReturnsPath() {
+	t := suite.T()
+
+	path := []string{"some", "path", "for", "data"}
+
+	c := singleItemCollection{
+		stream: kopiaDataStream{},
+		path:   path,
+	}
+
+	assert.Equal(t, c.FullPath(), path)
+}
+
+// TestReturnsOnlyOneItem checks that the collection yields its stream once,
+// then io.EOF, and that the stream's data is still readable afterwards.
+func (suite *SingleItemCollectionUnitSuite) TestReturnsOnlyOneItem() {
+	t := suite.T()
+
+	data := []byte("abcdefghijklmnopqrstuvwxyz")
+	uuid := "a-file"
+	stream := &kopiaDataStream{
+		reader: io.NopCloser(bytes.NewReader(data)),
+		uuid:   uuid,
+	}
+
+	c := singleItemCollection{
+		stream: stream,
+		path:   []string{},
+	}
+
+	returnedStream, err := c.NextItem()
+	require.NoError(t, err)
+
+	assert.Equal(t, returnedStream.UUID(), uuid)
+
+	_, err = c.NextItem()
+	assert.ErrorIs(t, err, io.EOF)
+
+	buf, err := io.ReadAll(returnedStream.ToReader())
+	require.NoError(t, err)
+	assert.Equal(t, buf, data)
+}
diff --git a/src/internal/kopia/kopia.go b/src/internal/kopia/kopia.go
index 1fa88b313..e11a2ec87 100644
--- a/src/internal/kopia/kopia.go
+++ b/src/internal/kopia/kopia.go
@@ -362,3 +362,49 @@ func (kw KopiaWrapper) makeSnapshotWithRoot(
 	res := manifestToStats(man)
 	return &res, nil
 }
+
+// restoreSingleItem looks up the item at the given path starting from rootDir
+// where rootDir is the root of a snapshot. If the item is a file in kopia then
+// it returns a DataCollection with the item as its sole element and
+// DataCollection.FullPath() set to rootDir's name followed by every element of
+// itemPath except the last. If itemPath is empty, the item does not exist in
+// kopia, or the item is not a file an error is returned. The UUID of the
+// returned DataStream will be the last element of itemPath, i.e. the name of
+// the kopia file the data is sourced from.
+func (kw KopiaWrapper) restoreSingleItem(
+	ctx context.Context,
+	rootDir fs.Entry,
+	itemPath []string,
+) (connector.DataCollection, error) {
+	// The last element of itemPath is split off below to name the item, so
+	// reject an empty path here instead of panicking on the slice expressions.
+	if len(itemPath) == 0 {
+		return nil, errors.New("empty item path")
+	}
+
+	e, err := snapshotfs.GetNestedEntry(ctx, rootDir, itemPath)
+	if err != nil {
+		return nil, errors.Wrap(err, "getting object handle")
+	}
+
+	f, ok := e.(fs.File)
+	if !ok {
+		return nil, errors.New("not a file")
+	}
+
+	r, err := f.Open(ctx)
+	if err != nil {
+		return nil, errors.Wrap(err, "opening file")
+	}
+
+	pathWithRoot := []string{rootDir.Name()}
+	pathWithRoot = append(pathWithRoot, itemPath[:len(itemPath)-1]...)
+
+	return &singleItemCollection{
+		stream: kopiaDataStream{
+			uuid:   itemPath[len(itemPath)-1],
+			reader: r,
+		},
+		path: pathWithRoot,
+	}, nil
+}
diff --git a/src/internal/kopia/kopia_test.go b/src/internal/kopia/kopia_test.go
index fcf69d3bc..5aea48903 100644
--- a/src/internal/kopia/kopia_test.go
+++ b/src/internal/kopia/kopia_test.go
@@ -1,10 +1,17 @@
 package kopia
 
 import (
+	"bytes"
 	"context"
+	"io"
 	"testing"
 
 	"github.com/kopia/kopia/fs"
+	"github.com/kopia/kopia/fs/virtualfs"
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/snapshot"
+	"github.com/kopia/kopia/snapshot/snapshotfs"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 
@@ -14,6 +21,18 @@ import (
 	ctesting "github.com/alcionai/corso/internal/testing"
 )
 
+const (
+	testTenant   = "a-tenant"
+	testUser     = "user1"
+	testEmailDir = "mail"
+	testFileUUID = "a-file"
+)
+
+var (
+	testPath     = []string{testTenant, testUser, testEmailDir}
+	testFileData = []byte("abcdefghijklmnopqrstuvwxyz")
+)
+
 func openKopiaRepo(ctx context.Context, prefix string) (*KopiaWrapper, error) {
 	storage, err := ctesting.NewS3Storage(prefix)
 	if err != nil {
@@ -274,3 +293,151 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
 	assert.Equal(suite.T(), stats.ErrorCount, 0)
 	assert.False(suite.T(), stats.Incomplete)
 }
+
+// TODO(ashmrtn): Update this once we have a helper for getting the snapshot
+// root.
+func getSnapshotRoot(
+	t *testing.T,
+	ctx context.Context,
+	rep repo.Repository,
+	rootName string,
+) fs.Entry {
+	si := snapshot.SourceInfo{
+		Host:     kTestHost,
+		UserName: kTestUser,
+		Path:     rootName,
+	}
+
+	manifests, err := snapshot.ListSnapshots(ctx, rep, si)
+	require.NoError(t, err)
+	require.Len(t, manifests, 1)
+
+	rootDirEntry, err := snapshotfs.SnapshotRoot(rep, manifests[0])
+	require.NoError(t, err)
+
+	rootDir, ok := rootDirEntry.(fs.Directory)
+	require.True(t, ok)
+
+	return rootDir
+}
+
+// setupSimpleRepo backs up one collection holding testFileData under testPath
+// and requires the stats to report one file, three directories and no errors.
+func setupSimpleRepo(t *testing.T, ctx context.Context, k *KopiaWrapper) {
+	collections := []connector.DataCollection{
+		&singleItemCollection{
+			path: testPath,
+			stream: &kopiaDataStream{
+				uuid:   testFileUUID,
+				reader: io.NopCloser(bytes.NewReader(testFileData)),
+			},
+		},
+	}
+
+	stats, err := k.BackupCollections(ctx, collections)
+	require.NoError(t, err)
+	require.Equal(t, stats.TotalFileCount, 1)
+	require.Equal(t, stats.TotalDirectoryCount, 3)
+	require.Equal(t, stats.IgnoredErrorCount, 0)
+	require.Equal(t, stats.ErrorCount, 0)
+	require.False(t, stats.Incomplete)
+}
+
+func (suite *KopiaIntegrationSuite) TestBackupAndRestoreSingleItem() {
+	ctx := context.Background()
+	timeOfTest := ctesting.LogTimeOfTest(suite.T())
+	t := suite.T()
+
+	k, err := openKopiaRepo(ctx, "backup-restore-single-item-"+timeOfTest)
+	require.NoError(t, err)
+	defer func() {
+		assert.NoError(t, k.Close(ctx))
+	}()
+
+	setupSimpleRepo(t, ctx, k)
+
+	rootDir := getSnapshotRoot(t, ctx, k.rep, testTenant)
+
+	c, err := k.restoreSingleItem(ctx, rootDir, append(testPath[1:], testFileUUID))
+	require.NoError(t, err)
+
+	assert.Equal(t, c.FullPath(), testPath)
+
+	resultStream, err := c.NextItem()
+	require.NoError(t, err)
+
+	_, err = c.NextItem()
+	assert.ErrorIs(t, err, io.EOF)
+
+	buf, err := io.ReadAll(resultStream.ToReader())
+	require.NoError(t, err)
+	assert.Equal(t, buf, testFileData)
+}
+
+func (suite *KopiaIntegrationSuite) TestBackupAndRestoreSingleItem_Errors() {
+	table := []struct {
+		name        string
+		rootDirFunc func(*testing.T, context.Context, *KopiaWrapper) fs.Entry
+		path        []string
+	}{
+		{
+			"FileAsRoot",
+			func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
+				return virtualfs.StreamingFileFromReader(testFileUUID, bytes.NewReader(testFileData))
+			},
+			append(testPath[1:], testFileUUID),
+		},
+		{
+			"NoRootDir",
+			func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
+				return nil
+			},
+			append(testPath[1:], testFileUUID),
+		},
+		{
+			"TargetNotAFile",
+			func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
+				return getSnapshotRoot(t, ctx, k.rep, testPath[0])
+			},
+			[]string{testPath[1]},
+		},
+		{
+			"NonExistentFile",
+			func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
+				return getSnapshotRoot(t, ctx, k.rep, testPath[0])
+			},
+			append(testPath[1:], "foo"),
+		},
+		{
+			// An empty path has no final element to name the item, so it must
+			// be rejected with an error rather than a panic.
+			"EmptyPath",
+			func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
+				return virtualfs.StreamingFileFromReader(testFileUUID, bytes.NewReader(testFileData))
+			},
+			[]string{},
+		},
+	}
+
+	for _, test := range table {
+		suite.T().Run(test.name, func(t *testing.T) {
+			ctx := context.Background()
+			timeOfTest := ctesting.LogTimeOfTest(t)
+
+			k, err := openKopiaRepo(ctx, "backup-restore-single-item-error-"+test.name+"-"+timeOfTest)
+			require.NoError(t, err)
+			defer func() {
+				assert.NoError(t, k.Close(ctx))
+			}()
+
+			setupSimpleRepo(t, ctx, k)
+
+			_, err = k.restoreSingleItem(
+				ctx,
+				test.rootDirFunc(t, ctx, k),
+				test.path,
+			)
+			require.Error(t, err)
+		})
+	}
+}