diff --git a/src/internal/kopia/kopia.go b/src/internal/kopia/kopia.go index 9e76ddd22..1fa88b313 100644 --- a/src/internal/kopia/kopia.go +++ b/src/internal/kopia/kopia.go @@ -2,6 +2,7 @@ package kopia import ( "context" + "io" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" @@ -26,9 +27,10 @@ const ( ) var ( - errInit = errors.New("initializing repo") - errConnect = errors.New("connecting repo") - errNotConnected = errors.New("not connected to repo") + errInit = errors.New("initializing repo") + errConnect = errors.New("connecting repo") + errNotConnected = errors.New("not connected to repo") + errUnsupportedDir = errors.New("unsupported static children in streaming directory") ) type BackupStats struct { @@ -160,9 +162,145 @@ func (kw *KopiaWrapper) open(ctx context.Context, password string) error { return nil } +// getStreamItemFunc returns a function that can be used by kopia's +// virtualfs.StreamingDirectory to iterate through directory entries and call +// kopia callbacks on directory entries. It binds the directory to the given +// DataCollection. +func getStreamItemFunc( + collection connector.DataCollection, +) func(context.Context, func(context.Context, fs.Entry) error) error { + return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { + for { + e, err := collection.NextItem() + if err != nil { + if err == io.EOF { + return nil + } + + return errors.Wrap(err, "materializing directory entry") + } + + entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader()) + if err = cb(ctx, entry); err != nil { + return errors.Wrap(err, "executing callback") + } + } + } +} + +// buildKopiaDirs recursively builds a directory hierarchy from the roots up. +// Returned directories are either virtualfs.StreamingDirectory or +// virtualfs.staticDirectory. +func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) { + // Don't support directories that have both a DataCollection and a set of + // static child directories. + if dir.collection != nil && len(dir.childDirs) > 0 { + return nil, errors.New(errUnsupportedDir.Error()) + } + + if dir.collection != nil { + return virtualfs.NewStreamingDirectory(dirName, getStreamItemFunc(dir.collection)), nil + } + + // Need to build the directory tree from the leaves up because intermediate + // directories need to have all their entries at creation time. + childDirs := []fs.Entry{} + + for childName, childDir := range dir.childDirs { + child, err := buildKopiaDirs(childName, childDir) + if err != nil { + return nil, err + } + + childDirs = append(childDirs, child) + } + + return virtualfs.NewStaticDirectory(dirName, childDirs), nil +} + +type treeMap struct { + childDirs map[string]*treeMap + collection connector.DataCollection +} + +func newTreeMap() *treeMap { + return &treeMap{ + childDirs: map[string]*treeMap{}, + } +} + +// inflateDirTree returns an fs.Directory tree rooted at the oldest common +// ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes +// in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given +// DataCollections. func inflateDirTree(ctx context.Context, collections []connector.DataCollection) (fs.Directory, error) { - // TODO(ashmrtnz): Implement when virtualfs.StreamingDirectory is available. - return virtualfs.NewStaticDirectory("sample-dir", []fs.Entry{}), nil + roots := make(map[string]*treeMap) + + for _, s := range collections { + path := s.FullPath() + + if len(path) == 0 { + return nil, errors.New("no identifier for collection") + } + + dir, ok := roots[path[0]] + if !ok { + dir = newTreeMap() + roots[path[0]] = dir + } + + // Single DataCollection with no ancestors. + if len(path) == 1 { + dir.collection = s + continue + } + + for _, p := range path[1 : len(path)-1] { + newDir, ok := dir.childDirs[p] + if !ok { + newDir = newTreeMap() + + if dir.childDirs == nil { + dir.childDirs = map[string]*treeMap{} + } + + dir.childDirs[p] = newDir + } + + dir = newDir + } + + // At this point we have all the ancestor directories of this DataCollection + // as treeMap objects and `dir` is the parent directory of this + // DataCollection. + + end := len(path) - 1 + + // Make sure this entry doesn't already exist. + if _, ok := dir.childDirs[path[end]]; ok { + return nil, errors.New(errUnsupportedDir.Error()) + } + + sd := newTreeMap() + sd.collection = s + dir.childDirs[path[end]] = sd + } + + if len(roots) > 1 { + return nil, errors.New("multiple root directories") + } + + var res fs.Directory + for dirName, dir := range roots { + tmp, err := buildKopiaDirs(dirName, dir) + if err != nil { + return nil, err + } + + res = tmp + } + + return res, nil } func (kw KopiaWrapper) BackupCollections( diff --git a/src/internal/kopia/kopia_test.go b/src/internal/kopia/kopia_test.go index 77f24ba85..90514fafb 100644 --- a/src/internal/kopia/kopia_test.go +++ b/src/internal/kopia/kopia_test.go @@ -4,10 +4,13 @@ import ( "context" "testing" + "github.com/kopia/kopia/fs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/internal/connector" + "github.com/alcionai/corso/internal/connector/mockconnector" ctesting "github.com/alcionai/corso/internal/testing" ) @@ -25,6 +28,16 @@ func openKopiaRepo(ctx context.Context, prefix string) (*KopiaWrapper, error) { return k, nil } +func entriesToNames(entries []fs.Entry) []string { + res := make([]string, 0, len(entries)) + + for _, e := range entries { + res = append(res, e.Name()) + } + + return res +} + // --------------- // unit tests // --------------- @@ -32,6 +45,10 @@ type KopiaUnitSuite struct { suite.Suite } +func TestKopiaUnitSuite(t *testing.T) { + suite.Run(t, new(KopiaUnitSuite)) +} + func (suite *KopiaUnitSuite) TestCloseWithoutOpenDoesNotCrash() { ctx := context.Background() ctesting.LogTimeOfTest(suite.T()) @@ -42,6 +59,158 @@ func (suite *KopiaUnitSuite) TestCloseWithoutOpenDoesNotCrash() { }) } +func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { + ctesting.LogTimeOfTest(suite.T()) + + ctx := context.Background() + tenant := "a-tenant" + user1 := "user1" + user2 := "user2" + emails := "emails" + + expectedFileCount := map[string]int{ + user1: 5, + user2: 42, + } + + collections := []connector.DataCollection{ + mockconnector.NewMockExchangeDataCollection( + []string{tenant, user1, emails}, + expectedFileCount[user1], + ), + mockconnector.NewMockExchangeDataCollection( + []string{tenant, user2, emails}, + expectedFileCount[user2], + ), + } + + // Returned directory structure should look like: + // - a-tenant + // - user1 + // - emails + // - 5 separate files + // - user2 + // - emails + // - 42 separate files + dirTree, err := inflateDirTree(ctx, collections) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), dirTree.Name(), tenant) + + entries, err := fs.GetAllEntries(ctx, dirTree) + require.NoError(suite.T(), err) + names := entriesToNames(entries) + assert.Len(suite.T(), names, 2) + assert.Contains(suite.T(), names, user1) + assert.Contains(suite.T(), names, user2) + + for _, entry := range entries { + dir, ok := entry.(fs.Directory) + require.True(suite.T(), ok) + + subEntries, err := fs.GetAllEntries(ctx, dir) + require.NoError(suite.T(), err) + require.Len(suite.T(), subEntries, 1) + assert.Contains(suite.T(), subEntries[0].Name(), emails) + + subDir, ok := subEntries[0].(fs.Directory) + emailFiles, err := fs.GetAllEntries(ctx, subDir) + require.NoError(suite.T(), err) + assert.Len(suite.T(), emailFiles, expectedFileCount[entry.Name()]) + } +} + +func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { + ctesting.LogTimeOfTest(suite.T()) + + ctx := context.Background() + emails := "emails" + + expectedFileCount := 42 + + collections := []connector.DataCollection{ + mockconnector.NewMockExchangeDataCollection( + []string{emails}, + expectedFileCount, + ), + } + + // Returned directory structure should look like: + // - emails + // - 42 separate files + dirTree, err := inflateDirTree(ctx, collections) + require.NoError(suite.T(), err) + assert.Equal(suite.T(), dirTree.Name(), emails) + + entries, err := fs.GetAllEntries(ctx, dirTree) + require.NoError(suite.T(), err) + assert.Len(suite.T(), entries, 42) +} + +func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { + table := []struct { + name string + layout []connector.DataCollection + }{ + { + "MultipleRoots", + // Directory structure would look like: + // - user1 + // - emails + // - 5 separate files + // - user2 + // - emails + // - 42 separate files + []connector.DataCollection{ + mockconnector.NewMockExchangeDataCollection( + []string{"user1", "emails"}, + 5, + ), + mockconnector.NewMockExchangeDataCollection( + []string{"user2", "emails"}, + 42, + ), + }, + }, + { + "NoCollectionPath", + []connector.DataCollection{ + mockconnector.NewMockExchangeDataCollection( + nil, + 5, + ), + }, + }, + { + "MixedDirectory", + // Directory structure would look like (but should return error): + // - a-tenant + // - user1 + // - emails + // - 5 separate files + // - 42 separate files + []connector.DataCollection{ + mockconnector.NewMockExchangeDataCollection( + []string{"a-tenant", "user1", "emails"}, + 5, + ), + mockconnector.NewMockExchangeDataCollection( + []string{"a-tenant", "user1"}, + 42, + ), + }, + }, + } + + for _, test := range table { + ctx := context.Background() + + suite.T().Run(test.name, func(t *testing.T) { + _, err := inflateDirTree(ctx, test.layout) + assert.Error(t, err) + }) + } +} + // --------------- // integration tests that use kopia // --------------- @@ -85,10 +254,21 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { assert.NoError(suite.T(), k.Close(ctx)) }() - stats, err := k.BackupCollections(ctx, nil) + collections := []connector.DataCollection{ + mockconnector.NewMockExchangeDataCollection( + []string{"a-tenant", "user1", "emails"}, + 5, + ), + mockconnector.NewMockExchangeDataCollection( + []string{"a-tenant", "user2", "emails"}, + 42, + ), + } + + stats, err := k.BackupCollections(ctx, collections) assert.NoError(suite.T(), err) - assert.Equal(suite.T(), stats.TotalFileCount, 0) - assert.Equal(suite.T(), stats.TotalDirectoryCount, 1) + assert.Equal(suite.T(), stats.TotalFileCount, 47) + assert.Equal(suite.T(), stats.TotalDirectoryCount, 5) assert.Equal(suite.T(), stats.IgnoredErrorCount, 0) assert.Equal(suite.T(), stats.ErrorCount, 0) assert.False(suite.T(), stats.Incomplete)