Smarter item load ordering during ProduceRestoreCollections (#3294)
Optimize loading items for restore a bit by first grouping items by directory, loading each directory once, and then loading all items from the loaded directory. This brings item loading on my local machine (communicating with remote S3) down to ~1.5 min per 1k items; a minimal sketch of the grouping pass follows this description.

Future improvements could lazily load items as they're returned by the Items() call of each collection, but doing so would change the semantics of ProduceRestoreCollections() (specifically, item-not-found errors would be returned during Items() instead of during ProduceRestoreCollections()).

The kopia data collection has also been updated to hold a reference to the folder it corresponds to. This folder reference is used to service Fetch() calls.

---

#### Does this PR need a docs update or release note?

- [x] ✅ Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [ ] ⛔ No

#### Type of change

- [x] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [ ] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #3293

#### Test Plan

- [ ] 💪 Manual
- [x] ⚡ Unit test
- [x] 💚 E2E
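For illustration, here is a minimal sketch of the grouping pass described above. The `dirAndItems` shape mirrors the type added in this PR, but `splitPath` and the string-based paths are hypothetical stand-ins for Corso's `path.Path` (which provides `Dir()`, `Item()`, and `ShortRef()`); treat this as a sketch of the idea, not the PR's code.

```go
package main

import "fmt"

// dirAndItems mirrors the shape used in this PR: a directory path plus the
// names of the items to load once that directory has been opened.
type dirAndItems struct {
	dir   string
	items []string
}

// groupByDir buckets item paths by parent directory so each directory is
// looked up in the snapshot exactly once, instead of walking from the
// snapshot root for every single item.
func groupByDir(paths []string) map[string]*dirAndItems {
	grouped := map[string]*dirAndItems{}

	for _, p := range paths {
		dir, item := splitPath(p)

		di := grouped[dir]
		if di == nil {
			di = &dirAndItems{dir: dir}
			grouped[dir] = di
		}

		di.items = append(di.items, item)
	}

	return grouped
}

// splitPath is a hypothetical helper standing in for path.Path's Dir() and
// Item(); it splits "a/b/c" into ("a/b", "c").
func splitPath(p string) (dir, item string) {
	for i := len(p) - 1; i >= 0; i-- {
		if p[i] == '/' {
			return p[:i], p[i+1:]
		}
	}

	return "", p
}

func main() {
	grouped := groupByDir([]string{
		"tenant/exchange/user/email/inbox/item1",
		"tenant/exchange/user/email/inbox/item2",
		"tenant/exchange/user/email/sent/item3",
	})

	// Two directory loads instead of three root-to-item walks.
	for _, di := range grouped {
		fmt.Println(di.dir, di.items)
	}
}
```

With N items spread across D directories, the snapshot tree is traversed D times rather than N, which is where the restore-time improvement comes from.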
This commit is contained in: parent d7839ac607, commit bcde15689f
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - POST Retries following certain status codes (500, 502, 504) will re-use the post body instead of retrying with a no-content request.
 - Fix nil pointer exception when running an incremental backup on SharePoint where the base backup used an older index data format.
 - --user and --mailbox flags (already not supported) have been removed from CLI examples for details and restore commands.
+- Improve restore time on large restores by optimizing how items are loaded from the remote repository.

 ## [v0.7.0] (beta) - 2023-05-02
@@ -9,6 +9,7 @@ import (

 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/pkg/fault"
+	"github.com/alcionai/corso/src/pkg/logger"
 	"github.com/alcionai/corso/src/pkg/path"
 )

@@ -18,10 +19,25 @@ var (
 )

 type kopiaDataCollection struct {
-	path         path.Path
-	streams      []data.Stream
-	snapshotRoot fs.Entry
-	counter      ByteCounter
+	path            path.Path
+	streams         []data.Stream
+	dir             fs.Directory
+	counter         ByteCounter
+	expectedVersion uint32
 }

+func (kdc *kopiaDataCollection) addStream(
+	ctx context.Context,
+	name string,
+) error {
+	s, err := kdc.Fetch(ctx, name)
+	if err != nil {
+		return err
+	}
+
+	kdc.streams = append(kdc.streams, s)
+
+	return nil
+}
+
 func (kdc *kopiaDataCollection) Items(
@@ -45,23 +61,61 @@ func (kdc kopiaDataCollection) FullPath() path.Path {
 	return kdc.path
 }

 // Fetch returns the file with the given name from the collection as a
 // data.Stream. Returns a data.ErrNotFound error if the file isn't in the
 // collection.
 func (kdc kopiaDataCollection) Fetch(
 	ctx context.Context,
 	name string,
 ) (data.Stream, error) {
-	if kdc.snapshotRoot == nil {
-		return nil, clues.New("no snapshot root")
+	ctx = clues.Add(ctx, "item_name", clues.Hide(name))
+
+	if kdc.dir == nil {
+		return nil, clues.New("no snapshot directory")
 	}

-	p, err := kdc.FullPath().Append(name, true)
+	if len(name) == 0 {
+		return nil, clues.Wrap(errNoRestorePath, "unknown item").WithClues(ctx)
+	}
+
+	e, err := kdc.dir.Child(ctx, encodeAsPath(name))
 	if err != nil {
-		return nil, clues.Wrap(err, "creating item path")
+		if isErrEntryNotFound(err) {
+			err = clues.Stack(data.ErrNotFound, err)
+		}
+
+		return nil, clues.Wrap(err, "getting item").WithClues(ctx)
 	}

-	// TODO(ashmrtn): We could possibly hold a reference to the folder this
-	// collection corresponds to, but that requires larger changes for the
-	// creation of these collections.
-	return getItemStream(ctx, p, kdc.snapshotRoot, kdc.counter)
+	f, ok := e.(fs.File)
+	if !ok {
+		return nil, clues.New("object is not a file").WithClues(ctx)
+	}
+
+	size := f.Size() - int64(versionSize)
+	if size < 0 {
+		logger.Ctx(ctx).Infow("negative file size; resetting to 0", "file_size", size)
+
+		size = 0
+	}
+
+	if kdc.counter != nil {
+		kdc.counter.Count(size)
+	}
+
+	r, err := f.Open(ctx)
+	if err != nil {
+		return nil, clues.Wrap(err, "opening file").WithClues(ctx)
+	}
+
+	return &kopiaDataStream{
+		uuid: name,
+		reader: &restoreStreamReader{
+			ReadCloser:      r,
+			expectedVersion: kdc.expectedVersion,
+		},
+		size: size,
+	}, nil
 }

 type kopiaDataStream struct {
@@ -20,6 +20,48 @@ import (
 	"github.com/alcionai/corso/src/pkg/path"
 )

+// ---------------
+// Wrappers to match required interfaces.
+// ---------------
+
+// These types are needed because we check that a fs.File was returned.
+// Unfortunately fs.StreamingFile and fs.File have different interfaces so we
+// have to fake things.
+type mockSeeker struct{}
+
+func (s mockSeeker) Seek(offset int64, whence int) (int64, error) {
+	return 0, clues.New("not implemented")
+}
+
+type mockReader struct {
+	io.ReadCloser
+	mockSeeker
+}
+
+func (r mockReader) Entry() (fs.Entry, error) {
+	return nil, clues.New("not implemented")
+}
+
+type mockFile struct {
+	// Use for Entry interface.
+	fs.StreamingFile
+
+	r io.ReadCloser
+
+	openErr error
+	size    int64
+}
+
+func (f *mockFile) Open(ctx context.Context) (fs.Reader, error) {
+	if f.openErr != nil {
+		return nil, f.openErr
+	}
+
+	return mockReader{ReadCloser: f.r}, nil
+}
+
+func (f *mockFile) Size() int64 {
+	return f.size
+}
+
 // ---------------
 // unit tests
 // ---------------
@@ -44,52 +86,127 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsPath() {
 	require.NoError(t, err, clues.ToCore(err))

 	c := kopiaDataCollection{
-		streams: []data.Stream{},
-		path:    pth,
+		path: pth,
 	}

 	assert.Equal(t, pth, c.FullPath())
 }

 func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
-	testData := [][]byte{
-		[]byte("abcdefghijklmnopqrstuvwxyz"),
-		[]byte("zyxwvutsrqponmlkjihgfedcba"),
+	type loadedData struct {
+		uuid string
+		data []byte
+		size int64
 	}

-	uuids := []string{
-		"a-file",
-		"another-file",
-	}
+	var (
+		fileData = [][]byte{
+			[]byte("abcdefghijklmnopqrstuvwxyz"),
+			[]byte("zyxwvutsrqponmlkjihgfedcba"),
+		}
+
+		uuids = []string{
+			"a-file",
+			"another-file",
+		}
+
+		files = []loadedData{
+			{uuid: uuids[0], data: fileData[0], size: int64(len(fileData[0]))},
+			{uuid: uuids[1], data: fileData[1], size: int64(len(fileData[1]))},
+		}
+
+		fileLookupErrName = "errLookup"
+		fileOpenErrName   = "errOpen"
+		notFileErrName    = "errNotFile"
+	)
+
+	// Needs to be a function so the readers get refreshed each time.
+	getLayout := func() fs.Directory {
+		return virtualfs.NewStaticDirectory(encodeAsPath("foo"), []fs.Entry{
+			&mockFile{
+				StreamingFile: virtualfs.StreamingFileFromReader(
+					encodeAsPath(files[0].uuid),
+					nil,
+				),
+				r: newBackupStreamReader(
+					serializationVersion,
+					io.NopCloser(bytes.NewReader(files[0].data)),
+				),
+				size: int64(len(files[0].data) + versionSize),
+			},
+			&mockFile{
+				StreamingFile: virtualfs.StreamingFileFromReader(
+					encodeAsPath(files[1].uuid),
+					nil,
+				),
+				r: newBackupStreamReader(
+					serializationVersion,
+					io.NopCloser(bytes.NewReader(files[1].data)),
+				),
+				size: int64(len(files[1].data) + versionSize),
+			},
+			&mockFile{
+				StreamingFile: virtualfs.StreamingFileFromReader(
+					encodeAsPath(fileOpenErrName),
+					nil,
+				),
+				openErr: assert.AnError,
+			},
+			virtualfs.NewStaticDirectory(encodeAsPath(notFileErrName), []fs.Entry{}),
+		})
+	}

 	table := []struct {
-		name    string
-		streams []data.Stream
+		name           string
+		uuidsAndErrors map[string]assert.ErrorAssertionFunc
+		// Data and stuff about the loaded data.
+		expectedLoaded []loadedData
 	}{
 		{
 			name: "SingleStream",
-			streams: []data.Stream{
-				&kopiaDataStream{
-					reader: io.NopCloser(bytes.NewReader(testData[0])),
-					uuid:   uuids[0],
-					size:   int64(len(testData[0])),
-				},
+			uuidsAndErrors: map[string]assert.ErrorAssertionFunc{
+				uuids[0]: assert.NoError,
 			},
+			expectedLoaded: []loadedData{files[0]},
 		},
 		{
 			name: "MultipleStreams",
-			streams: []data.Stream{
-				&kopiaDataStream{
-					reader: io.NopCloser(bytes.NewReader(testData[0])),
-					uuid:   uuids[0],
-					size:   int64(len(testData[0])),
-				},
-				&kopiaDataStream{
-					reader: io.NopCloser(bytes.NewReader(testData[1])),
-					uuid:   uuids[1],
-					size:   int64(len(testData[1])),
-				},
+			uuidsAndErrors: map[string]assert.ErrorAssertionFunc{
+				uuids[0]: assert.NoError,
+				uuids[1]: assert.NoError,
 			},
+			expectedLoaded: files,
+		},
+		{
+			name: "Some Not Found Errors",
+			uuidsAndErrors: map[string]assert.ErrorAssertionFunc{
+				fileLookupErrName: assert.Error,
+				uuids[0]:          assert.NoError,
+			},
+			expectedLoaded: []loadedData{files[0]},
+		},
+		{
+			name: "Some Not A File Errors",
+			uuidsAndErrors: map[string]assert.ErrorAssertionFunc{
+				notFileErrName: assert.Error,
+				uuids[0]:       assert.NoError,
+			},
+			expectedLoaded: []loadedData{files[0]},
+		},
+		{
+			name: "Some Open Errors",
+			uuidsAndErrors: map[string]assert.ErrorAssertionFunc{
+				fileOpenErrName: assert.Error,
+				uuids[0]:        assert.NoError,
+			},
+			expectedLoaded: []loadedData{files[0]},
+		},
+		{
+			name: "Empty Name Errors",
+			uuidsAndErrors: map[string]assert.ErrorAssertionFunc{
+				"": assert.Error,
+			},
+			expectedLoaded: []loadedData{},
 		},
 	}
@@ -101,112 +218,101 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
 			t := suite.T()

 			c := kopiaDataCollection{
-				streams: test.streams,
-				path:    nil,
+				dir:             getLayout(),
+				path:            nil,
+				expectedVersion: serializationVersion,
 			}

-			count := 0
-			for returnedStream := range c.Items(ctx, fault.New(true)) {
-				require.Less(t, count, len(test.streams))
-				assert.Equal(t, returnedStream.UUID(), uuids[count])
+			for uuid, expectErr := range test.uuidsAndErrors {
+				err := c.addStream(ctx, uuid)
+				expectErr(t, err, "adding stream to collection", clues.ToCore(err))
+			}
+
+			var (
+				found []loadedData
+				bus   = fault.New(true)
+			)
+
+			for returnedStream := range c.Items(ctx, bus) {
+				require.Less(t, len(found), len(test.expectedLoaded), "items read safety")
+
+				found = append(found, loadedData{})
+				f := &found[len(found)-1]
+				f.uuid = returnedStream.UUID()

 				buf, err := io.ReadAll(returnedStream.ToReader())
-				require.NoError(t, err, clues.ToCore(err))
-				assert.Equal(t, buf, testData[count])
-				require.Implements(t, (*data.StreamSize)(nil), returnedStream)
+				if !assert.NoError(t, err, clues.ToCore(err)) {
+					continue
+				}
+
+				f.data = buf
+
+				if !assert.Implements(t, (*data.StreamSize)(nil), returnedStream) {
+					continue
+				}
+
 				ss := returnedStream.(data.StreamSize)
 				assert.Equal(t, len(buf), int(ss.Size()))
-
-				count++
+				f.size = ss.Size()
 			}

-			assert.Equal(t, len(test.streams), count)
+			assert.Empty(t, bus.Recovered(), "expected no recoverable errors")
+			assert.NoError(t, bus.Failure(), "expected no hard failures")
+
+			assert.ElementsMatch(t, test.expectedLoaded, found, "loaded items")
 		})
 	}
 }

-// These types are needed because we check that a fs.File was returned.
-// Unfortunately fs.StreamingFile and fs.File have different interfaces so we
-// have to fake things.
-type mockSeeker struct{}
-
-func (s mockSeeker) Seek(offset int64, whence int) (int64, error) {
-	return 0, clues.New("not implemented")
-}
-
-type mockReader struct {
-	io.ReadCloser
-	mockSeeker
-}
-
-func (r mockReader) Entry() (fs.Entry, error) {
-	return nil, clues.New("not implemented")
-}
-
-type mockFile struct {
-	// Use for Entry interface.
-	fs.StreamingFile
-
-	r io.ReadCloser
-}
-
-func (f *mockFile) Open(ctx context.Context) (fs.Reader, error) {
-	return mockReader{ReadCloser: f.r}, nil
-}
-
 func (suite *KopiaDataCollectionUnitSuite) TestFetch() {
 	var (
 		tenant   = "a-tenant"
 		user     = "a-user"
 		service  = path.ExchangeService.String()
 		category = path.EmailCategory
 		folder1  = "folder1"
 		folder2  = "folder2"

 		noErrFileName = "noError"
 		errFileName   = "error"
+		errFileName2  = "error2"

 		noErrFileData = "foo bar baz"

 		errReader = &exchMock.Data{
 			ReadErr: assert.AnError,
 		}
 	)

 	// Needs to be a function so we can switch the serialization version as
 	// needed.
-	getLayout := func(serVersion uint32) fs.Entry {
-		return virtualfs.NewStaticDirectory(encodeAsPath(tenant), []fs.Entry{
-			virtualfs.NewStaticDirectory(encodeAsPath(service), []fs.Entry{
-				virtualfs.NewStaticDirectory(encodeAsPath(user), []fs.Entry{
-					virtualfs.NewStaticDirectory(encodeAsPath(category.String()), []fs.Entry{
-						virtualfs.NewStaticDirectory(encodeAsPath(folder1), []fs.Entry{
-							virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{
-								&mockFile{
-									StreamingFile: virtualfs.StreamingFileFromReader(
-										encodeAsPath(noErrFileName),
-										nil,
-									),
-									r: newBackupStreamReader(
-										serVersion,
-										io.NopCloser(bytes.NewReader([]byte(noErrFileData))),
-									),
-								},
-								&mockFile{
-									StreamingFile: virtualfs.StreamingFileFromReader(
-										encodeAsPath(errFileName),
-										nil,
-									),
-									r: newBackupStreamReader(
-										serVersion,
-										errReader.ToReader(),
-									),
-								},
-							}),
-						}),
-					}),
-				}),
-			}),
+	getLayout := func(serVersion uint32) fs.Directory {
+		return virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{
+			&mockFile{
+				StreamingFile: virtualfs.StreamingFileFromReader(
+					encodeAsPath(noErrFileName),
+					nil,
+				),
+				r: newBackupStreamReader(
+					serVersion,
+					io.NopCloser(bytes.NewReader([]byte(noErrFileData))),
+				),
+			},
+			&mockFile{
+				StreamingFile: virtualfs.StreamingFileFromReader(
+					encodeAsPath(errFileName),
+					nil,
+				),
+				r: newBackupStreamReader(
+					serVersion,
+					errReader.ToReader(),
+				),
+			},
+			&mockFile{
+				StreamingFile: virtualfs.StreamingFileFromReader(
+					encodeAsPath(errFileName2),
+					nil,
+				),
+				openErr: assert.AnError,
+			},
 		})
 	}
@@ -268,7 +374,12 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetch() {
 			root := getLayout(test.inputSerializationVersion)
 			c := &i64counter{}

-			col := &kopiaDataCollection{path: pth, snapshotRoot: root, counter: c}
+			col := &kopiaDataCollection{
+				path:            pth,
+				dir:             root,
+				counter:         c,
+				expectedVersion: serializationVersion,
+			}

 			s, err := col.Fetch(ctx, test.inputName)
@@ -2,6 +2,7 @@ package kopia

 import (
 	"context"
+	"errors"
 	"strings"

 	"github.com/alcionai/clues"
@@ -326,26 +327,24 @@ func (w Wrapper) getSnapshotRoot(
 	return rootDirEntry, nil
 }

-// getItemStream looks up the item at the given path starting from snapshotRoot.
-// If the item is a file in kopia then it returns a data.Stream of the item. If
-// the item does not exist in kopia or is not a file an error is returned. The
-// UUID of the returned data.Stream will be the name of the kopia file the data
-// is sourced from.
-func getItemStream(
+// getDir looks up the directory at the given path starting from snapshotRoot.
+// If the item is a directory in kopia then it returns the kopia fs.Directory
+// handle. If the item does not exist in kopia or is not a directory an error is
+// returned.
+func getDir(
 	ctx context.Context,
-	itemPath path.Path,
+	dirPath path.Path,
 	snapshotRoot fs.Entry,
-	bcounter ByteCounter,
-) (data.Stream, error) {
-	if itemPath == nil {
-		return nil, clues.Wrap(errNoRestorePath, "getting item stream").WithClues(ctx)
+) (fs.Directory, error) {
+	if dirPath == nil {
+		return nil, clues.Wrap(errNoRestorePath, "getting directory").WithClues(ctx)
 	}

 	// GetNestedEntry handles nil properly.
 	e, err := snapshotfs.GetNestedEntry(
 		ctx,
 		snapshotRoot,
-		encodeElements(itemPath.PopFront().Elements()...))
+		encodeElements(dirPath.PopFront().Elements()...))
 	if err != nil {
 		if isErrEntryNotFound(err) {
 			err = clues.Stack(data.ErrNotFound, err)
@@ -354,39 +353,97 @@ func getDir(
 		return nil, clues.Wrap(err, "getting nested object handle").WithClues(ctx)
 	}

-	f, ok := e.(fs.File)
+	f, ok := e.(fs.Directory)
 	if !ok {
-		return nil, clues.New("requested object is not a file").WithClues(ctx)
+		return nil, clues.New("requested object is not a directory").WithClues(ctx)
 	}

-	if bcounter != nil {
-		bcounter.Count(f.Size())
-	}
-
-	r, err := f.Open(ctx)
-	if err != nil {
-		return nil, clues.Wrap(err, "opening file").WithClues(ctx)
-	}
-
-	decodedName, err := decodeElement(f.Name())
-	if err != nil {
-		return nil, clues.Wrap(err, "decoding file name").WithClues(ctx)
-	}
-
-	return &kopiaDataStream{
-		uuid: decodedName,
-		reader: &restoreStreamReader{
-			ReadCloser:      r,
-			expectedVersion: serializationVersion,
-		},
-		size: f.Size() - int64(versionSize),
-	}, nil
+	return f, nil
 }

 type ByteCounter interface {
 	Count(numBytes int64)
 }

+type dirAndItems struct {
+	dir   path.Path
+	items []string
+}
+
+// loadDirsAndItems takes a set of ShortRef -> (directory path, []item names)
+// and creates a collection for each tuple in the set. Non-fatal errors are
+// accumulated into bus. Any fatal errors will stop processing and return the
+// error directly.
+//
+// All data is loaded from the given snapshot.
+func loadDirsAndItems(
+	ctx context.Context,
+	snapshotRoot fs.Entry,
+	bcounter ByteCounter,
+	toLoad map[string]*dirAndItems,
+	bus *fault.Bus,
+) ([]data.RestoreCollection, error) {
+	var (
+		el        = bus.Local()
+		res       = make([]data.RestoreCollection, 0, len(toLoad))
+		loadCount = 0
+	)
+
+	for _, dirItems := range toLoad {
+		if el.Failure() != nil {
+			return nil, el.Failure()
+		}
+
+		ictx := clues.Add(ctx, "directory_path", dirItems.dir)
+
+		dir, err := getDir(ictx, dirItems.dir, snapshotRoot)
+		if err != nil {
+			el.AddRecoverable(clues.Wrap(err, "loading directory").
+				WithClues(ictx).
+				Label(fault.LabelForceNoBackupCreation))
+
+			continue
+		}
+
+		dc := &kopiaDataCollection{
+			path:            dirItems.dir,
+			dir:             dir,
+			counter:         bcounter,
+			expectedVersion: serializationVersion,
+		}
+
+		res = append(res, dc)
+
+		for _, item := range dirItems.items {
+			if el.Failure() != nil {
+				return nil, el.Failure()
+			}
+
+			err := dc.addStream(ictx, item)
+			if err != nil {
+				el.AddRecoverable(clues.Wrap(err, "loading item").
+					WithClues(ictx).
+					Label(fault.LabelForceNoBackupCreation))
+
+				continue
+			}
+
+			loadCount++
+			if loadCount%1000 == 0 {
+				logger.Ctx(ctx).Infow(
+					"loading items from kopia",
+					"loaded_items", loadCount)
+			}
+		}
+	}
+
+	logger.Ctx(ctx).Infow(
+		"done loading items from kopia",
+		"loaded_items", loadCount)
+
+	return res, el.Failure()
+}
+
 // ProduceRestoreCollections looks up all paths- assuming each is an item declaration,
 // not a directory- in the snapshot with id snapshotID. The path should be the
 // full path of the item from the root. Returns the results as a slice of single-
@@ -408,16 +465,18 @@ func (w Wrapper) ProduceRestoreCollections(
 		return nil, clues.Stack(errNoRestorePath).WithClues(ctx)
 	}

+	// Used later on, but less confusing to follow error propagation if we just
+	// load it here.
 	snapshotRoot, err := w.getSnapshotRoot(ctx, snapshotID)
 	if err != nil {
-		return nil, err
+		return nil, clues.Wrap(err, "loading snapshot root")
 	}

 	var (
 		loadCount int
-		// Maps short ID of parent path to data collection for that folder.
-		cols = map[string]*kopiaDataCollection{}
-		el   = errs.Local()
+		// Directory path -> set of items to load from the directory.
+		dirsToItems = map[string]*dirAndItems{}
+		el          = errs.Local()
 	)

 	for _, itemPath := range paths {
@@ -425,14 +484,10 @@ func (w Wrapper) ProduceRestoreCollections(
 			return nil, el.Failure()
 		}

+		// Group things by directory so we can load all items from a single
+		// directory instance lower down.
 		ictx := clues.Add(ctx, "item_path", itemPath.String())

-		ds, err := getItemStream(ictx, itemPath, snapshotRoot, bcounter)
-		if err != nil {
-			el.AddRecoverable(clues.Stack(err).Label(fault.LabelForceNoBackupCreation))
-			continue
-		}
-
 		parentPath, err := itemPath.Dir()
 		if err != nil {
 			el.AddRecoverable(clues.Wrap(err, "making directory collection").
@@ -442,33 +497,29 @@ func (w Wrapper) ProduceRestoreCollections(
 			continue
 		}

-		c, ok := cols[parentPath.ShortRef()]
-		if !ok {
-			cols[parentPath.ShortRef()] = &kopiaDataCollection{
-				path:         parentPath,
-				snapshotRoot: snapshotRoot,
-				counter:      bcounter,
-			}
-			c = cols[parentPath.ShortRef()]
+		di := dirsToItems[parentPath.ShortRef()]
+		if di == nil {
+			dirsToItems[parentPath.ShortRef()] = &dirAndItems{dir: parentPath}
+			di = dirsToItems[parentPath.ShortRef()]
 		}

-		c.streams = append(c.streams, ds)
+		di.items = append(di.items, itemPath.Item())

 		loadCount++
 		if loadCount%1000 == 0 {
-			logger.Ctx(ctx).Infow("loading items from kopia", "loaded_count", loadCount)
+			logger.Ctx(ctx).Infow(
+				"grouping items to load from kopia",
+				"group_items", loadCount)
 		}
 	}

-	// Can't use the maps package to extract the values because we need to convert
-	// from *kopiaDataCollection to data.RestoreCollection too.
-	res := make([]data.RestoreCollection, 0, len(cols))
-	for _, c := range cols {
-		res = append(res, c)
+	// Now that we've grouped everything, go through and load each directory and
+	// then load the items from the directory.
+	res, err := loadDirsAndItems(ctx, snapshotRoot, bcounter, dirsToItems, errs)
+	if err != nil {
+		return nil, clues.Wrap(err, "loading items")
 	}

-	logger.Ctx(ctx).Infow("done loading items from kopia", "loaded_count", loadCount)
-
 	return res, el.Failure()
 }
@@ -525,6 +576,12 @@ func (w Wrapper) FetchPrevSnapshotManifests(
 }

 func isErrEntryNotFound(err error) bool {
+	// Calling Child on a directory may return this.
+	if errors.Is(err, fs.ErrEntryNotFound) {
+		return true
+	}
+
+	// This is returned when walking the hierarchy of a backup.
 	return strings.Contains(err.Error(), "entry not found") &&
 		!strings.Contains(err.Error(), "parent is not a directory")
 }
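For orientation only (this is not code from the PR), here is a rough sketch of how a restore consumer drives the collections returned by ProduceRestoreCollections. The interfaces below are simplified stand-ins: the real `Items()` also takes a `*fault.Bus`, and the real streams are `data.Stream` values. Because items are now loaded eagerly while the collections are built, item-not-found errors still surface during `ProduceRestoreCollections()` rather than during `Items()`.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Stream is a simplified stand-in for Corso's data.Stream.
type Stream interface {
	UUID() string
	ToReader() io.ReadCloser
}

// RestoreCollection is a simplified stand-in for data.RestoreCollection.
type RestoreCollection interface {
	// Items yields the streams loaded for this collection.
	Items(ctx context.Context) <-chan Stream
	// Fetch services point lookups by item name; after this PR it is backed
	// by the collection's cached fs.Directory handle instead of a fresh walk
	// from the snapshot root.
	Fetch(ctx context.Context, name string) (Stream, error)
}

// restoreAll drains every collection's item channel, as a restore consumer
// would after ProduceRestoreCollections returns.
func restoreAll(ctx context.Context, cols []RestoreCollection) error {
	for _, col := range cols {
		for item := range col.Items(ctx) {
			body, err := io.ReadAll(item.ToReader())
			if err != nil {
				return fmt.Errorf("reading %s: %w", item.UUID(), err)
			}

			fmt.Printf("restored %s (%d bytes)\n", item.UUID(), len(body))
		}
	}

	return nil
}

func main() {} // sketch only; no concrete collections are wired up here
```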