diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go new file mode 100644 index 000000000..4b0659927 --- /dev/null +++ b/src/internal/kopia/upload.go @@ -0,0 +1,435 @@ +package kopia + +import ( + "context" + "encoding/binary" + "io" + "runtime/trace" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/hashicorp/go-multierror" + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" + "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/pkg/errors" + + "github.com/alcionai/corso/src/internal/data" + D "github.com/alcionai/corso/src/internal/diagnostics" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/logger" + "github.com/alcionai/corso/src/pkg/path" +) + +var versionSize = int(unsafe.Sizeof(serializationVersion)) + +// backupStreamReader is a wrapper around the io.Reader that other Corso +// components return when backing up information. It injects a version number at +// the start of the data stream. Future versions of Corso may not need this if +// they use more complex serialization logic as serialization/version injection +// will be handled by other components. +type backupStreamReader struct { + io.ReadCloser + version uint32 + readBytes int +} + +func (rw *backupStreamReader) Read(p []byte) (n int, err error) { + if rw.readBytes < versionSize { + marshalled := make([]byte, versionSize) + + toCopy := len(marshalled) - rw.readBytes + if len(p) < toCopy { + toCopy = len(p) + } + + binary.BigEndian.PutUint32(marshalled, rw.version) + + copy(p, marshalled[rw.readBytes:rw.readBytes+toCopy]) + rw.readBytes += toCopy + + return toCopy, nil + } + + return rw.ReadCloser.Read(p) +} + +// restoreStreamReader is a wrapper around the io.Reader that kopia returns when +// reading data from an item. It examines and strips off the version number of +// the restored data. Future versions of Corso may not need this if they use +// more complex serialization logic as version checking/deserialization will be +// handled by other components. A reader that returns a version error is no +// longer valid and should not be used once the version error is returned. +type restoreStreamReader struct { + io.ReadCloser + expectedVersion uint32 + readVersion bool +} + +func (rw *restoreStreamReader) checkVersion() error { + versionBuf := make([]byte, versionSize) + + for newlyRead := 0; newlyRead < versionSize; { + n, err := rw.ReadCloser.Read(versionBuf[newlyRead:]) + if err != nil { + return errors.Wrap(err, "reading data format version") + } + + newlyRead += n + } + + version := binary.BigEndian.Uint32(versionBuf) + + if version != rw.expectedVersion { + return errors.Errorf("unexpected data format %v", version) + } + + return nil +} + +func (rw *restoreStreamReader) Read(p []byte) (n int, err error) { + if !rw.readVersion { + rw.readVersion = true + + if err := rw.checkVersion(); err != nil { + return 0, err + } + } + + return rw.ReadCloser.Read(p) +} + +type itemDetails struct { + info details.ItemInfo + repoPath path.Path +} + +type corsoProgress struct { + snapshotfs.UploadProgress + pending map[string]*itemDetails + deets *details.Details + mu sync.RWMutex + totalBytes int64 +} + +// Kopia interface function used as a callback when kopia finishes processing a +// file. +func (cp *corsoProgress) FinishedFile(relativePath string, err error) { + // Pass the call through as well so we don't break expected functionality. + defer cp.UploadProgress.FinishedFile(relativePath, err) + // Whether it succeeded or failed, remove the entry from our pending set so we + // don't leak references. + defer func() { + cp.mu.Lock() + defer cp.mu.Unlock() + + delete(cp.pending, relativePath) + }() + + if err != nil { + return + } + + d := cp.get(relativePath) + if d == nil { + return + } + + parent := d.repoPath.ToBuilder().Dir() + + cp.deets.Add( + d.repoPath.String(), + d.repoPath.ShortRef(), + parent.ShortRef(), + d.info, + ) + + folders := []details.FolderEntry{} + + for len(parent.Elements()) > 0 { + nextParent := parent.Dir() + + folders = append(folders, details.FolderEntry{ + RepoRef: parent.String(), + ShortRef: parent.ShortRef(), + ParentRef: nextParent.ShortRef(), + Info: details.ItemInfo{ + Folder: &details.FolderInfo{ + DisplayName: parent.Elements()[len(parent.Elements())-1], + }, + }, + }) + + parent = nextParent + } + + cp.deets.AddFolders(folders) +} + +// Kopia interface function used as a callback when kopia finishes hashing a file. +func (cp *corsoProgress) FinishedHashingFile(fname string, bytes int64) { + // Pass the call through as well so we don't break expected functionality. + defer cp.UploadProgress.FinishedHashingFile(fname, bytes) + + atomic.AddInt64(&cp.totalBytes, bytes) +} + +func (cp *corsoProgress) put(k string, v *itemDetails) { + cp.mu.Lock() + defer cp.mu.Unlock() + + cp.pending[k] = v +} + +func (cp *corsoProgress) get(k string) *itemDetails { + cp.mu.RLock() + defer cp.mu.RUnlock() + + return cp.pending[k] +} + +func collectionEntries( + ctx context.Context, + cb func(context.Context, fs.Entry) error, + streamedEnts data.Collection, + progress *corsoProgress, +) *multierror.Error { + if streamedEnts == nil { + return nil + } + + var ( + errs *multierror.Error + // Track which items have already been seen so we can skip them if we see + // them again in the data from the base snapshot. + items = streamedEnts.Items() + log = logger.Ctx(ctx) + ) + + for { + select { + case <-ctx.Done(): + errs = multierror.Append(errs, ctx.Err()) + return errs + + case e, ok := <-items: + if !ok { + return errs + } + + // For now assuming that item IDs don't need escaping. + itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true) + if err != nil { + err = errors.Wrap(err, "getting full item path") + errs = multierror.Append(errs, err) + + log.Error(err) + + continue + } + + log.Debugw("reading item", "path", itemPath.String()) + trace.Log(ctx, "kopia:collectionEntries:item", itemPath.String()) + + // Not all items implement StreamInfo. For example, the metadata files + // do not because they don't contain information directly backed up or + // used for restore. If progress does not contain information about a + // finished file it just returns without an error so it's safe to skip + // adding something to it. + ei, ok := e.(data.StreamInfo) + if ok { + // Relative path given to us in the callback is missing the root + // element. Add to pending set before calling the callback to avoid race + // conditions when the item is completed. + d := &itemDetails{info: ei.Info(), repoPath: itemPath} + progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) + } + + modTime := time.Now() + if smt, ok := e.(data.StreamModTime); ok { + modTime = smt.ModTime() + } + + entry := virtualfs.StreamingFileWithModTimeFromReader( + encodeAsPath(e.UUID()), + modTime, + &backupStreamReader{ + version: serializationVersion, + ReadCloser: e.ToReader(), + }, + ) + if err := cb(ctx, entry); err != nil { + // Kopia's uploader swallows errors in most cases, so if we see + // something here it's probably a big issue and we should return. + errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath)) + return errs + } + } + } +} + +// getStreamItemFunc returns a function that can be used by kopia's +// virtualfs.StreamingDirectory to iterate through directory entries and call +// kopia callbacks on directory entries. It binds the directory to the given +// DataCollection. +func getStreamItemFunc( + staticEnts []fs.Entry, + streamedEnts data.Collection, + progress *corsoProgress, +) func(context.Context, func(context.Context, fs.Entry) error) error { + return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { + ctx, end := D.Span(ctx, "kopia:getStreamItemFunc") + defer end() + + // Return static entries in this directory first. + for _, d := range staticEnts { + if err := cb(ctx, d); err != nil { + return errors.Wrap(err, "executing callback on static directory") + } + } + + errs := collectionEntries(ctx, cb, streamedEnts, progress) + + // TODO(ashmrtn): Stream entries from base snapshot if they exist. + + return errs.ErrorOrNil() + } +} + +// buildKopiaDirs recursively builds a directory hierarchy from the roots up. +// Returned directories are virtualfs.StreamingDirectory. +func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) { + // Need to build the directory tree from the leaves up because intermediate + // directories need to have all their entries at creation time. + var childDirs []fs.Entry + + for childName, childDir := range dir.childDirs { + child, err := buildKopiaDirs(childName, childDir, progress) + if err != nil { + return nil, err + } + + childDirs = append(childDirs, child) + } + + return virtualfs.NewStreamingDirectory( + encodeAsPath(dirName), + getStreamItemFunc(childDirs, dir.collection, progress), + ), nil +} + +type treeMap struct { + childDirs map[string]*treeMap + collection data.Collection +} + +func newTreeMap() *treeMap { + return &treeMap{ + childDirs: map[string]*treeMap{}, + } +} + +// getTreeNode walks the tree(s) with roots roots and returns the node specified +// by pathElements. If pathElements is nil or empty then returns nil. Tree nodes +// are created for any path elements where a node is not already present. +func getTreeNode(roots map[string]*treeMap, pathElements []string) *treeMap { + if len(pathElements) == 0 { + return nil + } + + dir, ok := roots[pathElements[0]] + if !ok { + dir = newTreeMap() + roots[pathElements[0]] = dir + } + + // Use actual indices so this is automatically skipped if + // len(pathElements) == 1. + for i := 1; i < len(pathElements); i++ { + p := pathElements[i] + + newDir := dir.childDirs[p] + if newDir == nil { + newDir = newTreeMap() + + if dir.childDirs == nil { + dir.childDirs = map[string]*treeMap{} + } + + dir.childDirs[p] = newDir + } + + dir = newDir + } + + return dir +} + +func inflateCollectionTree( + ctx context.Context, + collections []data.Collection, +) (map[string]*treeMap, *OwnersCats, error) { + roots := make(map[string]*treeMap) + ownerCats := &OwnersCats{ + ResourceOwners: make(map[string]struct{}), + ServiceCats: make(map[string]struct{}), + } + + for _, s := range collections { + if s.FullPath() == nil || len(s.FullPath().Elements()) == 0 { + return nil, nil, errors.New("no identifier for collection") + } + + node := getTreeNode(roots, s.FullPath().Elements()) + if node == nil { + return nil, nil, errors.Errorf( + "unable to get tree node for path %s", + s.FullPath(), + ) + } + + serviceCat := serviceCatTag(s.FullPath()) + ownerCats.ServiceCats[serviceCat] = struct{}{} + ownerCats.ResourceOwners[s.FullPath().ResourceOwner()] = struct{}{} + + node.collection = s + } + + return roots, ownerCats, nil +} + +// inflateDirTree returns a set of tags representing all the resource owners and +// service/categories in the snapshot and a fs.Directory tree rooted at the +// oldest common ancestor of the streams. All nodes are +// virtualfs.StreamingDirectory with the given DataCollections if there is one +// for that node. Tags can be used in future backups to fetch old snapshots for +// caching reasons. +func inflateDirTree( + ctx context.Context, + collections []data.Collection, + progress *corsoProgress, +) (fs.Directory, *OwnersCats, error) { + roots, ownerCats, err := inflateCollectionTree(ctx, collections) + if err != nil { + return nil, nil, errors.Wrap(err, "inflating collection tree") + } + + if len(roots) > 1 { + return nil, nil, errors.New("multiple root directories") + } + + var res fs.Directory + + for dirName, dir := range roots { + tmp, err := buildKopiaDirs(dirName, dir, progress) + if err != nil { + return nil, nil, err + } + + res = tmp + } + + return res, ownerCats, nil +} diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go new file mode 100644 index 000000000..dbb239734 --- /dev/null +++ b/src/internal/kopia/upload_test.go @@ -0,0 +1,685 @@ +package kopia + +import ( + "bytes" + "context" + "io" + stdpath "path" + "testing" + "unsafe" + + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/snapshot/snapshotfs" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/src/internal/connector/mockconnector" + "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" + "github.com/alcionai/corso/src/pkg/backup/details" + "github.com/alcionai/corso/src/pkg/path" +) + +func expectDirs( + t *testing.T, + entries []fs.Entry, + dirs []string, + exactly bool, +) { + t.Helper() + + if exactly { + require.Len(t, entries, len(dirs)) + } + + names := make([]string, 0, len(entries)) + for _, e := range entries { + names = append(names, e.Name()) + } + + assert.Subset(t, names, dirs) +} + +//revive:disable:context-as-argument +func getDirEntriesForEntry( + t *testing.T, + ctx context.Context, + entry fs.Entry, +) []fs.Entry { + //revive:enable:context-as-argument + d, ok := entry.(fs.Directory) + require.True(t, ok, "returned entry is not a directory") + + entries, err := fs.GetAllEntries(ctx, d) + require.NoError(t, err) + + return entries +} + +// --------------- +// unit tests +// --------------- +type limitedRangeReader struct { + readLen int + io.ReadCloser +} + +func (lrr *limitedRangeReader) Read(p []byte) (int, error) { + if len(p) == 0 { + // Not well specified behavior, defer to underlying reader. + return lrr.ReadCloser.Read(p) + } + + toRead := lrr.readLen + if len(p) < toRead { + toRead = len(p) + } + + return lrr.ReadCloser.Read(p[:toRead]) +} + +type VersionReadersUnitSuite struct { + suite.Suite +} + +func TestVersionReadersUnitSuite(t *testing.T) { + suite.Run(t, new(VersionReadersUnitSuite)) +} + +func (suite *VersionReadersUnitSuite) TestWriteAndRead() { + inputData := []byte("This is some data for the reader to test with") + table := []struct { + name string + readVersion uint32 + writeVersion uint32 + check assert.ErrorAssertionFunc + }{ + { + name: "SameVersionSucceeds", + readVersion: 42, + writeVersion: 42, + check: assert.NoError, + }, + { + name: "DifferentVersionsFail", + readVersion: 7, + writeVersion: 42, + check: assert.Error, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + baseReader := bytes.NewReader(inputData) + + reversible := &restoreStreamReader{ + expectedVersion: test.readVersion, + ReadCloser: &backupStreamReader{ + version: test.writeVersion, + ReadCloser: io.NopCloser(baseReader), + }, + } + + defer reversible.Close() + + allData, err := io.ReadAll(reversible) + test.check(t, err) + + if err != nil { + return + } + + assert.Equal(t, inputData, allData) + }) + } +} + +func readAllInParts( + t *testing.T, + partLen int, + reader io.ReadCloser, +) ([]byte, int) { + res := []byte{} + read := 0 + tmp := make([]byte, partLen) + + for { + n, err := reader.Read(tmp) + if errors.Is(err, io.EOF) { + break + } + + require.NoError(t, err) + + read += n + res = append(res, tmp[:n]...) + } + + return res, read +} + +func (suite *VersionReadersUnitSuite) TestWriteHandlesShortReads() { + t := suite.T() + inputData := []byte("This is some data for the reader to test with") + version := uint32(42) + baseReader := bytes.NewReader(inputData) + versioner := &backupStreamReader{ + version: version, + ReadCloser: io.NopCloser(baseReader), + } + expectedToWrite := len(inputData) + int(unsafe.Sizeof(versioner.version)) + + // "Write" all the data. + versionedData, writtenLen := readAllInParts(t, 1, versioner) + assert.Equal(t, expectedToWrite, writtenLen) + + // Read all of the data back. + baseReader = bytes.NewReader(versionedData) + reader := &restoreStreamReader{ + expectedVersion: version, + // Be adversarial and only allow reads of length 1 from the byte reader. + ReadCloser: &limitedRangeReader{ + readLen: 1, + ReadCloser: io.NopCloser(baseReader), + }, + } + readData, readLen := readAllInParts(t, 1, reader) + // This reports the bytes read and returned to the user, excluding the version + // that is stripped off at the start. + assert.Equal(t, len(inputData), readLen) + assert.Equal(t, inputData, readData) +} + +type CorsoProgressUnitSuite struct { + suite.Suite + targetFilePath path.Path + targetFileName string +} + +func TestCorsoProgressUnitSuite(t *testing.T) { + suite.Run(t, new(CorsoProgressUnitSuite)) +} + +func (suite *CorsoProgressUnitSuite) SetupSuite() { + p, err := path.Builder{}.Append( + testInboxDir, + "testFile", + ).ToDataLayerExchangePathForCategory( + testTenant, + testUser, + path.EmailCategory, + true, + ) + require.NoError(suite.T(), err) + + suite.targetFilePath = p + suite.targetFileName = suite.targetFilePath.ToBuilder().Dir().String() +} + +type testInfo struct { + info *itemDetails + err error + totalBytes int64 +} + +var finishedFileTable = []struct { + name string + cachedItems func(fname string, fpath path.Path) map[string]testInfo + expectedBytes int64 + expectedNumEntries int + err error +}{ + { + name: "DetailsExist", + cachedItems: func(fname string, fpath path.Path) map[string]testInfo { + return map[string]testInfo{ + fname: { + info: &itemDetails{details.ItemInfo{}, fpath}, + err: nil, + totalBytes: 100, + }, + } + }, + expectedBytes: 100, + // 1 file and 5 folders. + expectedNumEntries: 6, + }, + { + name: "PendingNoDetails", + cachedItems: func(fname string, fpath path.Path) map[string]testInfo { + return map[string]testInfo{ + fname: { + info: nil, + err: nil, + }, + } + }, + expectedNumEntries: 0, + }, + { + name: "HadError", + cachedItems: func(fname string, fpath path.Path) map[string]testInfo { + return map[string]testInfo{ + fname: { + info: &itemDetails{details.ItemInfo{}, fpath}, + err: assert.AnError, + }, + } + }, + expectedNumEntries: 0, + }, + { + name: "NotPending", + cachedItems: func(fname string, fpath path.Path) map[string]testInfo { + return nil + }, + expectedNumEntries: 0, + }, +} + +func (suite *CorsoProgressUnitSuite) TestFinishedFile() { + for _, test := range finishedFileTable { + suite.T().Run(test.name, func(t *testing.T) { + bd := &details.Details{} + cp := corsoProgress{ + UploadProgress: &snapshotfs.NullUploadProgress{}, + deets: bd, + pending: map[string]*itemDetails{}, + } + + ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) + + for k, v := range ci { + cp.put(k, v.info) + } + + require.Len(t, cp.pending, len(ci)) + + for k, v := range ci { + cp.FinishedFile(k, v.err) + } + + assert.Empty(t, cp.pending) + assert.Len(t, bd.Entries, test.expectedNumEntries) + }) + } +} + +func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() { + t := suite.T() + // Order of folders in hierarchy from root to leaf (excluding the item). + expectedFolderOrder := suite.targetFilePath.ToBuilder().Dir().Elements() + + // Setup stuff. + bd := &details.Details{} + cp := corsoProgress{ + UploadProgress: &snapshotfs.NullUploadProgress{}, + deets: bd, + pending: map[string]*itemDetails{}, + } + + deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath} + cp.put(suite.targetFileName, deets) + require.Len(t, cp.pending, 1) + + cp.FinishedFile(suite.targetFileName, nil) + + // Gather information about the current state. + var ( + curRef *details.DetailsEntry + refToEntry = map[string]*details.DetailsEntry{} + ) + + for i := 0; i < len(bd.Entries); i++ { + e := &bd.Entries[i] + if e.Folder == nil { + continue + } + + refToEntry[e.ShortRef] = e + + if e.Folder.DisplayName == expectedFolderOrder[len(expectedFolderOrder)-1] { + curRef = e + } + } + + // Actual tests start here. + var rootRef *details.DetailsEntry + + // Traverse the details entries from leaf to root, following the ParentRef + // fields. At the end rootRef should point to the root of the path. + for i := len(expectedFolderOrder) - 1; i >= 0; i-- { + name := expectedFolderOrder[i] + + require.NotNil(t, curRef) + assert.Equal(t, name, curRef.Folder.DisplayName) + + rootRef = curRef + curRef = refToEntry[curRef.ParentRef] + } + + // Hierarchy root's ParentRef = "" and map will return nil. + assert.Nil(t, curRef) + require.NotNil(t, rootRef) + assert.Empty(t, rootRef.ParentRef) +} + +func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() { + for _, test := range finishedFileTable { + suite.T().Run(test.name, func(t *testing.T) { + bd := &details.Details{} + cp := corsoProgress{ + UploadProgress: &snapshotfs.NullUploadProgress{}, + deets: bd, + pending: map[string]*itemDetails{}, + } + + ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) + + for k, v := range ci { + cp.FinishedHashingFile(k, v.totalBytes) + } + + assert.Empty(t, cp.pending) + assert.Equal(t, test.expectedBytes, cp.totalBytes) + }) + } +} + +type HierarchyBuilderUnitSuite struct { + suite.Suite + testPath path.Path +} + +func (suite *HierarchyBuilderUnitSuite) SetupSuite() { + tmp, err := path.FromDataLayerPath( + stdpath.Join( + testTenant, + path.ExchangeService.String(), + testUser, + path.EmailCategory.String(), + testInboxDir, + ), + false, + ) + require.NoError(suite.T(), err) + + suite.testPath = tmp +} + +func TestHierarchyBuilderUnitSuite(t *testing.T) { + suite.Run(t, new(HierarchyBuilderUnitSuite)) +} + +func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree() { + tester.LogTimeOfTest(suite.T()) + ctx, flush := tester.NewContext() + + defer flush() + + t := suite.T() + tenant := "a-tenant" + user1 := testUser + user1Encoded := encodeAsPath(user1) + user2 := "user2" + user2Encoded := encodeAsPath(user2) + + p2, err := path.FromDataLayerPath( + stdpath.Join( + tenant, + service, + user2, + category, + testInboxDir, + ), + false, + ) + require.NoError(t, err) + + // Encode user names here so we don't have to decode things later. + expectedFileCount := map[string]int{ + user1Encoded: 5, + user2Encoded: 42, + } + expectedServiceCats := map[string]struct{}{ + serviceCatTag(suite.testPath): {}, + serviceCatTag(p2): {}, + } + expectedResourceOwners := map[string]struct{}{ + suite.testPath.ResourceOwner(): {}, + p2.ResourceOwner(): {}, + } + + progress := &corsoProgress{pending: map[string]*itemDetails{}} + + collections := []data.Collection{ + mockconnector.NewMockExchangeCollection( + suite.testPath, + expectedFileCount[user1Encoded], + ), + mockconnector.NewMockExchangeCollection( + p2, + expectedFileCount[user2Encoded], + ), + } + + // Returned directory structure should look like: + // - a-tenant + // - exchange + // - user1 + // - emails + // - Inbox + // - 5 separate files + // - user2 + // - emails + // - Inbox + // - 42 separate files + dirTree, oc, err := inflateDirTree(ctx, collections, progress) + require.NoError(t, err) + + assert.Equal(t, expectedServiceCats, oc.ServiceCats) + assert.Equal(t, expectedResourceOwners, oc.ResourceOwners) + + assert.Equal(t, encodeAsPath(testTenant), dirTree.Name()) + + entries, err := fs.GetAllEntries(ctx, dirTree) + require.NoError(t, err) + + expectDirs(t, entries, encodeElements(service), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + expectDirs(t, entries, encodeElements(user1, user2), true) + + for _, entry := range entries { + userName := entry.Name() + + entries = getDirEntriesForEntry(t, ctx, entry) + expectDirs(t, entries, encodeElements(category), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + expectDirs(t, entries, encodeElements(testInboxDir), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + assert.Len(t, entries, expectedFileCount[userName]) + } + + totalFileCount := 0 + for _, c := range expectedFileCount { + totalFileCount += c + } + + assert.Len(t, progress.pending, totalFileCount) +} + +func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_MixedDirectory() { + ctx, flush := tester.NewContext() + defer flush() + + subdir := "subfolder" + + p2, err := suite.testPath.Append(subdir, false) + require.NoError(suite.T(), err) + + expectedServiceCats := map[string]struct{}{ + serviceCatTag(suite.testPath): {}, + serviceCatTag(p2): {}, + } + expectedResourceOwners := map[string]struct{}{ + suite.testPath.ResourceOwner(): {}, + p2.ResourceOwner(): {}, + } + + // Test multiple orders of items because right now order can matter. Both + // orders result in a directory structure like: + // - a-tenant + // - exchange + // - user1 + // - emails + // - Inbox + // - subfolder + // - 5 separate files + // - 42 separate files + table := []struct { + name string + layout []data.Collection + }{ + { + name: "SubdirFirst", + layout: []data.Collection{ + mockconnector.NewMockExchangeCollection( + p2, + 5, + ), + mockconnector.NewMockExchangeCollection( + suite.testPath, + 42, + ), + }, + }, + { + name: "SubdirLast", + layout: []data.Collection{ + mockconnector.NewMockExchangeCollection( + suite.testPath, + 42, + ), + mockconnector.NewMockExchangeCollection( + p2, + 5, + ), + }, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + progress := &corsoProgress{pending: map[string]*itemDetails{}} + + dirTree, oc, err := inflateDirTree(ctx, test.layout, progress) + require.NoError(t, err) + + assert.Equal(t, expectedServiceCats, oc.ServiceCats) + assert.Equal(t, expectedResourceOwners, oc.ResourceOwners) + + assert.Equal(t, encodeAsPath(testTenant), dirTree.Name()) + + entries, err := fs.GetAllEntries(ctx, dirTree) + require.NoError(t, err) + + expectDirs(t, entries, encodeElements(service), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + expectDirs(t, entries, encodeElements(testUser), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + expectDirs(t, entries, encodeElements(category), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + expectDirs(t, entries, encodeElements(testInboxDir), true) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + // 42 files and 1 subdirectory. + assert.Len(t, entries, 43) + + // One of these entries should be a subdirectory with items in it. + subDirs := []fs.Directory(nil) + for _, e := range entries { + d, ok := e.(fs.Directory) + if !ok { + continue + } + + subDirs = append(subDirs, d) + assert.Equal(t, encodeAsPath(subdir), d.Name()) + } + + require.Len(t, subDirs, 1) + + entries = getDirEntriesForEntry(t, ctx, entries[0]) + assert.Len(t, entries, 5) + }) + } +} + +func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_Fails() { + p2, err := path.Builder{}.Append(testInboxDir).ToDataLayerExchangePathForCategory( + "tenant2", + "user2", + path.EmailCategory, + false, + ) + require.NoError(suite.T(), err) + + table := []struct { + name string + layout []data.Collection + }{ + { + "MultipleRoots", + // Directory structure would look like: + // - tenant1 + // - exchange + // - user1 + // - emails + // - Inbox + // - 5 separate files + // - tenant2 + // - exchange + // - user2 + // - emails + // - Inbox + // - 42 separate files + []data.Collection{ + mockconnector.NewMockExchangeCollection( + suite.testPath, + 5, + ), + mockconnector.NewMockExchangeCollection( + p2, + 42, + ), + }, + }, + { + "NoCollectionPath", + []data.Collection{ + mockconnector.NewMockExchangeCollection( + nil, + 5, + ), + }, + }, + } + + for _, test := range table { + ctx, flush := tester.NewContext() + defer flush() + + suite.T().Run(test.name, func(t *testing.T) { + _, _, err := inflateDirTree(ctx, test.layout, nil) + assert.Error(t, err) + }) + } +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index fc5f24cff..d1f043bbd 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -2,17 +2,9 @@ package kopia import ( "context" - "encoding/binary" - "io" - "runtime/trace" - "sync" - "sync/atomic" - "time" - "unsafe" "github.com/hashicorp/go-multierror" "github.com/kopia/kopia/fs" - "github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" @@ -41,86 +33,8 @@ const ( var ( errNotConnected = errors.New("not connected to repo") errNoRestorePath = errors.New("no restore path given") - - versionSize = int(unsafe.Sizeof(serializationVersion)) ) -// backupStreamReader is a wrapper around the io.Reader that other Corso -// components return when backing up information. It injects a version number at -// the start of the data stream. Future versions of Corso may not need this if -// they use more complex serialization logic as serialization/version injection -// will be handled by other components. -type backupStreamReader struct { - io.ReadCloser - version uint32 - readBytes int -} - -func (rw *backupStreamReader) Read(p []byte) (n int, err error) { - if rw.readBytes < versionSize { - marshalled := make([]byte, versionSize) - - toCopy := len(marshalled) - rw.readBytes - if len(p) < toCopy { - toCopy = len(p) - } - - binary.BigEndian.PutUint32(marshalled, rw.version) - - copy(p, marshalled[rw.readBytes:rw.readBytes+toCopy]) - rw.readBytes += toCopy - - return toCopy, nil - } - - return rw.ReadCloser.Read(p) -} - -// restoreStreamReader is a wrapper around the io.Reader that kopia returns when -// reading data from an item. It examines and strips off the version number of -// the restored data. Future versions of Corso may not need this if they use -// more complex serialization logic as version checking/deserialization will be -// handled by other components. A reader that returns a version error is no -// longer valid and should not be used once the version error is returned. -type restoreStreamReader struct { - io.ReadCloser - expectedVersion uint32 - readVersion bool -} - -func (rw *restoreStreamReader) checkVersion() error { - versionBuf := make([]byte, versionSize) - - for newlyRead := 0; newlyRead < versionSize; { - n, err := rw.ReadCloser.Read(versionBuf[newlyRead:]) - if err != nil { - return errors.Wrap(err, "reading data format version") - } - - newlyRead += n - } - - version := binary.BigEndian.Uint32(versionBuf) - - if version != rw.expectedVersion { - return errors.Errorf("unexpected data format %v", version) - } - - return nil -} - -func (rw *restoreStreamReader) Read(p []byte) (n int, err error) { - if !rw.readVersion { - rw.readVersion = true - - if err := rw.checkVersion(); err != nil { - return 0, err - } - } - - return rw.ReadCloser.Read(p) -} - type BackupStats struct { SnapshotID string @@ -161,95 +75,6 @@ func manifestToStats( } } -type itemDetails struct { - info details.ItemInfo - repoPath path.Path -} - -type corsoProgress struct { - snapshotfs.UploadProgress - pending map[string]*itemDetails - deets *details.Details - mu sync.RWMutex - totalBytes int64 -} - -// Kopia interface function used as a callback when kopia finishes processing a -// file. -func (cp *corsoProgress) FinishedFile(relativePath string, err error) { - // Pass the call through as well so we don't break expected functionality. - defer cp.UploadProgress.FinishedFile(relativePath, err) - // Whether it succeeded or failed, remove the entry from our pending set so we - // don't leak references. - defer func() { - cp.mu.Lock() - defer cp.mu.Unlock() - - delete(cp.pending, relativePath) - }() - - if err != nil { - return - } - - d := cp.get(relativePath) - if d == nil { - return - } - - parent := d.repoPath.ToBuilder().Dir() - - cp.deets.Add( - d.repoPath.String(), - d.repoPath.ShortRef(), - parent.ShortRef(), - d.info, - ) - - folders := []details.FolderEntry{} - - for len(parent.Elements()) > 0 { - nextParent := parent.Dir() - - folders = append(folders, details.FolderEntry{ - RepoRef: parent.String(), - ShortRef: parent.ShortRef(), - ParentRef: nextParent.ShortRef(), - Info: details.ItemInfo{ - Folder: &details.FolderInfo{ - DisplayName: parent.Elements()[len(parent.Elements())-1], - }, - }, - }) - - parent = nextParent - } - - cp.deets.AddFolders(folders) -} - -// Kopia interface function used as a callback when kopia finishes hashing a file. -func (cp *corsoProgress) FinishedHashingFile(fname string, bytes int64) { - // Pass the call through as well so we don't break expected functionality. - defer cp.UploadProgress.FinishedHashingFile(fname, bytes) - - atomic.AddInt64(&cp.totalBytes, bytes) -} - -func (cp *corsoProgress) put(k string, v *itemDetails) { - cp.mu.Lock() - defer cp.mu.Unlock() - - cp.pending[k] = v -} - -func (cp *corsoProgress) get(k string) *itemDetails { - cp.mu.RLock() - defer cp.mu.RUnlock() - - return cp.pending[k] -} - func NewWrapper(c *conn) (*Wrapper, error) { if err := c.wrap(); err != nil { return nil, errors.Wrap(err, "creating Wrapper") @@ -273,226 +98,6 @@ func (w *Wrapper) Close(ctx context.Context) error { return errors.Wrap(err, "closing Wrapper") } -// getStreamItemFunc returns a function that can be used by kopia's -// virtualfs.StreamingDirectory to iterate through directory entries and call -// kopia callbacks on directory entries. It binds the directory to the given -// DataCollection. -func getStreamItemFunc( - staticEnts []fs.Entry, - streamedEnts data.Collection, - progress *corsoProgress, -) func(context.Context, func(context.Context, fs.Entry) error) error { - return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { - ctx, end := D.Span(ctx, "kopia:getStreamItemFunc") - defer end() - - log := logger.Ctx(ctx) - - // Collect all errors and return them at the end so that iteration for this - // directory doesn't end early. - var errs *multierror.Error - - // Return static entries in this directory first. - for _, d := range staticEnts { - if err := cb(ctx, d); err != nil { - return errors.Wrap(err, "executing callback on static directory") - } - } - - if streamedEnts == nil { - return nil - } - - items := streamedEnts.Items() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - - case e, ok := <-items: - if !ok { - return errs.ErrorOrNil() - } - - // For now assuming that item IDs don't need escaping. - itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true) - if err != nil { - err = errors.Wrap(err, "getting full item path") - errs = multierror.Append(errs, err) - - log.Error(err) - - continue - } - - log.Debugw("reading item", "path", itemPath.String()) - trace.Log(ctx, "kopia:getStreamItemFunc:item", itemPath.String()) - - // Not all items implement StreamInfo. For example, the metadata files - // do not because they don't contain information directly backed up or - // used for restore. If progress does not contain information about a - // finished file it just returns without an error so it's safe to skip - // adding something to it. - ei, ok := e.(data.StreamInfo) - if ok { - // Relative path given to us in the callback is missing the root - // element. Add to pending set before calling the callback to avoid race - // conditions when the item is completed. - d := &itemDetails{info: ei.Info(), repoPath: itemPath} - progress.put(encodeAsPath(itemPath.PopFront().Elements()...), d) - } - - modTime := time.Now() - if smt, ok := e.(data.StreamModTime); ok { - modTime = smt.ModTime() - } - - entry := virtualfs.StreamingFileWithModTimeFromReader( - encodeAsPath(e.UUID()), - modTime, - &backupStreamReader{ - version: serializationVersion, - ReadCloser: e.ToReader(), - }, - ) - if err := cb(ctx, entry); err != nil { - // Kopia's uploader swallows errors in most cases, so if we see - // something here it's probably a big issue and we should return. - errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath)) - return errs.ErrorOrNil() - } - } - } - } -} - -// buildKopiaDirs recursively builds a directory hierarchy from the roots up. -// Returned directories are virtualfs.StreamingDirectory. -func buildKopiaDirs(dirName string, dir *treeMap, progress *corsoProgress) (fs.Directory, error) { - // Need to build the directory tree from the leaves up because intermediate - // directories need to have all their entries at creation time. - var childDirs []fs.Entry - - for childName, childDir := range dir.childDirs { - child, err := buildKopiaDirs(childName, childDir, progress) - if err != nil { - return nil, err - } - - childDirs = append(childDirs, child) - } - - return virtualfs.NewStreamingDirectory( - encodeAsPath(dirName), - getStreamItemFunc(childDirs, dir.collection, progress), - ), nil -} - -type treeMap struct { - childDirs map[string]*treeMap - collection data.Collection -} - -func newTreeMap() *treeMap { - return &treeMap{ - childDirs: map[string]*treeMap{}, - } -} - -// inflateDirTree returns a set of tags representing all the resource owners and -// service/categories in the snapshot and a fs.Directory tree rooted at the -// oldest common ancestor of the streams. All nodes are -// virtualfs.StreamingDirectory with the given DataCollections if there is one -// for that node. Tags can be used in future backups to fetch old snapshots for -// caching reasons. -func inflateDirTree( - ctx context.Context, - collections []data.Collection, - progress *corsoProgress, -) (fs.Directory, *OwnersCats, error) { - roots := make(map[string]*treeMap) - ownerCats := &OwnersCats{ - ResourceOwners: make(map[string]struct{}), - ServiceCats: make(map[string]struct{}), - } - - for _, s := range collections { - if s.FullPath() == nil { - return nil, nil, errors.New("no identifier for collection") - } - - serviceCat := serviceCatTag(s.FullPath()) - ownerCats.ServiceCats[serviceCat] = struct{}{} - ownerCats.ResourceOwners[s.FullPath().ResourceOwner()] = struct{}{} - - itemPath := s.FullPath().Elements() - - if len(itemPath) == 0 { - return nil, nil, errors.New("no identifier for collection") - } - - dir, ok := roots[itemPath[0]] - if !ok { - dir = newTreeMap() - roots[itemPath[0]] = dir - } - - // Single DataCollection with no ancestors. - if len(itemPath) == 1 { - dir.collection = s - continue - } - - for _, p := range itemPath[1 : len(itemPath)-1] { - newDir := dir.childDirs[p] - if newDir == nil { - newDir = newTreeMap() - - if dir.childDirs == nil { - dir.childDirs = map[string]*treeMap{} - } - - dir.childDirs[p] = newDir - } - - dir = newDir - } - - // At this point we have all the ancestor directories of this DataCollection - // as treeMap objects and `dir` is the parent directory of this - // DataCollection. - - end := len(itemPath) - 1 - - // Make sure this entry doesn't already exist. - tmpDir := dir.childDirs[itemPath[end]] - if tmpDir == nil { - tmpDir = newTreeMap() - dir.childDirs[itemPath[end]] = tmpDir - } - - tmpDir.collection = s - } - - if len(roots) > 1 { - return nil, nil, errors.New("multiple root directories") - } - - var res fs.Directory - - for dirName, dir := range roots { - tmp, err := buildKopiaDirs(dirName, dir, progress) - if err != nil { - return nil, nil, err - } - - res = tmp - } - - return res, ownerCats, nil -} - // BackupCollections takes a set of collections and creates a kopia snapshot // with the data that they contain. previousSnapshots is used for incremental // backups and should represent the base snapshot from which metadata is sourced diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index b6e4f8319..445e8f340 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -3,19 +3,15 @@ package kopia import ( "bytes" "context" - "errors" "io" "io/ioutil" stdpath "path" "testing" - "unsafe" "github.com/google/uuid" - "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" - "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -24,7 +20,6 @@ import ( "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/internal/model" "github.com/alcionai/corso/src/internal/tester" - "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" ) @@ -88,42 +83,6 @@ func testForFiles( assert.Equal(t, len(expected), count) } -func expectDirs( - t *testing.T, - entries []fs.Entry, - dirs []string, - exactly bool, -) { - t.Helper() - - if exactly { - require.Len(t, entries, len(dirs)) - } - - names := make([]string, 0, len(entries)) - for _, e := range entries { - names = append(names, e.Name()) - } - - assert.Subset(t, names, dirs) -} - -//revive:disable:context-as-argument -func getDirEntriesForEntry( - t *testing.T, - ctx context.Context, - entry fs.Entry, -) []fs.Entry { - //revive:enable:context-as-argument - d, ok := entry.(fs.Directory) - require.True(t, ok, "returned entry is not a directory") - - entries, err := fs.GetAllEntries(ctx, d) - require.NoError(t, err) - - return entries -} - //revive:disable:context-as-argument func checkSnapshotTags( t *testing.T, @@ -141,333 +100,6 @@ func checkSnapshotTags( // --------------- // unit tests // --------------- -type limitedRangeReader struct { - readLen int - io.ReadCloser -} - -func (lrr *limitedRangeReader) Read(p []byte) (int, error) { - if len(p) == 0 { - // Not well specified behavior, defer to underlying reader. - return lrr.ReadCloser.Read(p) - } - - toRead := lrr.readLen - if len(p) < toRead { - toRead = len(p) - } - - return lrr.ReadCloser.Read(p[:toRead]) -} - -type VersionReadersUnitSuite struct { - suite.Suite -} - -func TestVersionReadersUnitSuite(t *testing.T) { - suite.Run(t, new(VersionReadersUnitSuite)) -} - -func (suite *VersionReadersUnitSuite) TestWriteAndRead() { - inputData := []byte("This is some data for the reader to test with") - table := []struct { - name string - readVersion uint32 - writeVersion uint32 - check assert.ErrorAssertionFunc - }{ - { - name: "SameVersionSucceeds", - readVersion: 42, - writeVersion: 42, - check: assert.NoError, - }, - { - name: "DifferentVersionsFail", - readVersion: 7, - writeVersion: 42, - check: assert.Error, - }, - } - - for _, test := range table { - suite.T().Run(test.name, func(t *testing.T) { - baseReader := bytes.NewReader(inputData) - - reversible := &restoreStreamReader{ - expectedVersion: test.readVersion, - ReadCloser: &backupStreamReader{ - version: test.writeVersion, - ReadCloser: io.NopCloser(baseReader), - }, - } - - defer reversible.Close() - - allData, err := io.ReadAll(reversible) - test.check(t, err) - - if err != nil { - return - } - - assert.Equal(t, inputData, allData) - }) - } -} - -func readAllInParts( - t *testing.T, - partLen int, - reader io.ReadCloser, -) ([]byte, int) { - res := []byte{} - read := 0 - tmp := make([]byte, partLen) - - for { - n, err := reader.Read(tmp) - if errors.Is(err, io.EOF) { - break - } - - require.NoError(t, err) - - read += n - res = append(res, tmp[:n]...) - } - - return res, read -} - -func (suite *VersionReadersUnitSuite) TestWriteHandlesShortReads() { - t := suite.T() - inputData := []byte("This is some data for the reader to test with") - version := uint32(42) - baseReader := bytes.NewReader(inputData) - versioner := &backupStreamReader{ - version: version, - ReadCloser: io.NopCloser(baseReader), - } - expectedToWrite := len(inputData) + int(unsafe.Sizeof(versioner.version)) - - // "Write" all the data. - versionedData, writtenLen := readAllInParts(t, 1, versioner) - assert.Equal(t, expectedToWrite, writtenLen) - - // Read all of the data back. - baseReader = bytes.NewReader(versionedData) - reader := &restoreStreamReader{ - expectedVersion: version, - // Be adversarial and only allow reads of length 1 from the byte reader. - ReadCloser: &limitedRangeReader{ - readLen: 1, - ReadCloser: io.NopCloser(baseReader), - }, - } - readData, readLen := readAllInParts(t, 1, reader) - // This reports the bytes read and returned to the user, excluding the version - // that is stripped off at the start. - assert.Equal(t, len(inputData), readLen) - assert.Equal(t, inputData, readData) -} - -type CorsoProgressUnitSuite struct { - suite.Suite - targetFilePath path.Path - targetFileName string -} - -func TestCorsoProgressUnitSuite(t *testing.T) { - suite.Run(t, new(CorsoProgressUnitSuite)) -} - -func (suite *CorsoProgressUnitSuite) SetupSuite() { - p, err := path.Builder{}.Append( - testInboxDir, - "testFile", - ).ToDataLayerExchangePathForCategory( - testTenant, - testUser, - path.EmailCategory, - true, - ) - require.NoError(suite.T(), err) - - suite.targetFilePath = p - suite.targetFileName = suite.targetFilePath.ToBuilder().Dir().String() -} - -type testInfo struct { - info *itemDetails - err error - totalBytes int64 -} - -var finishedFileTable = []struct { - name string - cachedItems func(fname string, fpath path.Path) map[string]testInfo - expectedBytes int64 - expectedNumEntries int - err error -}{ - { - name: "DetailsExist", - cachedItems: func(fname string, fpath path.Path) map[string]testInfo { - return map[string]testInfo{ - fname: { - info: &itemDetails{details.ItemInfo{}, fpath}, - err: nil, - totalBytes: 100, - }, - } - }, - expectedBytes: 100, - // 1 file and 5 folders. - expectedNumEntries: 6, - }, - { - name: "PendingNoDetails", - cachedItems: func(fname string, fpath path.Path) map[string]testInfo { - return map[string]testInfo{ - fname: { - info: nil, - err: nil, - }, - } - }, - expectedNumEntries: 0, - }, - { - name: "HadError", - cachedItems: func(fname string, fpath path.Path) map[string]testInfo { - return map[string]testInfo{ - fname: { - info: &itemDetails{details.ItemInfo{}, fpath}, - err: assert.AnError, - }, - } - }, - expectedNumEntries: 0, - }, - { - name: "NotPending", - cachedItems: func(fname string, fpath path.Path) map[string]testInfo { - return nil - }, - expectedNumEntries: 0, - }, -} - -func (suite *CorsoProgressUnitSuite) TestFinishedFile() { - for _, test := range finishedFileTable { - suite.T().Run(test.name, func(t *testing.T) { - bd := &details.Details{} - cp := corsoProgress{ - UploadProgress: &snapshotfs.NullUploadProgress{}, - deets: bd, - pending: map[string]*itemDetails{}, - } - - ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) - - for k, v := range ci { - cp.put(k, v.info) - } - - require.Len(t, cp.pending, len(ci)) - - for k, v := range ci { - cp.FinishedFile(k, v.err) - } - - assert.Empty(t, cp.pending) - assert.Len(t, bd.Entries, test.expectedNumEntries) - }) - } -} - -func (suite *CorsoProgressUnitSuite) TestFinishedFileBuildsHierarchy() { - t := suite.T() - // Order of folders in hierarchy from root to leaf (excluding the item). - expectedFolderOrder := suite.targetFilePath.ToBuilder().Dir().Elements() - - // Setup stuff. - bd := &details.Details{} - cp := corsoProgress{ - UploadProgress: &snapshotfs.NullUploadProgress{}, - deets: bd, - pending: map[string]*itemDetails{}, - } - - deets := &itemDetails{details.ItemInfo{}, suite.targetFilePath} - cp.put(suite.targetFileName, deets) - require.Len(t, cp.pending, 1) - - cp.FinishedFile(suite.targetFileName, nil) - - // Gather information about the current state. - var ( - curRef *details.DetailsEntry - refToEntry = map[string]*details.DetailsEntry{} - ) - - for i := 0; i < len(bd.Entries); i++ { - e := &bd.Entries[i] - if e.Folder == nil { - continue - } - - refToEntry[e.ShortRef] = e - - if e.Folder.DisplayName == expectedFolderOrder[len(expectedFolderOrder)-1] { - curRef = e - } - } - - // Actual tests start here. - var rootRef *details.DetailsEntry - - // Traverse the details entries from leaf to root, following the ParentRef - // fields. At the end rootRef should point to the root of the path. - for i := len(expectedFolderOrder) - 1; i >= 0; i-- { - name := expectedFolderOrder[i] - - require.NotNil(t, curRef) - assert.Equal(t, name, curRef.Folder.DisplayName) - - rootRef = curRef - curRef = refToEntry[curRef.ParentRef] - } - - // Hierarchy root's ParentRef = "" and map will return nil. - assert.Nil(t, curRef) - require.NotNil(t, rootRef) - assert.Empty(t, rootRef.ParentRef) -} - -func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() { - for _, test := range finishedFileTable { - suite.T().Run(test.name, func(t *testing.T) { - bd := &details.Details{} - cp := corsoProgress{ - UploadProgress: &snapshotfs.NullUploadProgress{}, - deets: bd, - pending: map[string]*itemDetails{}, - } - - ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) - - for k, v := range ci { - cp.FinishedHashingFile(k, v.totalBytes) - } - - assert.Empty(t, cp.pending) - assert.Equal(t, test.expectedBytes, cp.totalBytes) - }) - } -} - type KopiaUnitSuite struct { suite.Suite testPath path.Path @@ -503,277 +135,6 @@ func (suite *KopiaUnitSuite) TestCloseWithoutInitDoesNotPanic() { }) } -func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { - tester.LogTimeOfTest(suite.T()) - ctx, flush := tester.NewContext() - - defer flush() - - t := suite.T() - tenant := "a-tenant" - user1 := testUser - user1Encoded := encodeAsPath(user1) - user2 := "user2" - user2Encoded := encodeAsPath(user2) - - p2, err := path.FromDataLayerPath( - stdpath.Join( - tenant, - service, - user2, - category, - testInboxDir, - ), - false, - ) - require.NoError(t, err) - - // Encode user names here so we don't have to decode things later. - expectedFileCount := map[string]int{ - user1Encoded: 5, - user2Encoded: 42, - } - expectedServiceCats := map[string]struct{}{ - serviceCatTag(suite.testPath): {}, - serviceCatTag(p2): {}, - } - expectedResourceOwners := map[string]struct{}{ - suite.testPath.ResourceOwner(): {}, - p2.ResourceOwner(): {}, - } - - progress := &corsoProgress{pending: map[string]*itemDetails{}} - - collections := []data.Collection{ - mockconnector.NewMockExchangeCollection( - suite.testPath, - expectedFileCount[user1Encoded], - ), - mockconnector.NewMockExchangeCollection( - p2, - expectedFileCount[user2Encoded], - ), - } - - // Returned directory structure should look like: - // - a-tenant - // - exchange - // - user1 - // - emails - // - Inbox - // - 5 separate files - // - user2 - // - emails - // - Inbox - // - 42 separate files - dirTree, oc, err := inflateDirTree(ctx, collections, progress) - require.NoError(t, err) - - assert.Equal(t, expectedServiceCats, oc.ServiceCats) - assert.Equal(t, expectedResourceOwners, oc.ResourceOwners) - - assert.Equal(t, encodeAsPath(testTenant), dirTree.Name()) - - entries, err := fs.GetAllEntries(ctx, dirTree) - require.NoError(t, err) - - expectDirs(t, entries, encodeElements(service), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - expectDirs(t, entries, encodeElements(user1, user2), true) - - for _, entry := range entries { - userName := entry.Name() - - entries = getDirEntriesForEntry(t, ctx, entry) - expectDirs(t, entries, encodeElements(category), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - expectDirs(t, entries, encodeElements(testInboxDir), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - assert.Len(t, entries, expectedFileCount[userName]) - } - - totalFileCount := 0 - for _, c := range expectedFileCount { - totalFileCount += c - } - - assert.Len(t, progress.pending, totalFileCount) -} - -func (suite *KopiaUnitSuite) TestBuildDirectoryTree_MixedDirectory() { - ctx, flush := tester.NewContext() - defer flush() - - subdir := "subfolder" - - p2, err := suite.testPath.Append(subdir, false) - require.NoError(suite.T(), err) - - expectedServiceCats := map[string]struct{}{ - serviceCatTag(suite.testPath): {}, - serviceCatTag(p2): {}, - } - expectedResourceOwners := map[string]struct{}{ - suite.testPath.ResourceOwner(): {}, - p2.ResourceOwner(): {}, - } - - // Test multiple orders of items because right now order can matter. Both - // orders result in a directory structure like: - // - a-tenant - // - exchange - // - user1 - // - emails - // - Inbox - // - subfolder - // - 5 separate files - // - 42 separate files - table := []struct { - name string - layout []data.Collection - }{ - { - name: "SubdirFirst", - layout: []data.Collection{ - mockconnector.NewMockExchangeCollection( - p2, - 5, - ), - mockconnector.NewMockExchangeCollection( - suite.testPath, - 42, - ), - }, - }, - { - name: "SubdirLast", - layout: []data.Collection{ - mockconnector.NewMockExchangeCollection( - suite.testPath, - 42, - ), - mockconnector.NewMockExchangeCollection( - p2, - 5, - ), - }, - }, - } - - for _, test := range table { - suite.T().Run(test.name, func(t *testing.T) { - progress := &corsoProgress{pending: map[string]*itemDetails{}} - - dirTree, oc, err := inflateDirTree(ctx, test.layout, progress) - require.NoError(t, err) - - assert.Equal(t, expectedServiceCats, oc.ServiceCats) - assert.Equal(t, expectedResourceOwners, oc.ResourceOwners) - - assert.Equal(t, encodeAsPath(testTenant), dirTree.Name()) - - entries, err := fs.GetAllEntries(ctx, dirTree) - require.NoError(t, err) - - expectDirs(t, entries, encodeElements(service), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - expectDirs(t, entries, encodeElements(testUser), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - expectDirs(t, entries, encodeElements(category), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - expectDirs(t, entries, encodeElements(testInboxDir), true) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - // 42 files and 1 subdirectory. - assert.Len(t, entries, 43) - - // One of these entries should be a subdirectory with items in it. - subDirs := []fs.Directory(nil) - for _, e := range entries { - d, ok := e.(fs.Directory) - if !ok { - continue - } - - subDirs = append(subDirs, d) - assert.Equal(t, encodeAsPath(subdir), d.Name()) - } - - require.Len(t, subDirs, 1) - - entries = getDirEntriesForEntry(t, ctx, entries[0]) - assert.Len(t, entries, 5) - }) - } -} - -func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { - p2, err := path.Builder{}.Append(testInboxDir).ToDataLayerExchangePathForCategory( - "tenant2", - "user2", - path.EmailCategory, - false, - ) - require.NoError(suite.T(), err) - - table := []struct { - name string - layout []data.Collection - }{ - { - "MultipleRoots", - // Directory structure would look like: - // - tenant1 - // - exchange - // - user1 - // - emails - // - Inbox - // - 5 separate files - // - tenant2 - // - exchange - // - user2 - // - emails - // - Inbox - // - 42 separate files - []data.Collection{ - mockconnector.NewMockExchangeCollection( - suite.testPath, - 5, - ), - mockconnector.NewMockExchangeCollection( - p2, - 42, - ), - }, - }, - { - "NoCollectionPath", - []data.Collection{ - mockconnector.NewMockExchangeCollection( - nil, - 5, - ), - }, - }, - } - - for _, test := range table { - ctx, flush := tester.NewContext() - defer flush() - - suite.T().Run(test.name, func(t *testing.T) { - _, _, err := inflateDirTree(ctx, test.layout, nil) - assert.Error(t, err) - }) - } -} - // --------------- // integration tests that use kopia // ---------------