Add remainder of BackupCollections code (#170)
Fill in BackupCollection logic for kopia dirs and test "Rehydrate" the given DataCollections into a hierarchy of kopia virtualfs directories, allowing kopia to make a single snapshot of them. Directory structure is extracted from the information returned by DataCollection.FullPath().
This commit is contained in:
parent
875e3cb517
commit
d00f9c856f
@ -2,6 +2,7 @@ package kopia
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
"github.com/kopia/kopia/fs"
|
"github.com/kopia/kopia/fs"
|
||||||
"github.com/kopia/kopia/fs/virtualfs"
|
"github.com/kopia/kopia/fs/virtualfs"
|
||||||
@ -26,9 +27,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errInit = errors.New("initializing repo")
|
errInit = errors.New("initializing repo")
|
||||||
errConnect = errors.New("connecting repo")
|
errConnect = errors.New("connecting repo")
|
||||||
errNotConnected = errors.New("not connected to repo")
|
errNotConnected = errors.New("not connected to repo")
|
||||||
|
errUnsupportedDir = errors.New("unsupported static children in streaming directory")
|
||||||
)
|
)
|
||||||
|
|
||||||
type BackupStats struct {
|
type BackupStats struct {
|
||||||
@ -160,9 +162,145 @@ func (kw *KopiaWrapper) open(ctx context.Context, password string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getStreamItemFunc returns a function that can be used by kopia's
|
||||||
|
// virtualfs.StreamingDirectory to iterate through directory entries and call
|
||||||
|
// kopia callbacks on directory entries. It binds the directory to the given
|
||||||
|
// DataCollection.
|
||||||
|
func getStreamItemFunc(
|
||||||
|
collection connector.DataCollection,
|
||||||
|
) func(context.Context, func(context.Context, fs.Entry) error) error {
|
||||||
|
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
|
||||||
|
for {
|
||||||
|
e, err := collection.NextItem()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Wrap(err, "materializing directory entry")
|
||||||
|
}
|
||||||
|
|
||||||
|
entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader())
|
||||||
|
if err = cb(ctx, entry); err != nil {
|
||||||
|
return errors.Wrap(err, "executing callback")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
|
||||||
|
// Returned directories are either virtualfs.StreamingDirectory or
|
||||||
|
// virtualfs.staticDirectory.
|
||||||
|
func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) {
|
||||||
|
// Don't support directories that have both a DataCollection and a set of
|
||||||
|
// static child directories.
|
||||||
|
if dir.collection != nil && len(dir.childDirs) > 0 {
|
||||||
|
return nil, errors.New(errUnsupportedDir.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if dir.collection != nil {
|
||||||
|
return virtualfs.NewStreamingDirectory(dirName, getStreamItemFunc(dir.collection)), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Need to build the directory tree from the leaves up because intermediate
|
||||||
|
// directories need to have all their entries at creation time.
|
||||||
|
childDirs := []fs.Entry{}
|
||||||
|
|
||||||
|
for childName, childDir := range dir.childDirs {
|
||||||
|
child, err := buildKopiaDirs(childName, childDir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
childDirs = append(childDirs, child)
|
||||||
|
}
|
||||||
|
|
||||||
|
return virtualfs.NewStaticDirectory(dirName, childDirs), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type treeMap struct {
|
||||||
|
childDirs map[string]*treeMap
|
||||||
|
collection connector.DataCollection
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTreeMap() *treeMap {
|
||||||
|
return &treeMap{
|
||||||
|
childDirs: map[string]*treeMap{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// inflateDirTree returns an fs.Directory tree rooted at the oldest common
|
||||||
|
// ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes
|
||||||
|
// in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given
|
||||||
|
// DataCollections.
|
||||||
func inflateDirTree(ctx context.Context, collections []connector.DataCollection) (fs.Directory, error) {
|
func inflateDirTree(ctx context.Context, collections []connector.DataCollection) (fs.Directory, error) {
|
||||||
// TODO(ashmrtnz): Implement when virtualfs.StreamingDirectory is available.
|
roots := make(map[string]*treeMap)
|
||||||
return virtualfs.NewStaticDirectory("sample-dir", []fs.Entry{}), nil
|
|
||||||
|
for _, s := range collections {
|
||||||
|
path := s.FullPath()
|
||||||
|
|
||||||
|
if len(path) == 0 {
|
||||||
|
return nil, errors.New("no identifier for collection")
|
||||||
|
}
|
||||||
|
|
||||||
|
dir, ok := roots[path[0]]
|
||||||
|
if !ok {
|
||||||
|
dir = newTreeMap()
|
||||||
|
roots[path[0]] = dir
|
||||||
|
}
|
||||||
|
|
||||||
|
// Single DataCollection with no ancestors.
|
||||||
|
if len(path) == 1 {
|
||||||
|
dir.collection = s
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range path[1 : len(path)-1] {
|
||||||
|
newDir, ok := dir.childDirs[p]
|
||||||
|
if !ok {
|
||||||
|
newDir = newTreeMap()
|
||||||
|
|
||||||
|
if dir.childDirs == nil {
|
||||||
|
dir.childDirs = map[string]*treeMap{}
|
||||||
|
}
|
||||||
|
|
||||||
|
dir.childDirs[p] = newDir
|
||||||
|
}
|
||||||
|
|
||||||
|
dir = newDir
|
||||||
|
}
|
||||||
|
|
||||||
|
// At this point we have all the ancestor directories of this DataCollection
|
||||||
|
// as treeMap objects and `dir` is the parent directory of this
|
||||||
|
// DataCollection.
|
||||||
|
|
||||||
|
end := len(path) - 1
|
||||||
|
|
||||||
|
// Make sure this entry doesn't already exist.
|
||||||
|
if _, ok := dir.childDirs[path[end]]; ok {
|
||||||
|
return nil, errors.New(errUnsupportedDir.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
sd := newTreeMap()
|
||||||
|
sd.collection = s
|
||||||
|
dir.childDirs[path[end]] = sd
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(roots) > 1 {
|
||||||
|
return nil, errors.New("multiple root directories")
|
||||||
|
}
|
||||||
|
|
||||||
|
var res fs.Directory
|
||||||
|
for dirName, dir := range roots {
|
||||||
|
tmp, err := buildKopiaDirs(dirName, dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res = tmp
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kw KopiaWrapper) BackupCollections(
|
func (kw KopiaWrapper) BackupCollections(
|
||||||
|
|||||||
@ -4,10 +4,13 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/kopia/kopia/fs"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/stretchr/testify/suite"
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/alcionai/corso/internal/connector"
|
||||||
|
"github.com/alcionai/corso/internal/connector/mockconnector"
|
||||||
ctesting "github.com/alcionai/corso/internal/testing"
|
ctesting "github.com/alcionai/corso/internal/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -25,6 +28,16 @@ func openKopiaRepo(ctx context.Context, prefix string) (*KopiaWrapper, error) {
|
|||||||
return k, nil
|
return k, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func entriesToNames(entries []fs.Entry) []string {
|
||||||
|
res := make([]string, 0, len(entries))
|
||||||
|
|
||||||
|
for _, e := range entries {
|
||||||
|
res = append(res, e.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------
|
// ---------------
|
||||||
// unit tests
|
// unit tests
|
||||||
// ---------------
|
// ---------------
|
||||||
@ -32,6 +45,10 @@ type KopiaUnitSuite struct {
|
|||||||
suite.Suite
|
suite.Suite
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestKopiaUnitSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(KopiaUnitSuite))
|
||||||
|
}
|
||||||
|
|
||||||
func (suite *KopiaUnitSuite) TestCloseWithoutOpenDoesNotCrash() {
|
func (suite *KopiaUnitSuite) TestCloseWithoutOpenDoesNotCrash() {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
ctesting.LogTimeOfTest(suite.T())
|
ctesting.LogTimeOfTest(suite.T())
|
||||||
@ -42,6 +59,158 @@ func (suite *KopiaUnitSuite) TestCloseWithoutOpenDoesNotCrash() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
|
||||||
|
ctesting.LogTimeOfTest(suite.T())
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
tenant := "a-tenant"
|
||||||
|
user1 := "user1"
|
||||||
|
user2 := "user2"
|
||||||
|
emails := "emails"
|
||||||
|
|
||||||
|
expectedFileCount := map[string]int{
|
||||||
|
user1: 5,
|
||||||
|
user2: 42,
|
||||||
|
}
|
||||||
|
|
||||||
|
collections := []connector.DataCollection{
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{tenant, user1, emails},
|
||||||
|
expectedFileCount[user1],
|
||||||
|
),
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{tenant, user2, emails},
|
||||||
|
expectedFileCount[user2],
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returned directory structure should look like:
|
||||||
|
// - a-tenant
|
||||||
|
// - user1
|
||||||
|
// - emails
|
||||||
|
// - 5 separate files
|
||||||
|
// - user2
|
||||||
|
// - emails
|
||||||
|
// - 42 separate files
|
||||||
|
dirTree, err := inflateDirTree(ctx, collections)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Equal(suite.T(), dirTree.Name(), tenant)
|
||||||
|
|
||||||
|
entries, err := fs.GetAllEntries(ctx, dirTree)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
names := entriesToNames(entries)
|
||||||
|
assert.Len(suite.T(), names, 2)
|
||||||
|
assert.Contains(suite.T(), names, user1)
|
||||||
|
assert.Contains(suite.T(), names, user2)
|
||||||
|
|
||||||
|
for _, entry := range entries {
|
||||||
|
dir, ok := entry.(fs.Directory)
|
||||||
|
require.True(suite.T(), ok)
|
||||||
|
|
||||||
|
subEntries, err := fs.GetAllEntries(ctx, dir)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
require.Len(suite.T(), subEntries, 1)
|
||||||
|
assert.Contains(suite.T(), subEntries[0].Name(), emails)
|
||||||
|
|
||||||
|
subDir, ok := subEntries[0].(fs.Directory)
|
||||||
|
emailFiles, err := fs.GetAllEntries(ctx, subDir)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Len(suite.T(), emailFiles, expectedFileCount[entry.Name()])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
|
||||||
|
ctesting.LogTimeOfTest(suite.T())
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
emails := "emails"
|
||||||
|
|
||||||
|
expectedFileCount := 42
|
||||||
|
|
||||||
|
collections := []connector.DataCollection{
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{emails},
|
||||||
|
expectedFileCount,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returned directory structure should look like:
|
||||||
|
// - emails
|
||||||
|
// - 42 separate files
|
||||||
|
dirTree, err := inflateDirTree(ctx, collections)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Equal(suite.T(), dirTree.Name(), emails)
|
||||||
|
|
||||||
|
entries, err := fs.GetAllEntries(ctx, dirTree)
|
||||||
|
require.NoError(suite.T(), err)
|
||||||
|
assert.Len(suite.T(), entries, 42)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
layout []connector.DataCollection
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"MultipleRoots",
|
||||||
|
// Directory structure would look like:
|
||||||
|
// - user1
|
||||||
|
// - emails
|
||||||
|
// - 5 separate files
|
||||||
|
// - user2
|
||||||
|
// - emails
|
||||||
|
// - 42 separate files
|
||||||
|
[]connector.DataCollection{
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{"user1", "emails"},
|
||||||
|
5,
|
||||||
|
),
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{"user2", "emails"},
|
||||||
|
42,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"NoCollectionPath",
|
||||||
|
[]connector.DataCollection{
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
nil,
|
||||||
|
5,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"MixedDirectory",
|
||||||
|
// Directory structure would look like (but should return error):
|
||||||
|
// - a-tenant
|
||||||
|
// - user1
|
||||||
|
// - emails
|
||||||
|
// - 5 separate files
|
||||||
|
// - 42 separate files
|
||||||
|
[]connector.DataCollection{
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{"a-tenant", "user1", "emails"},
|
||||||
|
5,
|
||||||
|
),
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{"a-tenant", "user1"},
|
||||||
|
42,
|
||||||
|
),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
|
_, err := inflateDirTree(ctx, test.layout)
|
||||||
|
assert.Error(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------
|
// ---------------
|
||||||
// integration tests that use kopia
|
// integration tests that use kopia
|
||||||
// ---------------
|
// ---------------
|
||||||
@ -85,10 +254,21 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
|||||||
assert.NoError(suite.T(), k.Close(ctx))
|
assert.NoError(suite.T(), k.Close(ctx))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
stats, err := k.BackupCollections(ctx, nil)
|
collections := []connector.DataCollection{
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{"a-tenant", "user1", "emails"},
|
||||||
|
5,
|
||||||
|
),
|
||||||
|
mockconnector.NewMockExchangeDataCollection(
|
||||||
|
[]string{"a-tenant", "user2", "emails"},
|
||||||
|
42,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
stats, err := k.BackupCollections(ctx, collections)
|
||||||
assert.NoError(suite.T(), err)
|
assert.NoError(suite.T(), err)
|
||||||
assert.Equal(suite.T(), stats.TotalFileCount, 0)
|
assert.Equal(suite.T(), stats.TotalFileCount, 47)
|
||||||
assert.Equal(suite.T(), stats.TotalDirectoryCount, 1)
|
assert.Equal(suite.T(), stats.TotalDirectoryCount, 5)
|
||||||
assert.Equal(suite.T(), stats.IgnoredErrorCount, 0)
|
assert.Equal(suite.T(), stats.IgnoredErrorCount, 0)
|
||||||
assert.Equal(suite.T(), stats.ErrorCount, 0)
|
assert.Equal(suite.T(), stats.ErrorCount, 0)
|
||||||
assert.False(suite.T(), stats.Incomplete)
|
assert.False(suite.T(), stats.Incomplete)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user