Add Fetch() to RestoreCollection (#2434)
## Description Add a function to fetch a file from the collection synchronously. This will help avoid data dependencies on the restore path created by splitting item information across multiple kopia files. The Fetch function is currently unoptimized, though deeper analysis of the memory footprint should be done before changing it. Viewing by commit will help reduce chaff from updating tests to comply with the new interface. ## Does this PR need a docs update or release note? - [ ] ✅ Yes, it's included - [ ] 🕐 Yes, but in a later PR - [x] ⛔ No ## Type of change - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🧹 Tech Debt/Cleanup ## Issue(s) * #1535 ## Test Plan - [ ] 💪 Manual - [x] ⚡ Unit test - [ ] 💚 E2E
This commit is contained in:
parent
47f5ca1d95
commit
129d6b0b0c
@ -175,7 +175,7 @@ func buildCollections(
|
||||
mc.Data[i] = c.items[i].data
|
||||
}
|
||||
|
||||
collections = append(collections, mc)
|
||||
collections = append(collections, data.NotFoundRestoreCollection{Collection: mc})
|
||||
}
|
||||
|
||||
return collections, nil
|
||||
|
||||
@ -174,7 +174,9 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{coll})
|
||||
cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{
|
||||
data.NotFoundRestoreCollection{Collection: coll},
|
||||
})
|
||||
test.expectError(t, err)
|
||||
|
||||
emails := cdps[path.EmailCategory]
|
||||
@ -345,7 +347,9 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() {
|
||||
|
||||
require.NotNil(t, metadata, "collections contains a metadata collection")
|
||||
|
||||
cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{metadata})
|
||||
cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{
|
||||
data.NotFoundRestoreCollection{Collection: metadata},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
dps := cdps[test.scope.Category().PathType()]
|
||||
|
||||
@ -989,7 +989,9 @@ func collectionsForInfo(
|
||||
}
|
||||
}
|
||||
|
||||
collections = append(collections, c)
|
||||
collections = append(collections, data.NotFoundRestoreCollection{
|
||||
Collection: c,
|
||||
})
|
||||
kopiaEntries += len(info.items)
|
||||
}
|
||||
|
||||
@ -1034,7 +1036,9 @@ func collectionsForInfoVersion0(
|
||||
baseExpected[info.items[i].lookupKey] = info.items[i].data
|
||||
}
|
||||
|
||||
collections = append(collections, c)
|
||||
collections = append(collections, data.NotFoundRestoreCollection{
|
||||
Collection: c,
|
||||
})
|
||||
totalItems += len(info.items)
|
||||
kopiaEntries += len(info.items)
|
||||
}
|
||||
|
||||
@ -27,13 +27,10 @@ type MockExchangeDataCollection struct {
|
||||
}
|
||||
|
||||
var (
|
||||
// Needs to implement both backup and restore interfaces so we can use it in
|
||||
// integration tests.
|
||||
_ data.BackupCollection = &MockExchangeDataCollection{}
|
||||
_ data.RestoreCollection = &MockExchangeDataCollection{}
|
||||
_ data.Stream = &MockExchangeData{}
|
||||
_ data.StreamInfo = &MockExchangeData{}
|
||||
_ data.StreamSize = &MockExchangeData{}
|
||||
_ data.BackupCollection = &MockExchangeDataCollection{}
|
||||
_ data.Stream = &MockExchangeData{}
|
||||
_ data.StreamInfo = &MockExchangeData{}
|
||||
_ data.StreamSize = &MockExchangeData{}
|
||||
)
|
||||
|
||||
// NewMockExchangeDataCollection creates an data collection that will return the specified number of
|
||||
|
||||
@ -996,7 +996,7 @@ func (suite *OneDriveCollectionsSuite) TestDeserializeMetadata() {
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
cols = append(cols, mc)
|
||||
cols = append(cols, data.NotFoundRestoreCollection{Collection: mc})
|
||||
}
|
||||
|
||||
deltas, paths, err := deserializeMetadata(ctx, cols)
|
||||
@ -1529,7 +1529,9 @@ func (suite *OneDriveCollectionsSuite) TestGet() {
|
||||
for _, baseCol := range cols {
|
||||
folderPath := baseCol.FullPath().String()
|
||||
if folderPath == metadataPath.String() {
|
||||
deltas, paths, err := deserializeMetadata(ctx, []data.RestoreCollection{baseCol})
|
||||
deltas, paths, err := deserializeMetadata(ctx, []data.RestoreCollection{
|
||||
data.NotFoundRestoreCollection{Collection: baseCol},
|
||||
})
|
||||
if !assert.NoError(t, err, "deserializing metadata") {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -68,6 +68,10 @@ type BackupCollection interface {
|
||||
// RestoreCollection is an extension of Collection that is used during restores.
|
||||
// It embeds Collection, so every RestoreCollection also exposes the
// Collection method set in addition to Fetch.
type RestoreCollection interface {
|
||||
Collection
|
||||
// Fetch retrieves an item with the given name from the Collection if it
|
||||
// exists. Items retrieved with Fetch may still appear in the channel returned
|
||||
// by Items().
|
||||
Fetch(ctx context.Context, name string) (Stream, error)
|
||||
}
|
||||
|
||||
// NotFoundRestoreCollection is a wrapper for a Collection that returns
|
||||
|
||||
@ -1,8 +1,12 @@
|
||||
package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/kopia/kopia/fs"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
@ -13,8 +17,10 @@ var (
|
||||
)
|
||||
|
||||
type kopiaDataCollection struct {
|
||||
path path.Path
|
||||
streams []data.Stream
|
||||
path path.Path
|
||||
streams []data.Stream
|
||||
snapshotRoot fs.Entry
|
||||
counter ByteCounter
|
||||
}
|
||||
|
||||
func (kdc *kopiaDataCollection) Items() <-chan data.Stream {
|
||||
@ -35,6 +41,25 @@ func (kdc kopiaDataCollection) FullPath() path.Path {
|
||||
return kdc.path
|
||||
}
|
||||
|
||||
// Fetch looks up a single item by name beneath this collection's path in the
// snapshot and returns the result of getItemStream for it. It returns an error
// when the collection has no snapshot root or when the item path cannot be
// built from the collection path and name.
func (kdc kopiaDataCollection) Fetch(
|
||||
ctx context.Context,
|
||||
name string,
|
||||
) (data.Stream, error) {
|
||||
// Without a snapshot root there is nothing to look the item up in.
if kdc.snapshotRoot == nil {
|
||||
return nil, clues.New("no snapshot root")
|
||||
}
|
||||
|
||||
// Build the item's path by appending name to the collection's path.
// NOTE(review): the `true` argument presumably marks the new element as an
// item rather than a folder — confirm against path.Path.Append.
p, err := kdc.FullPath().Append(name, true)
|
||||
if err != nil {
|
||||
return nil, clues.Wrap(err, "creating item path")
|
||||
}
|
||||
|
||||
// TODO(ashmrtn): We could possibly hold a reference to the folder this
|
||||
// collection corresponds to, but that requires larger changes for the
|
||||
// creation of these collections.
|
||||
return getItemStream(ctx, p, kdc.snapshotRoot, kdc.counter)
|
||||
}
|
||||
|
||||
type kopiaDataStream struct {
|
||||
reader io.ReadCloser
|
||||
uuid string
|
||||
|
||||
@ -2,14 +2,20 @@ package kopia
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/fs/virtualfs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -113,3 +119,172 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// These types are needed because we check that a fs.File was returned.
|
||||
// Unfortunately fs.StreamingFile and fs.File have different interfaces so we
|
||||
// have to fake things.
|
||||
// mockSeeker supplies a Seek method for the mock reader; it holds no state.
type mockSeeker struct{}
|
||||
|
||||
// Seek is a stub: it ignores its arguments and always returns offset 0 with a
// "not implemented" error.
func (s mockSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
return 0, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// mockReader combines an embedded io.ReadCloser (Read/Close) with the stub
// Seek from mockSeeker and a stub Entry method. mockFile.Open returns it as an
// fs.Reader.
type mockReader struct {
|
||||
io.ReadCloser
|
||||
mockSeeker
|
||||
}
|
||||
|
||||
// Entry is a stub: it always returns a nil entry and a "not implemented"
// error.
func (r mockReader) Entry() (fs.Entry, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
// mockFile is a test file entry: the embedded fs.StreamingFile provides the
// entry methods, while Open hands back the reader stored in r.
type mockFile struct {
|
||||
// Use for Entry interface.
|
||||
fs.StreamingFile
|
||||
// r is the reader that Open wraps and returns.
r io.ReadCloser
|
||||
}
|
||||
|
||||
// Open returns f.r wrapped in a mockReader. The context is unused and the
// returned error is always nil.
func (f *mockFile) Open(ctx context.Context) (fs.Reader, error) {
|
||||
return mockReader{ReadCloser: f.r}, nil
|
||||
}
|
||||
|
||||
// TestFetch exercises kopiaDataCollection.Fetch against an in-memory directory
// layout (tenant/service/user/category/folder1/folder2) holding two mock
// files: one whose reader yields noErrFileData and one whose reader is built
// from a MockExchangeData configured with ReadErr. Each table case names the
// item to fetch and the serialization version used to build the layout, then
// checks the lookup error, the read error, and (when both succeed) the bytes
// read.
func (suite *KopiaDataCollectionUnitSuite) TestFetch() {
|
||||
var (
|
||||
tenant = "a-tenant"
|
||||
user = "a-user"
|
||||
service = path.ExchangeService.String()
|
||||
category = path.EmailCategory
|
||||
folder1 = "folder1"
|
||||
folder2 = "folder2"
|
||||
|
||||
noErrFileName = "noError"
|
||||
errFileName = "error"
|
||||
|
||||
noErrFileData = "foo bar baz"
|
||||
|
||||
errReader = &mockconnector.MockExchangeData{
|
||||
ReadErr: assert.AnError,
|
||||
}
|
||||
)
|
||||
|
||||
// Needs to be a function so we can switch the serialization version as
|
||||
// needed.
|
||||
getLayout := func(serVersion uint32) fs.Entry {
|
||||
return virtualfs.NewStaticDirectory(encodeAsPath(tenant), []fs.Entry{
|
||||
virtualfs.NewStaticDirectory(encodeAsPath(service), []fs.Entry{
|
||||
virtualfs.NewStaticDirectory(encodeAsPath(user), []fs.Entry{
|
||||
virtualfs.NewStaticDirectory(encodeAsPath(category.String()), []fs.Entry{
|
||||
virtualfs.NewStaticDirectory(encodeAsPath(folder1), []fs.Entry{
|
||||
virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{
|
||||
&mockFile{
|
||||
StreamingFile: virtualfs.StreamingFileFromReader(
|
||||
encodeAsPath(noErrFileName),
|
||||
nil,
|
||||
),
|
||||
r: newBackupStreamReader(
|
||||
serVersion,
|
||||
io.NopCloser(bytes.NewReader([]byte(noErrFileData))),
|
||||
),
|
||||
},
|
||||
&mockFile{
|
||||
StreamingFile: virtualfs.StreamingFileFromReader(
|
||||
encodeAsPath(errFileName),
|
||||
nil,
|
||||
),
|
||||
r: newBackupStreamReader(
|
||||
serVersion,
|
||||
errReader.ToReader(),
|
||||
),
|
||||
},
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
b := path.Builder{}.Append(folder1, folder2)
|
||||
pth, err := b.ToDataLayerExchangePathForCategory(
|
||||
tenant,
|
||||
user,
|
||||
category,
|
||||
false,
|
||||
)
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
inputName string
|
||||
inputSerializationVersion uint32
|
||||
expectedData []byte
|
||||
lookupErr assert.ErrorAssertionFunc
|
||||
readErr assert.ErrorAssertionFunc
|
||||
notFoundErr bool
|
||||
}{
|
||||
{
|
||||
name: "FileFound_NoError",
|
||||
inputName: noErrFileName,
|
||||
inputSerializationVersion: serializationVersion,
|
||||
expectedData: []byte(noErrFileData),
|
||||
lookupErr: assert.NoError,
|
||||
readErr: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "FileFound_ReadError",
|
||||
inputName: errFileName,
|
||||
inputSerializationVersion: serializationVersion,
|
||||
lookupErr: assert.NoError,
|
||||
readErr: assert.Error,
|
||||
},
|
||||
{
|
||||
name: "FileFound_VersionError",
|
||||
inputName: noErrFileName,
|
||||
inputSerializationVersion: serializationVersion + 1,
|
||||
lookupErr: assert.NoError,
|
||||
readErr: assert.Error,
|
||||
},
|
||||
{
|
||||
name: "FileNotFound",
|
||||
inputName: "foo",
|
||||
inputSerializationVersion: serializationVersion + 1,
|
||||
lookupErr: assert.Error,
|
||||
notFoundErr: true,
|
||||
},
|
||||
}
|
||||
for _, test := range table {
|
||||
suite.Run(test.name, func() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
t := suite.T()
|
||||
|
||||
root := getLayout(test.inputSerializationVersion)
|
||||
c := &i64counter{}
|
||||
|
||||
col := &kopiaDataCollection{path: pth, snapshotRoot: root, counter: c}
|
||||
|
||||
s, err := col.Fetch(ctx, test.inputName)
|
||||
|
||||
test.lookupErr(t, err)
|
||||
|
||||
// Lookup failed: optionally verify it is the not-found sentinel, then stop
// since there is no stream to read.
if err != nil {
|
||||
if test.notFoundErr {
|
||||
assert.ErrorIs(t, err, data.ErrNotFound)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
fileData, err := io.ReadAll(s.ToReader())
|
||||
|
||||
test.readErr(t, err)
|
||||
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(t, test.expectedData, fileData)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -412,13 +412,19 @@ func (w Wrapper) RestoreMultipleItems(
|
||||
|
||||
c, ok := cols[parentPath.ShortRef()]
|
||||
if !ok {
|
||||
cols[parentPath.ShortRef()] = &kopiaDataCollection{path: parentPath}
|
||||
cols[parentPath.ShortRef()] = &kopiaDataCollection{
|
||||
path: parentPath,
|
||||
snapshotRoot: snapshotRoot,
|
||||
counter: bcounter,
|
||||
}
|
||||
c = cols[parentPath.ShortRef()]
|
||||
}
|
||||
|
||||
c.streams = append(c.streams, ds)
|
||||
}
|
||||
|
||||
// Can't use the maps package to extract the values because we need to convert
|
||||
// from *kopiaDataCollection to data.RestoreCollection too.
|
||||
res := make([]data.RestoreCollection, 0, len(cols))
|
||||
for _, c := range cols {
|
||||
res = append(res, c)
|
||||
|
||||
@ -410,7 +410,7 @@ func buildCollections(
|
||||
mc.Data[i] = c.items[i].data
|
||||
}
|
||||
|
||||
collections = append(collections, mc)
|
||||
collections = append(collections, data.NotFoundRestoreCollection{Collection: mc})
|
||||
}
|
||||
|
||||
return collections
|
||||
|
||||
@ -435,7 +435,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
getMeta bool
|
||||
assertErr assert.ErrorAssertionFunc
|
||||
assertB assert.BoolAssertionFunc
|
||||
expectDCS []data.RestoreCollection
|
||||
expectDCS []mockColl
|
||||
expectNilMans bool
|
||||
}{
|
||||
{
|
||||
@ -539,7 +539,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
name: "man missing backup id",
|
||||
mr: mockManifestRestorer{
|
||||
mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{
|
||||
"id": {mockColl{id: "id_coll"}},
|
||||
"id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}},
|
||||
}},
|
||||
mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "")},
|
||||
},
|
||||
@ -566,8 +566,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
name: "one complete, one incomplete",
|
||||
mr: mockManifestRestorer{
|
||||
mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{
|
||||
"id": {mockColl{id: "id_coll"}},
|
||||
"incmpl_id": {mockColl{id: "incmpl_id_coll"}},
|
||||
"id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}},
|
||||
"incmpl_id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "incmpl_id_coll"}}},
|
||||
}},
|
||||
mans: []*kopia.ManifestEntry{
|
||||
makeMan(path.EmailCategory, "id", "", "bid"),
|
||||
@ -579,13 +579,13 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
getMeta: true,
|
||||
assertErr: assert.NoError,
|
||||
assertB: assert.True,
|
||||
expectDCS: []data.RestoreCollection{mockColl{id: "id_coll"}},
|
||||
expectDCS: []mockColl{{id: "id_coll"}},
|
||||
},
|
||||
{
|
||||
name: "single valid man",
|
||||
mr: mockManifestRestorer{
|
||||
mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{
|
||||
"id": {mockColl{id: "id_coll"}},
|
||||
"id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}},
|
||||
}},
|
||||
mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "bid")},
|
||||
},
|
||||
@ -594,14 +594,14 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
getMeta: true,
|
||||
assertErr: assert.NoError,
|
||||
assertB: assert.True,
|
||||
expectDCS: []data.RestoreCollection{mockColl{id: "id_coll"}},
|
||||
expectDCS: []mockColl{{id: "id_coll"}},
|
||||
},
|
||||
{
|
||||
name: "multiple valid mans",
|
||||
mr: mockManifestRestorer{
|
||||
mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{
|
||||
"mail": {mockColl{id: "mail_coll"}},
|
||||
"contact": {mockColl{id: "contact_coll"}},
|
||||
"mail": {data.NotFoundRestoreCollection{Collection: mockColl{id: "mail_coll"}}},
|
||||
"contact": {data.NotFoundRestoreCollection{Collection: mockColl{id: "contact_coll"}}},
|
||||
}},
|
||||
mans: []*kopia.ManifestEntry{
|
||||
makeMan(path.EmailCategory, "mail", "", "bid"),
|
||||
@ -613,9 +613,9 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
getMeta: true,
|
||||
assertErr: assert.NoError,
|
||||
assertB: assert.True,
|
||||
expectDCS: []data.RestoreCollection{
|
||||
mockColl{id: "mail_coll"},
|
||||
mockColl{id: "contact_coll"},
|
||||
expectDCS: []mockColl{
|
||||
{id: "mail_coll"},
|
||||
{id: "contact_coll"},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -658,16 +658,33 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() {
|
||||
expect, got := []string{}, []string{}
|
||||
|
||||
for _, dc := range test.expectDCS {
|
||||
mc, ok := dc.(mockColl)
|
||||
assert.True(t, ok)
|
||||
|
||||
expect = append(expect, mc.id)
|
||||
expect = append(expect, dc.id)
|
||||
}
|
||||
|
||||
for _, dc := range dcs {
|
||||
mc, ok := dc.(mockColl)
|
||||
assert.True(t, ok)
|
||||
if !assert.IsTypef(
|
||||
t,
|
||||
data.NotFoundRestoreCollection{},
|
||||
dc,
|
||||
"unexpected type returned [%T]",
|
||||
dc,
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
tmp := dc.(data.NotFoundRestoreCollection)
|
||||
|
||||
if !assert.IsTypef(
|
||||
t,
|
||||
mockColl{},
|
||||
tmp.Collection,
|
||||
"unexpected type returned [%T]",
|
||||
tmp.Collection,
|
||||
) {
|
||||
continue
|
||||
}
|
||||
|
||||
mc := tmp.Collection.(mockColl)
|
||||
got = append(got, mc.id)
|
||||
}
|
||||
|
||||
|
||||
@ -62,7 +62,11 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
||||
bytesRead: &stats.ByteCounter{
|
||||
NumBytes: 42,
|
||||
},
|
||||
cs: []data.RestoreCollection{&mockconnector.MockExchangeDataCollection{}},
|
||||
cs: []data.RestoreCollection{
|
||||
data.NotFoundRestoreCollection{
|
||||
Collection: &mockconnector.MockExchangeDataCollection{},
|
||||
},
|
||||
},
|
||||
gc: &support.ConnectorOperationStatus{
|
||||
ObjectCount: 1,
|
||||
Successful: 1,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user