Shuffle around some kopia snapshot code (#1764)

## Description

Pull the code specific to making kopia snapshots into a separate file. Factor out the code specific to handling collections in preparation for also having to deal with kopia items during incremental backups. Apart from code movement and factoring into functions, no other changes have been made.

Viewing the diff commit by commit will make the changes easier to see.

## Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [x] 🐹 Trivial/Minor

## Issue(s)

* #1740 

## Test Plan

- [ ] 💪 Manual
- [x] Unit test
- [ ] 💚 E2E
Commit e1738d657d by ashmrtn, 2022-12-09 13:58:16 -08:00 (parent 8b2489b60c)
4 changed files with 1120 additions and 1034 deletions


@@ -0,0 +1,435 @@
package kopia
import (
"context"
"encoding/binary"
"io"
"runtime/trace"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/hashicorp/go-multierror"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/data"
D "github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
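// versionSize is the number of bytes used to encode the serialization version
// at the start of each item's data stream.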
var versionSize = int(unsafe.Sizeof(serializationVersion))
// backupStreamReader is a wrapper around the io.Reader that other Corso
// components return when backing up information. It injects a version number at
// the start of the data stream. Future versions of Corso may not need this if
// they use more complex serialization logic as serialization/version injection
// will be handled by other components.
type backupStreamReader struct {
io.ReadCloser
version uint32
readBytes int
}
func (rw *backupStreamReader) Read(p []byte) (n int, err error) {
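// Serve bytes from the big-endian version prefix until it has been fully
// copied out, then pass reads through to the wrapped reader.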
if rw.readBytes < versionSize {
marshalled := make([]byte, versionSize)
toCopy := len(marshalled) - rw.readBytes
if len(p) < toCopy {
toCopy = len(p)
}
binary.BigEndian.PutUint32(marshalled, rw.version)
copy(p, marshalled[rw.readBytes:rw.readBytes+toCopy])
rw.readBytes += toCopy
return toCopy, nil
}
return rw.ReadCloser.Read(p)
}
// restoreStreamReader is a wrapper around the io.Reader that kopia returns when
// reading data from an item. It examines and strips off the version number of
// the restored data. Future versions of Corso may not need this if they use
// more complex serialization logic as version checking/deserialization will be
// handled by other components. A reader that returns a version error is no
// longer valid and should not be used once the version error is returned.
type restoreStreamReader struct {
io.ReadCloser
expectedVersion uint32
readVersion bool
}
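// checkVersion reads the version prefix from the wrapped reader and returns an
// error if it does not match expectedVersion.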
func (rw *restoreStreamReader) checkVersion() error {
versionBuf := make([]byte, versionSize)
for newlyRead := 0; newlyRead < versionSize; {
n, err := rw.ReadCloser.Read(versionBuf[newlyRead:])
if err != nil {
return errors.Wrap(err, "reading data format version")
}
newlyRead += n
}
version := binary.BigEndian.Uint32(versionBuf)
if version != rw.expectedVersion {
return errors.Errorf("unexpected data format %v", version)
}
return nil
}
func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
if !rw.readVersion {
rw.readVersion = true
if err := rw.checkVersion(); err != nil {
return 0, err
}
}
return rw.ReadCloser.Read(p)
}
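// itemDetails pairs an item's backup details with the repository path it will
// occupy in the snapshot.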
type itemDetails struct {
info details.ItemInfo
repoPath path.Path
}
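// corsoProgress wraps kopia's UploadProgress callbacks so Corso can record
// backup details for completed items and tally the total bytes hashed.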
type corsoProgress struct {
snapshotfs.UploadProgress
pending map[string]*itemDetails
deets *details.Details
mu sync.RWMutex
totalBytes int64
}
// Kopia interface function used as a callback when kopia finishes processing a
// file.
func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
// Pass the call through as well so we don't break expected functionality.
defer cp.UploadProgress.FinishedFile(relativePath, err)
// Whether it succeeded or failed, remove the entry from our pending set so we
// don't leak references.
defer func() {
cp.mu.Lock()
defer cp.mu.Unlock()
delete(cp.pending, relativePath)
}()
if err != nil {
return
}
d := cp.get(relativePath)
if d == nil {
return
}
parent := d.repoPath.ToBuilder().Dir()
cp.deets.Add(
d.repoPath.String(),
d.repoPath.ShortRef(),
parent.ShortRef(),
d.info,
)
folders := []details.FolderEntry{}
for len(parent.Elements()) > 0 {
nextParent := parent.Dir()
folders = append(folders, details.FolderEntry{
RepoRef: parent.String(),
ShortRef: parent.ShortRef(),
ParentRef: nextParent.ShortRef(),
Info: details.ItemInfo{
Folder: &details.FolderInfo{
DisplayName: parent.Elements()[len(parent.Elements())-1],
},
},
})
parent = nextParent
}
cp.deets.AddFolders(folders)
}
// Kopia interface function used as a callback when kopia finishes hashing a file.
func (cp *corsoProgress) FinishedHashingFile(fname string, bytes int64) {
// Pass the call through as well so we don't break expected functionality.
defer cp.UploadProgress.FinishedHashingFile(fname, bytes)
atomic.AddInt64(&cp.totalBytes, bytes)
}
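// put registers a pending item under its relative path key.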
func (cp *corsoProgress) put(k string, v *itemDetails) {
cp.mu.Lock()
defer cp.mu.Unlock()
cp.pending[k] = v
}
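// get looks up a pending item by its relative path key.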
func (cp *corsoProgress) get(k string) *itemDetails {
cp.mu.RLock()
defer cp.mu.RUnlock()
return cp.pending[k]
}
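// collectionEntries streams the items in streamedEnts through the given kopia
// callback cb. Each item is wrapped in a backupStreamReader so the
// serialization version is prepended to its data, and items that implement
// data.StreamInfo are registered with progress so their details get recorded
// when kopia finishes the file. Per-item errors are accumulated so one bad
// item doesn't stop iteration; callback and context errors end it early.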
func collectionEntries(
ctx context.Context,
cb func(context.Context, fs.Entry) error,
streamedEnts data.Collection,
progress *corsoProgress,
) *multierror.Error {
if streamedEnts == nil {
return nil
}
var (
errs *multierror.Error
// Track which items have already been seen so we can skip them if we see
// them again in the data from the base snapshot.
items = streamedEnts.Items()
log = logger.Ctx(ctx)
)
for {
select {
case <-ctx.Done():
errs = multierror.Append(errs, ctx.Err())
return errs
case e, ok := <-items:
if !ok {
return errs
}
// For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true)
if err != nil {
err = errors.Wrap(err, "getting full item path")
errs = multierror.Append(errs, err)
log.Error(err)
continue
}
log.Debugw("reading item", "path", itemPath.String())
trace.Log(ctx, "kopia:collectionEntries:item", itemPath.String())
// Not all items implement StreamInfo. For example, the metadata files
// do not because they don't contain information directly backed up or
// used for restore. If progress does not contain information about a
// finished file it just returns without an error so it's safe to skip
// adding something to it.
ei, ok := e.(data.StreamInfo)
if ok {
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{info: ei.Info(), repoPath: itemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
modTime := time.Now()
if smt, ok := e.(data.StreamModTime); ok {
modTime = smt.ModTime()
}
entry := virtualfs.StreamingFileWithModTimeFromReader(
encodeAsPath(e.UUID()),
modTime,
&backupStreamReader{
version: serializationVersion,
ReadCloser: e.ToReader(),
},
)
if err := cb(ctx, entry); err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath))
return errs
}
}
}
}
// getStreamItemFunc returns a function that can be used by kopia's
// virtualfs.StreamingDirectory to iterate through directory entries and call
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
staticEnts []fs.Entry,
streamedEnts data.Collection,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
ctx, end := D.Span(ctx, "kopia:getStreamItemFunc")
defer end()
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := cb(ctx, d); err != nil {
return errors.Wrap(err, "executing callback on static directory")
}
}
errs := collectionEntries(ctx, cb, streamedEnts, progress)
// TODO(ashmrtn): Stream entries from base snapshot if they exist.
return errs.ErrorOrNil()
}
}
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) {
// Need to build the directory tree from the leaves up because intermediate
// directories need to have all their entries at creation time.
var childDirs []fs.Entry
for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(childName, childDir, progress)
if err != nil {
return nil, err
}
childDirs = append(childDirs, child)
}
return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(childDirs, dir.collection, progress),
), nil
}
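// treeMap is an intermediate tree representation of the directory hierarchy
// for a snapshot. Each node tracks its child directories by name and the
// data.Collection, if any, whose items belong directly in this directory.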
type treeMap struct {
childDirs map[string]*treeMap
collection data.Collection
}
func newTreeMap() *treeMap {
return &treeMap{
childDirs: map[string]*treeMap{},
}
}
// getTreeNode walks the tree(s) with roots roots and returns the node specified
// by pathElements. If pathElements is nil or empty then returns nil. Tree nodes
// are created for any path elements where a node is not already present.
func getTreeNode(roots map[string]*treeMap, pathElements []string) *treeMap {
if len(pathElements) == 0 {
return nil
}
dir, ok := roots[pathElements[0]]
if !ok {
dir = newTreeMap()
roots[pathElements[0]] = dir
}
// Use actual indices so this is automatically skipped if
// len(pathElements) == 1.
for i := 1; i < len(pathElements); i++ {
p := pathElements[i]
newDir := dir.childDirs[p]
if newDir == nil {
newDir = newTreeMap()
if dir.childDirs == nil {
dir.childDirs = map[string]*treeMap{}
}
dir.childDirs[p] = newDir
}
dir = newDir
}
return dir
}
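// inflateCollectionTree arranges the given collections into a forest of
// treeMap nodes keyed by path element and returns the roots along with the
// set of resource owners and service/category tags seen across collections.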
func inflateCollectionTree(
ctx context.Context,
collections []data.Collection,
) (map[string]*treeMap, *OwnersCats, error) {
roots := make(map[string]*treeMap)
ownerCats := &OwnersCats{
ResourceOwners: make(map[string]struct{}),
ServiceCats: make(map[string]struct{}),
}
for _, s := range collections {
if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 {
return nil, nil, errors.New("no identifier for collection")
}
node := getTreeNode(roots, s.FullPath().Elements())
if node == nil {
return nil, nil, errors.Errorf(
"unable to get tree node for path %s",
s.FullPath(),
)
}
serviceCat := serviceCatTag(s.FullPath())
ownerCats.ServiceCats[serviceCat] = struct{}{}
ownerCats.ResourceOwners[s.FullPath().ResourceOwner()] = struct{}{}
node.collection = s
}
return roots, ownerCats, nil
}
// inflateDirTree returns a set of tags representing all the resource owners and
// service/categories in the snapshot and a fs.Directory tree rooted at the
// oldest common ancestor of the streams. All nodes are
// virtualfs.StreamingDirectory with the given DataCollections if there is one
// for that node. Tags can be used in future backups to fetch old snapshots for
// caching reasons.
func inflateDirTree(
ctx context.Context,
collections []data.Collection,
progress *corsoProgress,
) (fs.Directory, *OwnersCats, error) {
roots, ownerCats, err := inflateCollectionTree(ctx, collections)
if err != nil {
return nil, nil, errors.Wrap(err, "inflating collection tree")
}
if len(roots) > 1 {
return nil, nil, errors.New("multiple root directories")
}
var res fs.Directory
for dirName, dir := range roots {
tmp, err := buildKopiaDirs(dirName, dir, progress)
if err != nil {
return nil, nil, err
}
res = tmp
}
return res, ownerCats, nil
}


@@ -0,0 +1,685 @@
package kopia
import (
"bytes"
"context"
"io"
stdpath "path"
"testing"
"unsafe"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/connector/mockconnector"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/path"
)
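// expectDirs asserts that entries contains directories named dirs. If exactly
// is true, entries must contain only those directories.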
func expectDirs(
t *testing.T,
entries []fs.Entry,
dirs []string,
exactly bool,
) {
t.Helper()
if exactly {
require.Len(t, entries, len(dirs))
}
names := make([]string, 0, len(entries))
for _, e := range entries {
names = append(names, e.Name())
}
assert.Subset(t, names, dirs)
}
//revive:disable:context-as-argument
func getDirEntriesForEntry(
t *testing.T,
ctx context.Context,
entry fs.Entry,
) []fs.Entry {
//revive:enable:context-as-argument
d, ok := entry.(fs.Directory)
require.True(t, ok, "returned entry is not a directory")
entries, err := fs.GetAllEntries(ctx, d)
require.NoError(t, err)
return entries
}
// ---------------
// unit tests
// ---------------
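// limitedRangeReader caps each Read at readLen bytes to simulate short reads
// from the underlying reader.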
type limitedRangeReader struct {
readLen int
io.ReadCloser
}
func (lrr *limitedRangeReader) Read(p []byte) (int, error) {
if len(p) == 0 {
// Not well specified behavior, defer to underlying reader.
return lrr.ReadCloser.Read(p)
}
toRead := lrr.readLen
if len(p) < toRead {
toRead = len(p)
}
return lrr.ReadCloser.Read(p[:toRead])
}
type VersionReadersUnitSuite struct {
suite.Suite
}
func TestVersionReadersUnitSuite(t *testing.T) {
suite.Run(t, new(VersionReadersUnitSuite))
}
func (suite *VersionReadersUnitSuite) TestWriteAndRead() {
inputData := []byte("This is some data for the reader to test with")
table := []struct {
name string
readVersion uint32
writeVersion uint32
check assert.ErrorAssertionFunc
}{
{
name: "SameVersionSucceeds",
readVersion: 42,
writeVersion: 42,
check: assert.NoError,
},
{
name: "DifferentVersionsFail",
readVersion: 7,
writeVersion: 42,
check: assert.Error,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
baseReader := bytes.NewReader(inputData)
reversible := &restoreStreamReader{
expectedVersion: test.readVersion,
ReadCloser: &backupStreamReader{
version: test.writeVersion,
ReadCloser: io.NopCloser(baseReader),
},
}
defer reversible.Close()
allData, err := io.ReadAll(reversible)
test.check(t, err)
if err != nil {
return
}
assert.Equal(t, inputData, allData)
})
}
}
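// readAllInParts drains reader using a scratch buffer of partLen bytes,
// returning all of the data read and the total byte count.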
func readAllInParts(
t *testing.T,
partLen int,
reader io.ReadCloser,
) ([]byte, int) {
res := []byte{}
read := 0
tmp := make([]byte, partLen)
for {
n, err := reader.Read(tmp)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
read += n
res = append(res, tmp[:n]...)
}
return res, read
}
func (suite *VersionReadersUnitSuite) TestWriteHandlesShortReads() {
t := suite.T()
inputData := []byte("This is some data for the reader to test with")
version := uint32(42)
baseReader := bytes.NewReader(inputData)
versioner := &backupStreamReader{
version: version,
ReadCloser: io.NopCloser(baseReader),
}
expectedToWrite := len(inputData) + int(unsafe.Sizeof(versioner.version))
// "Write" all the data.
versionedData, writtenLen := readAllInParts(t, 1, versioner)
assert.Equal(t, expectedToWrite, writtenLen)
// Read all of the data back.
baseReader = bytes.NewReader(versionedData)
reader := &restoreStreamReader{
expectedVersion: version,
// Be adversarial and only allow reads of length 1 from the byte reader.
ReadCloser: &limitedRangeReader{
readLen: 1,
ReadCloser: io.NopCloser(baseReader),
},
}
readData, readLen := readAllInParts(t, 1, reader)
// This reports the bytes read and returned to the user, excluding the version
// that is stripped off at the start.
assert.Equal(t, len(inputData), readLen)
assert.Equal(t, inputData, readData)
}
type CorsoProgressUnitSuite struct {
suite.Suite
targetFilePath path.Path
targetFileName string
}
func TestCorsoProgressUnitSuite(t *testing.T) {
suite.Run(t, new(CorsoProgressUnitSuite))
}
func (suite *CorsoProgressUnitSuite) SetupSuite() {
p, err := path.Builder{}.Append(
testInboxDir,
"testFile",
).ToDataLayerExchangePathForCategory(
testTenant,
testUser,
path.EmailCategory,
true,
)
require.NoError(suite.T(), err)
suite.targetFilePath = p
suite.targetFileName = suite.targetFilePath.ToBuilder().Dir().String()
}
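// testInfo describes a single cached item fed to corsoProgress in the tests
// below.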
type testInfo struct {
info *itemDetails
err error
totalBytes int64
}
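// finishedFileTable is shared by the FinishedFile and FinishedHashingFile
// tests.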
var finishedFileTable = []struct {
name string
cachedItems func(fname string, fpath path.Path) map[string]testInfo
expectedBytes int64
expectedNumEntries int
err error
}{
{
name: "DetailsExist",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return map[string]testInfo{
fname: {
info: &itemDetails{details.ItemInfo{}, fpath},
err: nil,
totalBytes: 100,
},
}
},
expectedBytes: 100,
// 1 file and 5 folders.
expectedNumEntries: 6,
},
{
name: "PendingNoDetails",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return map[string]testInfo{
fname: {
info: nil,
err: nil,
},
}
},
expectedNumEntries: 0,
},
{
name: "HadError",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return map[string]testInfo{
fname: {
info: &itemDetails{details.ItemInfo{}, fpath},
err: assert.AnError,
},
}
},
expectedNumEntries: 0,
},
{
name: "NotPending",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return nil
},
expectedNumEntries: 0,
},
}
func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
for _, test := range finishedFileTable {
suite.T().Run(test.name, func(t *testing.T) {
bd := &details.Details{}
cp := corsoProgress{
UploadProgress: &snapshotfs.NullUploadProgress{},
deets: bd,
pending: map[string]*itemDetails{},
}
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
for k, v := range ci {
cp.put(k, v.info)
}
require.Len(t, cp.pending, len(ci))
for k, v := range ci {
cp.FinishedFile(k, v.err)
}
assert.Empty(t, cp.pending)
assert.Len(t, bd.Entries, test.expectedNumEntries)
})
}
}
func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() {
t := suite.T()
// Order of folders in hierarchy from root to leaf (excluding the item).
expectedFolderOrder := suite.targetFilePath.ToBuilder().Dir().Elements()
// Setup stuff.
bd := &details.Details{}
cp := corsoProgress{
UploadProgress: &snapshotfs.NullUploadProgress{},
deets: bd,
pending: map[string]*itemDetails{},
}
deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath}
cp.put(suite.targetFileName, deets)
require.Len(t, cp.pending, 1)
cp.FinishedFile(suite.targetFileName, nil)
// Gather information about the current state.
var (
curRef *details.DetailsEntry
refToEntry = map[string]*details.DetailsEntry{}
)
for i := 0; i < len(bd.Entries); i++ {
e := &bd.Entries[i]
if e.Folder == nil {
continue
}
refToEntry[e.ShortRef] = e
if e.Folder.DisplayName == expectedFolderOrder[len(expectedFolderOrder)-1] {
curRef = e
}
}
// Actual tests start here.
var rootRef *details.DetailsEntry
// Traverse the details entries from leaf to root, following the ParentRef
// fields. At the end rootRef should point to the root of the path.
for i := len(expectedFolderOrder) - 1; i >= 0; i-- {
name := expectedFolderOrder[i]
require.NotNil(t, curRef)
assert.Equal(t, name, curRef.Folder.DisplayName)
rootRef = curRef
curRef = refToEntry[curRef.ParentRef]
}
// Hierarchy root's ParentRef = "" and map will return nil.
assert.Nil(t, curRef)
require.NotNil(t, rootRef)
assert.Empty(t, rootRef.ParentRef)
}
func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() {
for _, test := range finishedFileTable {
suite.T().Run(test.name, func(t *testing.T) {
bd := &details.Details{}
cp := corsoProgress{
UploadProgress: &snapshotfs.NullUploadProgress{},
deets: bd,
pending: map[string]*itemDetails{},
}
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
for k, v := range ci {
cp.FinishedHashingFile(k, v.totalBytes)
}
assert.Empty(t, cp.pending)
assert.Equal(t, test.expectedBytes, cp.totalBytes)
})
}
}
type HierarchyBuilderUnitSuite struct {
suite.Suite
testPath path.Path
}
func (suite *HierarchyBuilderUnitSuite) SetupSuite() {
tmp, err := path.FromDataLayerPath(
stdpath.Join(
testTenant,
path.ExchangeService.String(),
testUser,
path.EmailCategory.String(),
testInboxDir,
),
false,
)
require.NoError(suite.T(), err)
suite.testPath = tmp
}
func TestHierarchyBuilderUnitSuite(t *testing.T) {
suite.Run(t, new(HierarchyBuilderUnitSuite))
}
func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree() {
tester.LogTimeOfTest(suite.T())
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
tenant := "a-tenant"
user1 := testUser
user1Encoded := encodeAsPath(user1)
user2 := "user2"
user2Encoded := encodeAsPath(user2)
p2, err := path.FromDataLayerPath(
stdpath.Join(
tenant,
service,
user2,
category,
testInboxDir,
),
false,
)
require.NoError(t, err)
// Encode user names here so we don't have to decode things later.
expectedFileCount := map[string]int{
user1Encoded: 5,
user2Encoded: 42,
}
expectedServiceCats := map[string]struct{}{
serviceCatTag(suite.testPath): {},
serviceCatTag(p2): {},
}
expectedResourceOwners := map[string]struct{}{
suite.testPath.ResourceOwner(): {},
p2.ResourceOwner(): {},
}
progress := &corsoProgress{pending: map[string]*itemDetails{}}
collections := []data.Collection{
mockconnector.NewMockExchangeCollection(
suite.testPath,
expectedFileCount[user1Encoded],
),
mockconnector.NewMockExchangeCollection(
p2,
expectedFileCount[user2Encoded],
),
}
// Returned directory structure should look like:
// - a-tenant
// - exchange
// - user1
// - emails
// - Inbox
// - 5 separate files
// - user2
// - emails
// - Inbox
// - 42 separate files
dirTree, oc, err := inflateDirTree(ctx, collections, progress)
require.NoError(t, err)
assert.Equal(t, expectedServiceCats, oc.ServiceCats)
assert.Equal(t, expectedResourceOwners, oc.ResourceOwners)
assert.Equal(t, encodeAsPath(testTenant), dirTree.Name())
entries, err := fs.GetAllEntries(ctx, dirTree)
require.NoError(t, err)
expectDirs(t, entries, encodeElements(service), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(user1, user2), true)
for _, entry := range entries {
userName := entry.Name()
entries = getDirEntriesForEntry(t, ctx, entry)
expectDirs(t, entries, encodeElements(category), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(testInboxDir), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
assert.Len(t, entries, expectedFileCount[userName])
}
totalFileCount := 0
for _, c := range expectedFileCount {
totalFileCount += c
}
assert.Len(t, progress.pending, totalFileCount)
}
func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_MixedDirectory() {
ctx, flush := tester.NewContext()
defer flush()
subdir := "subfolder"
p2, err := suite.testPath.Append(subdir, false)
require.NoError(suite.T(), err)
expectedServiceCats := map[string]struct{}{
serviceCatTag(suite.testPath): {},
serviceCatTag(p2): {},
}
expectedResourceOwners := map[string]struct{}{
suite.testPath.ResourceOwner(): {},
p2.ResourceOwner(): {},
}
// Test multiple orders of items because right now order can matter. Both
// orders result in a directory structure like:
// - a-tenant
// - exchange
// - user1
// - emails
// - Inbox
// - subfolder
// - 5 separate files
// - 42 separate files
table := []struct {
name string
layout []data.Collection
}{
{
name: "SubdirFirst",
layout: []data.Collection{
mockconnector.NewMockExchangeCollection(
p2,
5,
),
mockconnector.NewMockExchangeCollection(
suite.testPath,
42,
),
},
},
{
name: "SubdirLast",
layout: []data.Collection{
mockconnector.NewMockExchangeCollection(
suite.testPath,
42,
),
mockconnector.NewMockExchangeCollection(
p2,
5,
),
},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
progress := &corsoProgress{pending: map[string]*itemDetails{}}
dirTree, oc, err := inflateDirTree(ctx, test.layout, progress)
require.NoError(t, err)
assert.Equal(t, expectedServiceCats, oc.ServiceCats)
assert.Equal(t, expectedResourceOwners, oc.ResourceOwners)
assert.Equal(t, encodeAsPath(testTenant), dirTree.Name())
entries, err := fs.GetAllEntries(ctx, dirTree)
require.NoError(t, err)
expectDirs(t, entries, encodeElements(service), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(testUser), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(category), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(testInboxDir), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
// 42 files and 1 subdirectory.
assert.Len(t, entries, 43)
// One of these entries should be a subdirectory with items in it.
subDirs := []fs.Directory(nil)
for _, e := range entries {
d, ok := e.(fs.Directory)
if !ok {
continue
}
subDirs = append(subDirs, d)
assert.Equal(t, encodeAsPath(subdir), d.Name())
}
require.Len(t, subDirs, 1)
entries = getDirEntriesForEntry(t, ctx, entries[0])
assert.Len(t, entries, 5)
})
}
}
func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_Fails() {
p2, err := path.Builder{}.Append(testInboxDir).ToDataLayerExchangePathForCategory(
"tenant2",
"user2",
path.EmailCategory,
false,
)
require.NoError(suite.T(), err)
table := []struct {
name string
layout []data.Collection
}{
{
"MultipleRoots",
// Directory structure would look like:
// - tenant1
// - exchange
// - user1
// - emails
// - Inbox
// - 5 separate files
// - tenant2
// - exchange
// - user2
// - emails
// - Inbox
// - 42 separate files
[]data.Collection{
mockconnector.NewMockExchangeCollection(
suite.testPath,
5,
),
mockconnector.NewMockExchangeCollection(
p2,
42,
),
},
},
{
"NoCollectionPath",
[]data.Collection{
mockconnector.NewMockExchangeCollection(
nil,
5,
),
},
},
}
for _, test := range table {
ctx, flush := tester.NewContext()
defer flush()
suite.T().Run(test.name, func(t *testing.T) {
_, _, err := inflateDirTree(ctx, test.layout, nil)
assert.Error(t, err)
})
}
}


@@ -2,17 +2,9 @@ package kopia
import (
"context"
"encoding/binary"
"io"
"runtime/trace"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/hashicorp/go-multierror"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
@@ -41,86 +33,8 @@ const (
var (
errNotConnected = errors.New("not connected to repo")
errNoRestorePath = errors.New("no restore path given")
versionSize = int(unsafe.Sizeof(serializationVersion))
)
// backupStreamReader is a wrapper around the io.Reader that other Corso
// components return when backing up information. It injects a version number at
// the start of the data stream. Future versions of Corso may not need this if
// they use more complex serialization logic as serialization/version injection
// will be handled by other components.
type backupStreamReader struct {
io.ReadCloser
version uint32
readBytes int
}
func (rw *backupStreamReader) Read(p []byte) (n int, err error) {
if rw.readBytes < versionSize {
marshalled := make([]byte, versionSize)
toCopy := len(marshalled) - rw.readBytes
if len(p) < toCopy {
toCopy = len(p)
}
binary.BigEndian.PutUint32(marshalled, rw.version)
copy(p, marshalled[rw.readBytes:rw.readBytes+toCopy])
rw.readBytes += toCopy
return toCopy, nil
}
return rw.ReadCloser.Read(p)
}
// restoreStreamReader is a wrapper around the io.Reader that kopia returns when
// reading data from an item. It examines and strips off the version number of
// the restored data. Future versions of Corso may not need this if they use
// more complex serialization logic as version checking/deserialization will be
// handled by other components. A reader that returns a version error is no
// longer valid and should not be used once the version error is returned.
type restoreStreamReader struct {
io.ReadCloser
expectedVersion uint32
readVersion bool
}
func (rw *restoreStreamReader) checkVersion() error {
versionBuf := make([]byte, versionSize)
for newlyRead := 0; newlyRead < versionSize; {
n, err := rw.ReadCloser.Read(versionBuf[newlyRead:])
if err != nil {
return errors.Wrap(err, "reading data format version")
}
newlyRead += n
}
version := binary.BigEndian.Uint32(versionBuf)
if version != rw.expectedVersion {
return errors.Errorf("unexpected data format %v", version)
}
return nil
}
func (rw *restoreStreamReader) Read(p []byte) (n int, err error) {
if !rw.readVersion {
rw.readVersion = true
if err := rw.checkVersion(); err != nil {
return 0, err
}
}
return rw.ReadCloser.Read(p)
}
type BackupStats struct {
SnapshotID string
@@ -161,95 +75,6 @@ func manifestToStats(
}
}
type itemDetails struct {
info details.ItemInfo
repoPath path.Path
}
type corsoProgress struct {
snapshotfs.UploadProgress
pending map[string]*itemDetails
deets *details.Details
mu sync.RWMutex
totalBytes int64
}
// Kopia interface function used as a callback when kopia finishes processing a
// file.
func (cp *corsoProgress) FinishedFile(relativePath string, err error) {
// Pass the call through as well so we don't break expected functionality.
defer cp.UploadProgress.FinishedFile(relativePath, err)
// Whether it succeeded or failed, remove the entry from our pending set so we
// don't leak references.
defer func() {
cp.mu.Lock()
defer cp.mu.Unlock()
delete(cp.pending, relativePath)
}()
if err != nil {
return
}
d := cp.get(relativePath)
if d == nil {
return
}
parent := d.repoPath.ToBuilder().Dir()
cp.deets.Add(
d.repoPath.String(),
d.repoPath.ShortRef(),
parent.ShortRef(),
d.info,
)
folders := []details.FolderEntry{}
for len(parent.Elements()) > 0 {
nextParent := parent.Dir()
folders = append(folders, details.FolderEntry{
RepoRef: parent.String(),
ShortRef: parent.ShortRef(),
ParentRef: nextParent.ShortRef(),
Info: details.ItemInfo{
Folder: &details.FolderInfo{
DisplayName: parent.Elements()[len(parent.Elements())-1],
},
},
})
parent = nextParent
}
cp.deets.AddFolders(folders)
}
// Kopia interface function used as a callback when kopia finishes hashing a file.
func (cp *corsoProgress) FinishedHashingFile(fname string, bytes int64) {
// Pass the call through as well so we don't break expected functionality.
defer cp.UploadProgress.FinishedHashingFile(fname, bytes)
atomic.AddInt64(&cp.totalBytes, bytes)
}
func (cp *corsoProgress) put(k string, v *itemDetails) {
cp.mu.Lock()
defer cp.mu.Unlock()
cp.pending[k] = v
}
func (cp *corsoProgress) get(k string) *itemDetails {
cp.mu.RLock()
defer cp.mu.RUnlock()
return cp.pending[k]
}
func NewWrapper(c *conn) (*Wrapper, error) {
if err := c.wrap(); err != nil {
return nil, errors.Wrap(err, "creating Wrapper")
@@ -273,226 +98,6 @@ func (w *Wrapper) Close(ctx context.Context) error {
return errors.Wrap(err, "closing Wrapper")
}
// getStreamItemFunc returns a function that can be used by kopia's
// virtualfs.StreamingDirectory to iterate through directory entries and call
// kopia callbacks on directory entries. It binds the directory to the given
// DataCollection.
func getStreamItemFunc(
staticEnts []fs.Entry,
streamedEnts data.Collection,
progress *corsoProgress,
) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
ctx, end := D.Span(ctx, "kopia:getStreamItemFunc")
defer end()
log := logger.Ctx(ctx)
// Collect all errors and return them at the end so that iteration for this
// directory doesn't end early.
var errs *multierror.Error
// Return static entries in this directory first.
for _, d := range staticEnts {
if err := cb(ctx, d); err != nil {
return errors.Wrap(err, "executing callback on static directory")
}
}
if streamedEnts == nil {
return nil
}
items := streamedEnts.Items()
for {
select {
case <-ctx.Done():
return ctx.Err()
case e, ok := <-items:
if !ok {
return errs.ErrorOrNil()
}
// For now assuming that item IDs don't need escaping.
itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true)
if err != nil {
err = errors.Wrap(err, "getting full item path")
errs = multierror.Append(errs, err)
log.Error(err)
continue
}
log.Debugw("reading item", "path", itemPath.String())
trace.Log(ctx, "kopia:getStreamItemFunc:item", itemPath.String())
// Not all items implement StreamInfo. For example, the metadata files
// do not because they don't contain information directly backed up or
// used for restore. If progress does not contain information about a
// finished file it just returns without an error so it's safe to skip
// adding something to it.
ei, ok := e.(data.StreamInfo)
if ok {
// Relative path given to us in the callback is missing the root
// element. Add to pending set before calling the callback to avoid race
// conditions when the item is completed.
d := &itemDetails{info: ei.Info(), repoPath: itemPath}
progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d)
}
modTime := time.Now()
if smt, ok := e.(data.StreamModTime); ok {
modTime = smt.ModTime()
}
entry := virtualfs.StreamingFileWithModTimeFromReader(
encodeAsPath(e.UUID()),
modTime,
&backupStreamReader{
version: serializationVersion,
ReadCloser: e.ToReader(),
},
)
if err := cb(ctx, entry); err != nil {
// Kopia's uploader swallows errors in most cases, so if we see
// something here it's probably a big issue and we should return.
errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath))
return errs.ErrorOrNil()
}
}
}
}
}
// buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are virtualfs.StreamingDirectory.
func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) {
// Need to build the directory tree from the leaves up because intermediate
// directories need to have all their entries at creation time.
var childDirs []fs.Entry
for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(childName, childDir, progress)
if err != nil {
return nil, err
}
childDirs = append(childDirs, child)
}
return virtualfs.NewStreamingDirectory(
encodeAsPath(dirName),
getStreamItemFunc(childDirs, dir.collection, progress),
), nil
}
type treeMap struct {
childDirs map[string]*treeMap
collection data.Collection
}
func newTreeMap() *treeMap {
return &treeMap{
childDirs: map[string]*treeMap{},
}
}
// inflateDirTree returns a set of tags representing all the resource owners and
// service/categories in the snapshot and a fs.Directory tree rooted at the
// oldest common ancestor of the streams. All nodes are
// virtualfs.StreamingDirectory with the given DataCollections if there is one
// for that node. Tags can be used in future backups to fetch old snapshots for
// caching reasons.
func inflateDirTree(
ctx context.Context,
collections []data.Collection,
progress *corsoProgress,
) (fs.Directory, *OwnersCats, error) {
roots := make(map[string]*treeMap)
ownerCats := &OwnersCats{
ResourceOwners: make(map[string]struct{}),
ServiceCats: make(map[string]struct{}),
}
for _, s := range collections {
if s.FullPath() == nil {
return nil, nil, errors.New("no identifier for collection")
}
serviceCat := serviceCatTag(s.FullPath())
ownerCats.ServiceCats[serviceCat] = struct{}{}
ownerCats.ResourceOwners[s.FullPath().ResourceOwner()] = struct{}{}
itemPath := s.FullPath().Elements()
if len(itemPath) == 0 {
return nil, nil, errors.New("no identifier for collection")
}
dir, ok := roots[itemPath[0]]
if !ok {
dir = newTreeMap()
roots[itemPath[0]] = dir
}
// Single DataCollection with no ancestors.
if len(itemPath) == 1 {
dir.collection = s
continue
}
for _, p := range itemPath[1 : len(itemPath)-1] {
newDir := dir.childDirs[p]
if newDir == nil {
newDir = newTreeMap()
if dir.childDirs == nil {
dir.childDirs = map[string]*treeMap{}
}
dir.childDirs[p] = newDir
}
dir = newDir
}
// At this point we have all the ancestor directories of this DataCollection
// as treeMap objects and `dir` is the parent directory of this
// DataCollection.
end := len(itemPath) - 1
// Make sure this entry doesn't already exist.
tmpDir := dir.childDirs[itemPath[end]]
if tmpDir == nil {
tmpDir = newTreeMap()
dir.childDirs[itemPath[end]] = tmpDir
}
tmpDir.collection = s
}
if len(roots) > 1 {
return nil, nil, errors.New("multiple root directories")
}
var res fs.Directory
for dirName, dir := range roots {
tmp, err := buildKopiaDirs(dirName, dir, progress)
if err != nil {
return nil, nil, err
}
res = tmp
}
return res, ownerCats, nil
}
// BackupCollections takes a set of collections and creates a kopia snapshot
// with the data that they contain. previousSnapshots is used for incremental
// backups and should represent the base snapshot from which metadata is sourced


@@ -3,19 +3,15 @@ package kopia
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
stdpath "path"
"testing"
"unsafe"
"github.com/google/uuid"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@@ -24,7 +20,6 @@ import (
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/model"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/backup/details"
"github.com/alcionai/corso/src/pkg/logger"
"github.com/alcionai/corso/src/pkg/path"
)
@@ -88,42 +83,6 @@ func testForFiles(
assert.Equal(t, len(expected), count)
}
func expectDirs(
t *testing.T,
entries []fs.Entry,
dirs []string,
exactly bool,
) {
t.Helper()
if exactly {
require.Len(t, entries, len(dirs))
}
names := make([]string, 0, len(entries))
for _, e := range entries {
names = append(names, e.Name())
}
assert.Subset(t, names, dirs)
}
//revive:disable:context-as-argument
func getDirEntriesForEntry(
t *testing.T,
ctx context.Context,
entry fs.Entry,
) []fs.Entry {
//revive:enable:context-as-argument
d, ok := entry.(fs.Directory)
require.True(t, ok, "returned entry is not a directory")
entries, err := fs.GetAllEntries(ctx, d)
require.NoError(t, err)
return entries
}
//revive:disable:context-as-argument
func checkSnapshotTags(
t *testing.T,
@@ -141,333 +100,6 @@ func checkSnapshotTags(
// ---------------
// unit tests
// ---------------
type limitedRangeReader struct {
readLen int
io.ReadCloser
}
func (lrr *limitedRangeReader) Read(p []byte) (int, error) {
if len(p) == 0 {
// Not well specified behavior, defer to underlying reader.
return lrr.ReadCloser.Read(p)
}
toRead := lrr.readLen
if len(p) < toRead {
toRead = len(p)
}
return lrr.ReadCloser.Read(p[:toRead])
}
type VersionReadersUnitSuite struct {
suite.Suite
}
func TestVersionReadersUnitSuite(t *testing.T) {
suite.Run(t, new(VersionReadersUnitSuite))
}
func (suite *VersionReadersUnitSuite) TestWriteAndRead() {
inputData := []byte("This is some data for the reader to test with")
table := []struct {
name string
readVersion uint32
writeVersion uint32
check assert.ErrorAssertionFunc
}{
{
name: "SameVersionSucceeds",
readVersion: 42,
writeVersion: 42,
check: assert.NoError,
},
{
name: "DifferentVersionsFail",
readVersion: 7,
writeVersion: 42,
check: assert.Error,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
baseReader := bytes.NewReader(inputData)
reversible := &restoreStreamReader{
expectedVersion: test.readVersion,
ReadCloser: &backupStreamReader{
version: test.writeVersion,
ReadCloser: io.NopCloser(baseReader),
},
}
defer reversible.Close()
allData, err := io.ReadAll(reversible)
test.check(t, err)
if err != nil {
return
}
assert.Equal(t, inputData, allData)
})
}
}
func readAllInParts(
t *testing.T,
partLen int,
reader io.ReadCloser,
) ([]byte, int) {
res := []byte{}
read := 0
tmp := make([]byte, partLen)
for {
n, err := reader.Read(tmp)
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
read += n
res = append(res, tmp[:n]...)
}
return res, read
}
func (suite *VersionReadersUnitSuite) TestWriteHandlesShortReads() {
t := suite.T()
inputData := []byte("This is some data for the reader to test with")
version := uint32(42)
baseReader := bytes.NewReader(inputData)
versioner := &backupStreamReader{
version: version,
ReadCloser: io.NopCloser(baseReader),
}
expectedToWrite := len(inputData) + int(unsafe.Sizeof(versioner.version))
// "Write" all the data.
versionedData, writtenLen := readAllInParts(t, 1, versioner)
assert.Equal(t, expectedToWrite, writtenLen)
// Read all of the data back.
baseReader = bytes.NewReader(versionedData)
reader := &restoreStreamReader{
expectedVersion: version,
// Be adversarial and only allow reads of length 1 from the byte reader.
ReadCloser: &limitedRangeReader{
readLen: 1,
ReadCloser: io.NopCloser(baseReader),
},
}
readData, readLen := readAllInParts(t, 1, reader)
// This reports the bytes read and returned to the user, excluding the version
// that is stripped off at the start.
assert.Equal(t, len(inputData), readLen)
assert.Equal(t, inputData, readData)
}
type CorsoProgressUnitSuite struct {
suite.Suite
targetFilePath path.Path
targetFileName string
}
func TestCorsoProgressUnitSuite(t *testing.T) {
suite.Run(t, new(CorsoProgressUnitSuite))
}
func (suite *CorsoProgressUnitSuite) SetupSuite() {
p, err := path.Builder{}.Append(
testInboxDir,
"testFile",
).ToDataLayerExchangePathForCategory(
testTenant,
testUser,
path.EmailCategory,
true,
)
require.NoError(suite.T(), err)
suite.targetFilePath = p
suite.targetFileName = suite.targetFilePath.ToBuilder().Dir().String()
}
type testInfo struct {
info *itemDetails
err error
totalBytes int64
}
var finishedFileTable = []struct {
name string
cachedItems func(fname string, fpath path.Path) map[string]testInfo
expectedBytes int64
expectedNumEntries int
err error
}{
{
name: "DetailsExist",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return map[string]testInfo{
fname: {
info: &itemDetails{details.ItemInfo{}, fpath},
err: nil,
totalBytes: 100,
},
}
},
expectedBytes: 100,
// 1 file and 5 folders.
expectedNumEntries: 6,
},
{
name: "PendingNoDetails",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return map[string]testInfo{
fname: {
info: nil,
err: nil,
},
}
},
expectedNumEntries: 0,
},
{
name: "HadError",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return map[string]testInfo{
fname: {
info: &itemDetails{details.ItemInfo{}, fpath},
err: assert.AnError,
},
}
},
expectedNumEntries: 0,
},
{
name: "NotPending",
cachedItems: func(fname string, fpath path.Path) map[string]testInfo {
return nil
},
expectedNumEntries: 0,
},
}
func (suite *CorsoProgressUnitSuite) TestFinishedFile() {
for _, test := range finishedFileTable {
suite.T().Run(test.name, func(t *testing.T) {
bd := &details.Details{}
cp := corsoProgress{
UploadProgress: &snapshotfs.NullUploadProgress{},
deets: bd,
pending: map[string]*itemDetails{},
}
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
for k, v := range ci {
cp.put(k, v.info)
}
require.Len(t, cp.pending, len(ci))
for k, v := range ci {
cp.FinishedFile(k, v.err)
}
assert.Empty(t, cp.pending)
assert.Len(t, bd.Entries, test.expectedNumEntries)
})
}
}
func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() {
t := suite.T()
// Order of folders in hierarchy from root to leaf (excluding the item).
expectedFolderOrder := suite.targetFilePath.ToBuilder().Dir().Elements()
// Setup stuff.
bd := &details.Details{}
cp := corsoProgress{
UploadProgress: &snapshotfs.NullUploadProgress{},
deets: bd,
pending: map[string]*itemDetails{},
}
deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath}
cp.put(suite.targetFileName, deets)
require.Len(t, cp.pending, 1)
cp.FinishedFile(suite.targetFileName, nil)
// Gather information about the current state.
var (
curRef *details.DetailsEntry
refToEntry = map[string]*details.DetailsEntry{}
)
for i := 0; i < len(bd.Entries); i++ {
e := &bd.Entries[i]
if e.Folder == nil {
continue
}
refToEntry[e.ShortRef] = e
if e.Folder.DisplayName == expectedFolderOrder[len(expectedFolderOrder)-1] {
curRef = e
}
}
// Actual tests start here.
var rootRef *details.DetailsEntry
// Traverse the details entries from leaf to root, following the ParentRef
// fields. At the end rootRef should point to the root of the path.
for i := len(expectedFolderOrder) - 1; i >= 0; i-- {
name := expectedFolderOrder[i]
require.NotNil(t, curRef)
assert.Equal(t, name, curRef.Folder.DisplayName)
rootRef = curRef
curRef = refToEntry[curRef.ParentRef]
}
// Hierarchy root's ParentRef = "" and map will return nil.
assert.Nil(t, curRef)
require.NotNil(t, rootRef)
assert.Empty(t, rootRef.ParentRef)
}
func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() {
for _, test := range finishedFileTable {
suite.T().Run(test.name, func(t *testing.T) {
bd := &details.Details{}
cp := corsoProgress{
UploadProgress: &snapshotfs.NullUploadProgress{},
deets: bd,
pending: map[string]*itemDetails{},
}
ci := test.cachedItems(suite.targetFileName, suite.targetFilePath)
for k, v := range ci {
cp.FinishedHashingFile(k, v.totalBytes)
}
assert.Empty(t, cp.pending)
assert.Equal(t, test.expectedBytes, cp.totalBytes)
})
}
}
type KopiaUnitSuite struct {
suite.Suite
testPath path.Path
@@ -503,277 +135,6 @@ func (suite *KopiaUnitSuite) TestCloseWithoutInitDoesNotPanic() {
})
}
func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
tester.LogTimeOfTest(suite.T())
ctx, flush := tester.NewContext()
defer flush()
t := suite.T()
tenant := "a-tenant"
user1 := testUser
user1Encoded := encodeAsPath(user1)
user2 := "user2"
user2Encoded := encodeAsPath(user2)
p2, err := path.FromDataLayerPath(
stdpath.Join(
tenant,
service,
user2,
category,
testInboxDir,
),
false,
)
require.NoError(t, err)
// Encode user names here so we don't have to decode things later.
expectedFileCount := map[string]int{
user1Encoded: 5,
user2Encoded: 42,
}
expectedServiceCats := map[string]struct{}{
serviceCatTag(suite.testPath): {},
serviceCatTag(p2): {},
}
expectedResourceOwners := map[string]struct{}{
suite.testPath.ResourceOwner(): {},
p2.ResourceOwner(): {},
}
progress := &corsoProgress{pending: map[string]*itemDetails{}}
collections := []data.Collection{
mockconnector.NewMockExchangeCollection(
suite.testPath,
expectedFileCount[user1Encoded],
),
mockconnector.NewMockExchangeCollection(
p2,
expectedFileCount[user2Encoded],
),
}
// Returned directory structure should look like:
// - a-tenant
// - exchange
// - user1
// - emails
// - Inbox
// - 5 separate files
// - user2
// - emails
// - Inbox
// - 42 separate files
dirTree, oc, err := inflateDirTree(ctx, collections, progress)
require.NoError(t, err)
assert.Equal(t, expectedServiceCats, oc.ServiceCats)
assert.Equal(t, expectedResourceOwners, oc.ResourceOwners)
assert.Equal(t, encodeAsPath(testTenant), dirTree.Name())
entries, err := fs.GetAllEntries(ctx, dirTree)
require.NoError(t, err)
expectDirs(t, entries, encodeElements(service), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(user1, user2), true)
for _, entry := range entries {
userName := entry.Name()
entries = getDirEntriesForEntry(t, ctx, entry)
expectDirs(t, entries, encodeElements(category), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(testInboxDir), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
assert.Len(t, entries, expectedFileCount[userName])
}
totalFileCount := 0
for _, c := range expectedFileCount {
totalFileCount += c
}
assert.Len(t, progress.pending, totalFileCount)
}
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_MixedDirectory() {
ctx, flush := tester.NewContext()
defer flush()
subdir := "subfolder"
p2, err := suite.testPath.Append(subdir, false)
require.NoError(suite.T(), err)
expectedServiceCats := map[string]struct{}{
serviceCatTag(suite.testPath): {},
serviceCatTag(p2): {},
}
expectedResourceOwners := map[string]struct{}{
suite.testPath.ResourceOwner(): {},
p2.ResourceOwner(): {},
}
// Test multiple orders of items because right now order can matter. Both
// orders result in a directory structure like:
// - a-tenant
// - exchange
// - user1
// - emails
// - Inbox
// - subfolder
// - 5 separate files
// - 42 separate files
table := []struct {
name string
layout []data.Collection
}{
{
name: "SubdirFirst",
layout: []data.Collection{
mockconnector.NewMockExchangeCollection(
p2,
5,
),
mockconnector.NewMockExchangeCollection(
suite.testPath,
42,
),
},
},
{
name: "SubdirLast",
layout: []data.Collection{
mockconnector.NewMockExchangeCollection(
suite.testPath,
42,
),
mockconnector.NewMockExchangeCollection(
p2,
5,
),
},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
progress := &corsoProgress{pending: map[string]*itemDetails{}}
dirTree, oc, err := inflateDirTree(ctx, test.layout, progress)
require.NoError(t, err)
assert.Equal(t, expectedServiceCats, oc.ServiceCats)
assert.Equal(t, expectedResourceOwners, oc.ResourceOwners)
assert.Equal(t, encodeAsPath(testTenant), dirTree.Name())
entries, err := fs.GetAllEntries(ctx, dirTree)
require.NoError(t, err)
expectDirs(t, entries, encodeElements(service), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(testUser), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(category), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
expectDirs(t, entries, encodeElements(testInboxDir), true)
entries = getDirEntriesForEntry(t, ctx, entries[0])
// 42 files and 1 subdirectory.
assert.Len(t, entries, 43)
// One of these entries should be a subdirectory with items in it.
subDirs := []fs.Directory(nil)
for _, e := range entries {
d, ok := e.(fs.Directory)
if !ok {
continue
}
subDirs = append(subDirs, d)
assert.Equal(t, encodeAsPath(subdir), d.Name())
}
require.Len(t, subDirs, 1)
entries = getDirEntriesForEntry(t, ctx, entries[0])
assert.Len(t, entries, 5)
})
}
}
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
p2, err := path.Builder{}.Append(testInboxDir).ToDataLayerExchangePathForCategory(
"tenant2",
"user2",
path.EmailCategory,
false,
)
require.NoError(suite.T(), err)
table := []struct {
name string
layout []data.Collection
}{
{
"MultipleRoots",
// Directory structure would look like:
// - tenant1
// - exchange
// - user1
// - emails
// - Inbox
// - 5 separate files
// - tenant2
// - exchange
// - user2
// - emails
// - Inbox
// - 42 separate files
[]data.Collection{
mockconnector.NewMockExchangeCollection(
suite.testPath,
5,
),
mockconnector.NewMockExchangeCollection(
p2,
42,
),
},
},
{
"NoCollectionPath",
[]data.Collection{
mockconnector.NewMockExchangeCollection(
nil,
5,
),
},
},
}
for _, test := range table {
ctx, flush := tester.NewContext()
defer flush()
suite.T().Run(test.name, func(t *testing.T) {
_, _, err := inflateDirTree(ctx, test.layout, nil)
assert.Error(t, err)
})
}
}
// ---------------
// integration tests that use kopia
// ---------------