From 129d6b0b0c2d62b0cf6523a951f9f3b7dc992629 Mon Sep 17 00:00:00 2001 From: ashmrtn Date: Wed, 8 Feb 2023 14:05:38 -0800 Subject: [PATCH] Add Fetch() to RestoreCollection (#2434) ## Description Add a function to fetch a file from the collection synchronously. This will help avoid data dependencies on the restore path created by splitting item information across multiple kopia files Fetch function is currently unoptimized, though deeper analysis of memory footprint should be done before changing Viewing by commit will help reduce chaff from updating tests to comply with the new interface ## Does this PR need a docs update or release note? - [ ] :white_check_mark: Yes, it's included - [ ] :clock1: Yes, but in a later PR - [x] :no_entry: No ## Type of change - [x] :sunflower: Feature - [ ] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :broom: Tech Debt/Cleanup ## Issue(s) * #1535 ## Test Plan - [ ] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/cmd/factory/impl/common.go | 2 +- .../exchange/data_collections_test.go | 8 +- .../connector/graph_connector_helper_test.go | 8 +- .../mockconnector/mock_data_collection.go | 11 +- .../connector/onedrive/collections_test.go | 6 +- src/internal/data/data_collection.go | 4 + src/internal/kopia/data_collection.go | 29 ++- src/internal/kopia/data_collection_test.go | 175 ++++++++++++++++++ src/internal/kopia/wrapper.go | 8 +- .../operations/backup_integration_test.go | 2 +- src/internal/operations/manifests_test.go | 53 ++++-- src/internal/operations/restore_test.go | 6 +- 12 files changed, 275 insertions(+), 37 deletions(-) diff --git a/src/cmd/factory/impl/common.go b/src/cmd/factory/impl/common.go index 2279c71a3..45d2b7a18 100644 --- a/src/cmd/factory/impl/common.go +++ b/src/cmd/factory/impl/common.go @@ -175,7 +175,7 @@ func buildCollections( mc.Data[i] = c.items[i].data } - collections = append(collections, mc) + collections = append(collections, data.NotFoundRestoreCollection{Collection: mc}) } return collections, nil diff --git a/src/internal/connector/exchange/data_collections_test.go b/src/internal/connector/exchange/data_collections_test.go index 42b24c9f3..3df22d030 100644 --- a/src/internal/connector/exchange/data_collections_test.go +++ b/src/internal/connector/exchange/data_collections_test.go @@ -174,7 +174,9 @@ func (suite *DataCollectionsUnitSuite) TestParseMetadataCollections() { ) require.NoError(t, err) - cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{coll}) + cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{ + data.NotFoundRestoreCollection{Collection: coll}, + }) test.expectError(t, err) emails := cdps[path.EmailCategory] @@ -345,7 +347,9 @@ func (suite *DataCollectionsIntegrationSuite) TestDelta() { require.NotNil(t, metadata, "collections contains a metadata collection") - cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{metadata}) + cdps, err := parseMetadataCollections(ctx, []data.RestoreCollection{ + data.NotFoundRestoreCollection{Collection: metadata}, + }) require.NoError(t, err) dps := cdps[test.scope.Category().PathType()] diff --git a/src/internal/connector/graph_connector_helper_test.go b/src/internal/connector/graph_connector_helper_test.go index 299509f96..ad6ea556e 100644 --- a/src/internal/connector/graph_connector_helper_test.go +++ b/src/internal/connector/graph_connector_helper_test.go @@ -989,7 +989,9 @@ func collectionsForInfo( } } - collections = append(collections, c) + collections = append(collections, data.NotFoundRestoreCollection{ + Collection: c, + }) kopiaEntries += len(info.items) } @@ -1034,7 +1036,9 @@ func collectionsForInfoVersion0( baseExpected[info.items[i].lookupKey] = info.items[i].data } - collections = append(collections, c) + collections = append(collections, data.NotFoundRestoreCollection{ + Collection: c, + }) totalItems += len(info.items) kopiaEntries += len(info.items) } diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 8cd315d0d..0b643a699 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -27,13 +27,10 @@ type MockExchangeDataCollection struct { } var ( - // Needs to implement both backup and restore interfaces so we can use it in - // integration tests. - _ data.BackupCollection = &MockExchangeDataCollection{} - _ data.RestoreCollection = &MockExchangeDataCollection{} - _ data.Stream = &MockExchangeData{} - _ data.StreamInfo = &MockExchangeData{} - _ data.StreamSize = &MockExchangeData{} + _ data.BackupCollection = &MockExchangeDataCollection{} + _ data.Stream = &MockExchangeData{} + _ data.StreamInfo = &MockExchangeData{} + _ data.StreamSize = &MockExchangeData{} ) // NewMockExchangeDataCollection creates an data collection that will return the specified number of diff --git a/src/internal/connector/onedrive/collections_test.go b/src/internal/connector/onedrive/collections_test.go index 1fae8ee9a..3cc5dbcb5 100644 --- a/src/internal/connector/onedrive/collections_test.go +++ b/src/internal/connector/onedrive/collections_test.go @@ -996,7 +996,7 @@ func (suite *OneDriveCollectionsSuite) TestDeserializeMetadata() { ) require.NoError(t, err) - cols = append(cols, mc) + cols = append(cols, data.NotFoundRestoreCollection{Collection: mc}) } deltas, paths, err := deserializeMetadata(ctx, cols) @@ -1529,7 +1529,9 @@ func (suite *OneDriveCollectionsSuite) TestGet() { for _, baseCol := range cols { folderPath := baseCol.FullPath().String() if folderPath == metadataPath.String() { - deltas, paths, err := deserializeMetadata(ctx, []data.RestoreCollection{baseCol}) + deltas, paths, err := deserializeMetadata(ctx, []data.RestoreCollection{ + data.NotFoundRestoreCollection{Collection: baseCol}, + }) if !assert.NoError(t, err, "deserializing metadata") { continue } diff --git a/src/internal/data/data_collection.go b/src/internal/data/data_collection.go index 794b4bc16..beeffd3d7 100644 --- a/src/internal/data/data_collection.go +++ b/src/internal/data/data_collection.go @@ -68,6 +68,10 @@ type BackupCollection interface { // RestoreCollection is an extension of Collection that is used during restores. type RestoreCollection interface { Collection + // Fetch retrieves an item with the given name from the Collection if it + // exists. Items retrieved with Fetch may still appear in the channel returned + // by Items(). + Fetch(ctx context.Context, name string) (Stream, error) } // NotFoundRestoreCollection is a wrapper for a Collection that returns diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index 262ebd849..23ab824a5 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -1,8 +1,12 @@ package kopia import ( + "context" "io" + "github.com/alcionai/clues" + "github.com/kopia/kopia/fs" + "github.com/alcionai/corso/src/internal/data" "github.com/alcionai/corso/src/pkg/path" ) @@ -13,8 +17,10 @@ var ( ) type kopiaDataCollection struct { - path path.Path - streams []data.Stream + path path.Path + streams []data.Stream + snapshotRoot fs.Entry + counter ByteCounter } func (kdc *kopiaDataCollection) Items() <-chan data.Stream { @@ -35,6 +41,25 @@ func (kdc kopiaDataCollection) FullPath() path.Path { return kdc.path } +func (kdc kopiaDataCollection) Fetch( + ctx context.Context, + name string, +) (data.Stream, error) { + if kdc.snapshotRoot == nil { + return nil, clues.New("no snapshot root") + } + + p, err := kdc.FullPath().Append(name, true) + if err != nil { + return nil, clues.Wrap(err, "creating item path") + } + + // TODO(ashmrtn): We could possibly hold a reference to the folder this + // collection corresponds to, but that requires larger changes for the + // creation of these collections. + return getItemStream(ctx, p, kdc.snapshotRoot, kdc.counter) +} + type kopiaDataStream struct { reader io.ReadCloser uuid string diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go index c28c54af8..7c2ed4894 100644 --- a/src/internal/kopia/data_collection_test.go +++ b/src/internal/kopia/data_collection_test.go @@ -2,14 +2,20 @@ package kopia import ( "bytes" + "context" + "errors" "io" "testing" + "github.com/kopia/kopia/fs" + "github.com/kopia/kopia/fs/virtualfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/alcionai/corso/src/internal/connector/mockconnector" "github.com/alcionai/corso/src/internal/data" + "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/path" ) @@ -113,3 +119,172 @@ func (suite *KopiaDataCollectionUnitSuite) TestReturnsStreams() { }) } } + +// These types are needed because we check that a fs.File was returned. +// Unfortunately fs.StreamingFile and fs.File have different interfaces so we +// have to fake things. +type mockSeeker struct{} + +func (s mockSeeker) Seek(offset int64, whence int) (int64, error) { + return 0, errors.New("not implemented") +} + +type mockReader struct { + io.ReadCloser + mockSeeker +} + +func (r mockReader) Entry() (fs.Entry, error) { + return nil, errors.New("not implemented") +} + +type mockFile struct { + // Use for Entry interface. + fs.StreamingFile + r io.ReadCloser +} + +func (f *mockFile) Open(ctx context.Context) (fs.Reader, error) { + return mockReader{ReadCloser: f.r}, nil +} + +func (suite *KopiaDataCollectionUnitSuite) TestFetch() { + var ( + tenant = "a-tenant" + user = "a-user" + service = path.ExchangeService.String() + category = path.EmailCategory + folder1 = "folder1" + folder2 = "folder2" + + noErrFileName = "noError" + errFileName = "error" + + noErrFileData = "foo bar baz" + + errReader = &mockconnector.MockExchangeData{ + ReadErr: assert.AnError, + } + ) + + // Needs to be a function so we can switch the serialization version as + // needed. + getLayout := func(serVersion uint32) fs.Entry { + return virtualfs.NewStaticDirectory(encodeAsPath(tenant), []fs.Entry{ + virtualfs.NewStaticDirectory(encodeAsPath(service), []fs.Entry{ + virtualfs.NewStaticDirectory(encodeAsPath(user), []fs.Entry{ + virtualfs.NewStaticDirectory(encodeAsPath(category.String()), []fs.Entry{ + virtualfs.NewStaticDirectory(encodeAsPath(folder1), []fs.Entry{ + virtualfs.NewStaticDirectory(encodeAsPath(folder2), []fs.Entry{ + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(noErrFileName), + nil, + ), + r: newBackupStreamReader( + serVersion, + io.NopCloser(bytes.NewReader([]byte(noErrFileData))), + ), + }, + &mockFile{ + StreamingFile: virtualfs.StreamingFileFromReader( + encodeAsPath(errFileName), + nil, + ), + r: newBackupStreamReader( + serVersion, + errReader.ToReader(), + ), + }, + }), + }), + }), + }), + }), + }) + } + + b := path.Builder{}.Append(folder1, folder2) + pth, err := b.ToDataLayerExchangePathForCategory( + tenant, + user, + category, + false, + ) + require.NoError(suite.T(), err) + + table := []struct { + name string + inputName string + inputSerializationVersion uint32 + expectedData []byte + lookupErr assert.ErrorAssertionFunc + readErr assert.ErrorAssertionFunc + notFoundErr bool + }{ + { + name: "FileFound_NoError", + inputName: noErrFileName, + inputSerializationVersion: serializationVersion, + expectedData: []byte(noErrFileData), + lookupErr: assert.NoError, + readErr: assert.NoError, + }, + { + name: "FileFound_ReadError", + inputName: errFileName, + inputSerializationVersion: serializationVersion, + lookupErr: assert.NoError, + readErr: assert.Error, + }, + { + name: "FileFound_VersionError", + inputName: noErrFileName, + inputSerializationVersion: serializationVersion + 1, + lookupErr: assert.NoError, + readErr: assert.Error, + }, + { + name: "FileNotFound", + inputName: "foo", + inputSerializationVersion: serializationVersion + 1, + lookupErr: assert.Error, + notFoundErr: true, + }, + } + for _, test := range table { + suite.Run(test.name, func() { + ctx, flush := tester.NewContext() + defer flush() + + t := suite.T() + + root := getLayout(test.inputSerializationVersion) + c := &i64counter{} + + col := &kopiaDataCollection{path: pth, snapshotRoot: root, counter: c} + + s, err := col.Fetch(ctx, test.inputName) + + test.lookupErr(t, err) + + if err != nil { + if test.notFoundErr { + assert.ErrorIs(t, err, data.ErrNotFound) + } + + return + } + + fileData, err := io.ReadAll(s.ToReader()) + + test.readErr(t, err) + + if err != nil { + return + } + + assert.Equal(t, test.expectedData, fileData) + }) + } +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index d49241d36..c829b5ccf 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -412,13 +412,19 @@ func (w Wrapper) RestoreMultipleItems( c, ok := cols[parentPath.ShortRef()] if !ok { - cols[parentPath.ShortRef()] = &kopiaDataCollection{path: parentPath} + cols[parentPath.ShortRef()] = &kopiaDataCollection{ + path: parentPath, + snapshotRoot: snapshotRoot, + counter: bcounter, + } c = cols[parentPath.ShortRef()] } c.streams = append(c.streams, ds) } + // Can't use the maps package to extract the values because we need to convert + // from *kopiaDataCollection to data.RestoreCollection too. res := make([]data.RestoreCollection, 0, len(cols)) for _, c := range cols { res = append(res, c) diff --git a/src/internal/operations/backup_integration_test.go b/src/internal/operations/backup_integration_test.go index 83033ab3b..d20be4c31 100644 --- a/src/internal/operations/backup_integration_test.go +++ b/src/internal/operations/backup_integration_test.go @@ -410,7 +410,7 @@ func buildCollections( mc.Data[i] = c.items[i].data } - collections = append(collections, mc) + collections = append(collections, data.NotFoundRestoreCollection{Collection: mc}) } return collections diff --git a/src/internal/operations/manifests_test.go b/src/internal/operations/manifests_test.go index d30f72726..24c948320 100644 --- a/src/internal/operations/manifests_test.go +++ b/src/internal/operations/manifests_test.go @@ -435,7 +435,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { getMeta bool assertErr assert.ErrorAssertionFunc assertB assert.BoolAssertionFunc - expectDCS []data.RestoreCollection + expectDCS []mockColl expectNilMans bool }{ { @@ -539,7 +539,7 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { name: "man missing backup id", mr: mockManifestRestorer{ mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "id": {mockColl{id: "id_coll"}}, + "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, }}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "")}, }, @@ -566,8 +566,8 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { name: "one complete, one incomplete", mr: mockManifestRestorer{ mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "id": {mockColl{id: "id_coll"}}, - "incmpl_id": {mockColl{id: "incmpl_id_coll"}}, + "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, + "incmpl_id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "incmpl_id_coll"}}}, }}, mans: []*kopia.ManifestEntry{ makeMan(path.EmailCategory, "id", "", "bid"), @@ -579,13 +579,13 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { getMeta: true, assertErr: assert.NoError, assertB: assert.True, - expectDCS: []data.RestoreCollection{mockColl{id: "id_coll"}}, + expectDCS: []mockColl{{id: "id_coll"}}, }, { name: "single valid man", mr: mockManifestRestorer{ mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "id": {mockColl{id: "id_coll"}}, + "id": {data.NotFoundRestoreCollection{Collection: mockColl{id: "id_coll"}}}, }}, mans: []*kopia.ManifestEntry{makeMan(path.EmailCategory, "id", "", "bid")}, }, @@ -594,14 +594,14 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { getMeta: true, assertErr: assert.NoError, assertB: assert.True, - expectDCS: []data.RestoreCollection{mockColl{id: "id_coll"}}, + expectDCS: []mockColl{{id: "id_coll"}}, }, { name: "multiple valid mans", mr: mockManifestRestorer{ mockRestorer: mockRestorer{collsByID: map[string][]data.RestoreCollection{ - "mail": {mockColl{id: "mail_coll"}}, - "contact": {mockColl{id: "contact_coll"}}, + "mail": {data.NotFoundRestoreCollection{Collection: mockColl{id: "mail_coll"}}}, + "contact": {data.NotFoundRestoreCollection{Collection: mockColl{id: "contact_coll"}}}, }}, mans: []*kopia.ManifestEntry{ makeMan(path.EmailCategory, "mail", "", "bid"), @@ -613,9 +613,9 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { getMeta: true, assertErr: assert.NoError, assertB: assert.True, - expectDCS: []data.RestoreCollection{ - mockColl{id: "mail_coll"}, - mockColl{id: "contact_coll"}, + expectDCS: []mockColl{ + {id: "mail_coll"}, + {id: "contact_coll"}, }, }, { @@ -658,16 +658,33 @@ func (suite *OperationsManifestsUnitSuite) TestProduceManifestsAndMetadata() { expect, got := []string{}, []string{} for _, dc := range test.expectDCS { - mc, ok := dc.(mockColl) - assert.True(t, ok) - - expect = append(expect, mc.id) + expect = append(expect, dc.id) } for _, dc := range dcs { - mc, ok := dc.(mockColl) - assert.True(t, ok) + if !assert.IsTypef( + t, + data.NotFoundRestoreCollection{}, + dc, + "unexpected type returned [%T]", + dc, + ) { + continue + } + tmp := dc.(data.NotFoundRestoreCollection) + + if !assert.IsTypef( + t, + mockColl{}, + tmp.Collection, + "unexpected type returned [%T]", + tmp.Collection, + ) { + continue + } + + mc := tmp.Collection.(mockColl) got = append(got, mc.id) } diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index a3974ae64..42c78bbbe 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -62,7 +62,11 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { bytesRead: &stats.ByteCounter{ NumBytes: 42, }, - cs: []data.RestoreCollection{&mockconnector.MockExchangeDataCollection{}}, + cs: []data.RestoreCollection{ + data.NotFoundRestoreCollection{ + Collection: &mockconnector.MockExchangeDataCollection{}, + }, + }, gc: &support.ConnectorOperationStatus{ ObjectCount: 1, Successful: 1,