From 6e9bd634e45778cb4af47cf6a11ce2ab92126a0c Mon Sep 17 00:00:00 2001 From: Vaibhav Kamra Date: Thu, 7 Jul 2022 18:41:40 -0700 Subject: [PATCH] Create RestorePoint and RestorePointDetails (#290) Update the backup operation to create RestorePoint and RestorePointDetails models in the repository Add modelstore to the operation to allow backup/restore operations to update/query for corso models Closes #268 --- src/internal/connector/exchange/message.go | 30 +++++++ .../connector/exchange/message_test.go | 85 +++++++++++++++++++ .../connector/exchange_data_collection.go | 17 ++++ src/internal/connector/graph_connector.go | 3 +- .../mockconnector/mock_data_collection.go | 7 ++ src/internal/kopia/wrapper.go | 43 +++++++--- src/internal/kopia/wrapper_test.go | 24 ++++-- src/internal/operations/backup.go | 27 +++++- src/internal/operations/backup_test.go | 17 +++- src/internal/operations/operation.go | 18 ++-- src/internal/operations/operation_test.go | 11 ++- src/internal/operations/restore.go | 3 +- src/internal/operations/restore_test.go | 11 ++- src/pkg/repository/repository.go | 65 ++++++++------ 14 files changed, 300 insertions(+), 61 deletions(-) create mode 100644 src/internal/connector/exchange/message.go create mode 100644 src/internal/connector/exchange/message_test.go diff --git a/src/internal/connector/exchange/message.go b/src/internal/connector/exchange/message.go new file mode 100644 index 000000000..d87c8797c --- /dev/null +++ b/src/internal/connector/exchange/message.go @@ -0,0 +1,30 @@ +package exchange + +import ( + "time" + + "github.com/alcionai/corso/pkg/restorepoint" + "github.com/microsoftgraph/msgraph-sdk-go/models" +) + +func MessageInfo(msg models.Messageable) *restorepoint.ExchangeInfo { + sender := "" + subject := "" + received := time.Time{} + if msg.GetSender() != nil && + msg.GetSender().GetEmailAddress() != nil && + msg.GetSender().GetEmailAddress().GetAddress() != nil { + sender = *msg.GetSender().GetEmailAddress().GetAddress() + } + if msg.GetSubject() != nil { + subject = *msg.GetSubject() + } + if msg.GetReceivedDateTime() != nil { + received = *msg.GetReceivedDateTime() + } + return &restorepoint.ExchangeInfo{ + Sender: sender, + Subject: subject, + Received: received, + } +} diff --git a/src/internal/connector/exchange/message_test.go b/src/internal/connector/exchange/message_test.go new file mode 100644 index 000000000..80feb615f --- /dev/null +++ b/src/internal/connector/exchange/message_test.go @@ -0,0 +1,85 @@ +package exchange + +import ( + "testing" + "time" + + "github.com/alcionai/corso/pkg/restorepoint" + "github.com/microsoftgraph/msgraph-sdk-go/models" + "github.com/stretchr/testify/suite" +) + +type MessageSuite struct { + suite.Suite +} + +func TestMessageSuite(t *testing.T) { + suite.Run(t, &MessageSuite{}) +} + +func (suite *MessageSuite) TestMessageInfo() { + tests := []struct { + name string + msgAndRP func() (models.Messageable, *restorepoint.ExchangeInfo) + }{ + { + name: "Empty message", + msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) { + return models.NewMessage(), &restorepoint.ExchangeInfo{} + }, + }, + { + name: "Just sender", + msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) { + sender := "foo@bar.com" + sr := models.NewRecipient() + sea := models.NewEmailAddress() + msg := models.NewMessage() + sea.SetAddress(&sender) + sr.SetEmailAddress(sea) + msg.SetSender(sr) + return msg, &restorepoint.ExchangeInfo{Sender: sender} + }, + }, + { + name: "Just subject", + msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) { + subject := "Hello world" + msg := models.NewMessage() + msg.SetSubject(&subject) + return msg, &restorepoint.ExchangeInfo{Subject: subject} + }, + }, + { + name: "Just receivedtime", + msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) { + now := time.Now() + msg := models.NewMessage() + msg.SetReceivedDateTime(&now) + return msg, &restorepoint.ExchangeInfo{Received: now} + }, + }, + { + name: "All fields", + msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) { + sender := "foo@bar.com" + subject := "Hello world" + now := time.Now() + sr := models.NewRecipient() + sea := models.NewEmailAddress() + msg := models.NewMessage() + sea.SetAddress(&sender) + sr.SetEmailAddress(sea) + msg.SetSender(sr) + msg.SetSubject(&subject) + msg.SetReceivedDateTime(&now) + return msg, &restorepoint.ExchangeInfo{Sender: sender, Subject: subject, Received: now} + }, + }} + for _, tt := range tests { + suite.T().Run(tt.name, func(t *testing.T) { + msg, expected := tt.msgAndRP() + suite.Equal(expected, MessageInfo(msg)) + }) + } +} diff --git a/src/internal/connector/exchange_data_collection.go b/src/internal/connector/exchange_data_collection.go index 49138f077..019d649cd 100644 --- a/src/internal/connector/exchange_data_collection.go +++ b/src/internal/connector/exchange_data_collection.go @@ -3,6 +3,8 @@ package connector import ( "bytes" "io" + + "github.com/alcionai/corso/pkg/restorepoint" ) const ( @@ -35,6 +37,16 @@ type DataStream interface { UUID() string } +// DataStreamInfo is used to provide service specific +// information about the DataStream +type DataStreamInfo interface { + Info() restorepoint.ItemInfo +} + +var _ DataCollection = &ExchangeDataCollection{} +var _ DataStream = &ExchangeData{} +var _ DataStreamInfo = &ExchangeData{} + // ExchangeDataCollection represents exchange mailbox // data for a single user. // @@ -89,6 +101,7 @@ type ExchangeData struct { // going forward. Using []byte for now but I assume we'll have // some structured type in here (serialization to []byte can be done in `Read`) message []byte + info *restorepoint.ExchangeInfo } func (ed *ExchangeData) UUID() string { @@ -98,3 +111,7 @@ func (ed *ExchangeData) UUID() string { func (ed *ExchangeData) ToReader() io.ReadCloser { return io.NopCloser(bytes.NewReader(ed.message)) } + +func (ed *ExchangeData) Info() restorepoint.ItemInfo { + return restorepoint.ItemInfo{Exchange: ed.info} +} diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 782f50c77..e4418a7b6 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -17,6 +17,7 @@ import ( msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders" "github.com/pkg/errors" + "github.com/alcionai/corso/internal/connector/exchange" "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/logger" @@ -396,7 +397,7 @@ func (gc *GraphConnector) serializeMessageIteratorCallback( } if byteArray != nil { - edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray}) + edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray, info: exchange.MessageInfo(message)}) } return true diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 88dd76383..2d04c0f2a 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -3,10 +3,12 @@ package mockconnector import ( "bytes" "io" + "time" "github.com/google/uuid" "github.com/alcionai/corso/internal/connector" + "github.com/alcionai/corso/pkg/restorepoint" ) // MockExchangeDataCollection represents a mock exchange mailbox @@ -18,6 +20,7 @@ type MockExchangeDataCollection struct { var ( _ connector.DataCollection = &MockExchangeDataCollection{} _ connector.DataStream = &MockExchangeData{} + _ connector.DataStreamInfo = &MockExchangeData{} ) // NewMockExchangeDataCollection creates an data collection that will return the specified number of @@ -65,3 +68,7 @@ func (med *MockExchangeData) UUID() string { func (med *MockExchangeData) ToReader() io.ReadCloser { return med.Reader } + +func (med *MockExchangeData) Info() restorepoint.ItemInfo { + return restorepoint.ItemInfo{Exchange: &restorepoint.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()}} +} diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index c7f2cca21..0e2ea15a4 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -2,6 +2,7 @@ package kopia import ( "context" + "path" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" @@ -13,6 +14,7 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/internal/connector" + "github.com/alcionai/corso/pkg/restorepoint" ) const ( @@ -77,6 +79,7 @@ func (w *Wrapper) Close(ctx context.Context) error { // DataCollection. func getStreamItemFunc( collection connector.DataCollection, + details *restorepoint.Details, ) func(context.Context, func(context.Context, fs.Entry) error) error { return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { items := collection.Items() @@ -88,11 +91,19 @@ func getStreamItemFunc( if !ok { return nil } + ei, ok := e.(connector.DataStreamInfo) + if !ok { + return errors.New("item does not implement DataStreamInfo") + } entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader()) if err := cb(ctx, entry); err != nil { return errors.Wrap(err, "executing callback") } + + // Populate RestorePointDetails + ep := append(collection.FullPath(), e.UUID()) + details.Add(path.Join(ep...), ei.Info()) } } } @@ -101,7 +112,7 @@ func getStreamItemFunc( // buildKopiaDirs recursively builds a directory hierarchy from the roots up. // Returned directories are either virtualfs.StreamingDirectory or // virtualfs.staticDirectory. -func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) { +func buildKopiaDirs(dirName string, dir *treeMap, details *restorepoint.Details) (fs.Directory, error) { // Don't support directories that have both a DataCollection and a set of // static child directories. if dir.collection != nil && len(dir.childDirs) > 0 { @@ -109,7 +120,7 @@ func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) { } if dir.collection != nil { - return virtualfs.NewStreamingDirectory(dirName, getStreamItemFunc(dir.collection)), nil + return virtualfs.NewStreamingDirectory(dirName, getStreamItemFunc(dir.collection, details)), nil } // Need to build the directory tree from the leaves up because intermediate @@ -117,7 +128,7 @@ func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) { childDirs := []fs.Entry{} for childName, childDir := range dir.childDirs { - child, err := buildKopiaDirs(childName, childDir) + child, err := buildKopiaDirs(childName, childDir, details) if err != nil { return nil, err } @@ -143,7 +154,7 @@ func newTreeMap() *treeMap { // ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes // in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given // DataCollections. -func inflateDirTree(ctx context.Context, collections []connector.DataCollection) (fs.Directory, error) { +func inflateDirTree(ctx context.Context, collections []connector.DataCollection, details *restorepoint.Details) (fs.Directory, error) { roots := make(map[string]*treeMap) for _, s := range collections { @@ -202,7 +213,7 @@ func inflateDirTree(ctx context.Context, collections []connector.DataCollection) var res fs.Directory for dirName, dir := range roots { - tmp, err := buildKopiaDirs(dirName, dir) + tmp, err := buildKopiaDirs(dirName, dir, details) if err != nil { return nil, err } @@ -216,27 +227,30 @@ func inflateDirTree(ctx context.Context, collections []connector.DataCollection) func (w Wrapper) BackupCollections( ctx context.Context, collections []connector.DataCollection, -) (*BackupStats, error) { +) (*BackupStats, *restorepoint.Details, error) { if w.c == nil { - return nil, errNotConnected + return nil, nil, errNotConnected } - dirTree, err := inflateDirTree(ctx, collections) + details := &restorepoint.Details{} + + dirTree, err := inflateDirTree(ctx, collections, details) if err != nil { - return nil, errors.Wrap(err, "building kopia directories") + return nil, nil, errors.Wrap(err, "building kopia directories") } - stats, err := w.makeSnapshotWithRoot(ctx, dirTree) + stats, err := w.makeSnapshotWithRoot(ctx, dirTree, details) if err != nil { - return nil, err + return nil, nil, err } - return stats, nil + return stats, details, nil } func (w Wrapper) makeSnapshotWithRoot( ctx context.Context, root fs.Directory, + details *restorepoint.Details, ) (*BackupStats, error) { si := snapshot.SourceInfo{ Host: kTestHost, @@ -261,6 +275,9 @@ func (w Wrapper) makeSnapshotWithRoot( return nil, errors.Wrap(err, "uploading data") } + // TODO: Persist RestorePointDetails here + // Create and store RestorePoint + if _, err := snapshot.SaveSnapshot(ctx, rw, man); err != nil { return nil, errors.Wrap(err, "saving snapshot") } @@ -269,6 +286,8 @@ func (w Wrapper) makeSnapshotWithRoot( return nil, errors.Wrap(err, "flushing writer") } + // TODO: Return RestorePoint ID in stats + res := manifestToStats(man) return &res, nil } diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 49c9640c9..d84a97163 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -16,6 +16,7 @@ import ( "github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector/mockconnector" ctesting "github.com/alcionai/corso/internal/testing" + "github.com/alcionai/corso/pkg/restorepoint" ) const ( @@ -72,6 +73,8 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { user2: 42, } + details := &restorepoint.Details{} + collections := []connector.DataCollection{ mockconnector.NewMockExchangeDataCollection( []string{tenant, user1, emails}, @@ -91,7 +94,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { // - user2 // - emails // - 42 separate files - dirTree, err := inflateDirTree(ctx, collections) + dirTree, err := inflateDirTree(ctx, collections, details) require.NoError(suite.T(), err) assert.Equal(suite.T(), dirTree.Name(), tenant) @@ -116,6 +119,13 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { require.NoError(suite.T(), err) assert.Len(suite.T(), emailFiles, expectedFileCount[entry.Name()]) } + + totalFileCount := 0 + for _, c := range expectedFileCount { + totalFileCount += c + } + + assert.Len(suite.T(), details.Entries, totalFileCount) } func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { @@ -126,6 +136,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { expectedFileCount := 42 + details := &restorepoint.Details{} collections := []connector.DataCollection{ mockconnector.NewMockExchangeDataCollection( []string{emails}, @@ -136,7 +147,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { // Returned directory structure should look like: // - emails // - 42 separate files - dirTree, err := inflateDirTree(ctx, collections) + dirTree, err := inflateDirTree(ctx, collections, details) require.NoError(suite.T(), err) assert.Equal(suite.T(), dirTree.Name(), emails) @@ -204,7 +215,8 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() { ctx := context.Background() suite.T().Run(test.name, func(t *testing.T) { - _, err := inflateDirTree(ctx, test.layout) + details := &restorepoint.Details{} + _, err := inflateDirTree(ctx, test.layout, details) assert.Error(t, err) }) } @@ -260,13 +272,14 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { ), } - stats, err := suite.w.BackupCollections(suite.ctx, collections) + stats, rp, err := suite.w.BackupCollections(suite.ctx, collections) assert.NoError(t, err) assert.Equal(t, stats.TotalFileCount, 47) assert.Equal(t, stats.TotalDirectoryCount, 5) assert.Equal(t, stats.IgnoredErrorCount, 0) assert.Equal(t, stats.ErrorCount, 0) assert.False(t, stats.Incomplete) + assert.Len(t, rp.Entries, 47) } type KopiaSimpleRepoIntegrationSuite struct { @@ -312,13 +325,14 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { }, } - stats, err := suite.w.BackupCollections(suite.ctx, collections) + stats, rp, err := suite.w.BackupCollections(suite.ctx, collections) require.NoError(t, err) require.Equal(t, stats.ErrorCount, 0) require.Equal(t, stats.TotalFileCount, 1) require.Equal(t, stats.TotalDirectoryCount, 3) require.Equal(t, stats.IgnoredErrorCount, 0) require.False(t, stats.Incomplete) + assert.Len(t, rp.Entries, 1) suite.snapshotID = manifest.ID(stats.SnapshotID) } diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 91435bb26..ac8764ad4 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -10,6 +10,7 @@ import ( "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/pkg/account" + "github.com/alcionai/corso/pkg/restorepoint" "github.com/alcionai/corso/pkg/selectors" ) @@ -36,11 +37,12 @@ func NewBackupOperation( ctx context.Context, opts Options, kw *kopia.Wrapper, + ms *kopia.ModelStore, acct account.Account, selector selectors.Selector, ) (BackupOperation, error) { op := BackupOperation{ - operation: newOperation(opts, kw), + operation: newOperation(opts, kw, ms), Selectors: selector, Version: "v0", account: acct, @@ -90,12 +92,33 @@ func (op *BackupOperation) Run(ctx context.Context) error { stats.gc = gc.Status() // hand the results to the consumer - stats.k, err = op.kopia.BackupCollections(ctx, cs) + var details *restorepoint.Details + stats.k, details, err = op.kopia.BackupCollections(ctx, cs) if err != nil { stats.writeErr = err return errors.Wrap(err, "backing up service data") } + err = op.createRestorePoint(ctx, stats.k.SnapshotID, details) + if err != nil { + stats.writeErr = err + return err + } + + return nil +} + +func (op *BackupOperation) createRestorePoint(ctx context.Context, snapID string, details *restorepoint.Details) error { + err := op.modelStore.Put(ctx, kopia.RestorePointDetailsModel, details) + if err != nil { + return errors.Wrap(err, "creating restorepointdetails model") + } + + err = op.modelStore.Put(ctx, kopia.RestorePointModel, + restorepoint.New(snapID, string(details.ModelStoreID))) + if err != nil { + return errors.Wrap(err, "creating restorepoint model") + } return nil } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index a4c6bab52..da37ae485 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -37,6 +37,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() { var ( kw = &kopia.Wrapper{} + ms = &kopia.ModelStore{} acct = account.Account{} now = time.Now() stats = backupStats{ @@ -51,7 +52,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() { } ) - op, err := NewBackupOperation(ctx, Options{}, kw, acct, selectors.Selector{}) + op, err := NewBackupOperation(ctx, Options{}, kw, ms, acct, selectors.Selector{}) require.NoError(t, err) op.persistResults(now, &stats) @@ -92,6 +93,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() { func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { kw := &kopia.Wrapper{} + ms := &kopia.ModelStore{} acct, err := ctesting.NewM365Account() require.NoError(suite.T(), err) @@ -99,12 +101,14 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { name string opts Options kw *kopia.Wrapper + ms *kopia.ModelStore acct account.Account targets []string errCheck assert.ErrorAssertionFunc }{ - {"good", Options{}, kw, acct, nil, assert.NoError}, - {"missing kopia", Options{}, nil, acct, nil, assert.Error}, + {"good", Options{}, kw, ms, acct, nil, assert.NoError}, + {"missing kopia", Options{}, nil, ms, acct, nil, assert.Error}, + {"missing modelstore", Options{}, kw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { @@ -112,6 +116,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { context.Background(), Options{}, test.kw, + test.ms, test.acct, selectors.Selector{}) test.errCheck(t, err) @@ -143,6 +148,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { w, err := kopia.NewWrapper(k) require.NoError(t, err) + defer w.Close(ctx) + + ms, err := kopia.NewModelStore(k) + require.NoError(t, err) + defer ms.Close(ctx) sel := selectors.NewExchangeBackup() sel.Include(sel.Users(m365User)) @@ -151,6 +161,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { ctx, Options{}, w, + ms, acct, sel.Selector) require.NoError(t, err) diff --git a/src/internal/operations/operation.go b/src/internal/operations/operation.go index 724bfb37f..c92589643 100644 --- a/src/internal/operations/operation.go +++ b/src/internal/operations/operation.go @@ -31,7 +31,8 @@ type operation struct { Options Options `json:"options"` Status opStatus `json:"status"` - kopia *kopia.Wrapper + kopia *kopia.Wrapper + modelStore *kopia.ModelStore } // Options configure some parameters of the operation @@ -43,13 +44,15 @@ type Options struct { func newOperation( opts Options, kw *kopia.Wrapper, + ms *kopia.ModelStore, ) operation { return operation{ - ID: uuid.New(), - CreatedAt: time.Now(), - Options: opts, - kopia: kw, - Status: InProgress, + ID: uuid.New(), + CreatedAt: time.Now(), + Options: opts, + kopia: kw, + modelStore: ms, + Status: InProgress, } } @@ -57,6 +60,9 @@ func (op operation) validate() error { if op.kopia == nil { return errors.New("missing kopia connection") } + if op.modelStore == nil { + return errors.New("missing modelstore") + } return nil } diff --git a/src/internal/operations/operation_test.go b/src/internal/operations/operation_test.go index 821095fe5..23452de3b 100644 --- a/src/internal/operations/operation_test.go +++ b/src/internal/operations/operation_test.go @@ -19,23 +19,26 @@ func TestOperationSuite(t *testing.T) { func (suite *OperationSuite) TestNewOperation() { t := suite.T() - op := newOperation(Options{}, nil) + op := newOperation(Options{}, nil, nil) assert.NotNil(t, op.ID) } func (suite *OperationSuite) TestOperation_Validate() { kwStub := &kopia.Wrapper{} + msStub := &kopia.ModelStore{} table := []struct { name string kw *kopia.Wrapper + ms *kopia.ModelStore errCheck assert.ErrorAssertionFunc }{ - {"good", kwStub, assert.NoError}, - {"missing kopia", nil, assert.Error}, + {"good", kwStub, msStub, assert.NoError}, + {"missing kopia wrapper", nil, msStub, assert.Error}, + {"missing kopia modelstore", kwStub, nil, assert.Error}, } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - op := newOperation(Options{}, test.kw) + op := newOperation(Options{}, test.kw, test.ms) test.errCheck(t, op.validate()) }) } diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index 44eb88332..c2e62e805 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -35,12 +35,13 @@ func NewRestoreOperation( ctx context.Context, opts Options, kw *kopia.Wrapper, + ms *kopia.ModelStore, acct account.Account, restorePointID string, targets []string, ) (RestoreOperation, error) { op := RestoreOperation{ - operation: newOperation(opts, kw), + operation: newOperation(opts, kw, ms), RestorePointID: restorePointID, Targets: targets, Version: "v0", diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 6d653b12c..0cf111b23 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -37,6 +37,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { var ( kw = &kopia.Wrapper{} + ms = &kopia.ModelStore{} acct = account.Account{} now = time.Now() stats = restoreStats{ @@ -49,7 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { } ) - op, err := NewRestoreOperation(ctx, Options{}, kw, acct, "foo", nil) + op, err := NewRestoreOperation(ctx, Options{}, kw, ms, acct, "foo", nil) require.NoError(t, err) op.persistResults(now, &stats) @@ -85,6 +86,7 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() { func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { kw := &kopia.Wrapper{} + ms := &kopia.ModelStore{} acct, err := ctesting.NewM365Account() require.NoError(suite.T(), err) @@ -92,12 +94,14 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { name string opts Options kw *kopia.Wrapper + ms *kopia.ModelStore acct account.Account targets []string errCheck assert.ErrorAssertionFunc }{ - {"good", Options{}, kw, acct, nil, assert.NoError}, - {"missing kopia", Options{}, nil, acct, nil, assert.Error}, + {"good", Options{}, kw, ms, acct, nil, assert.NoError}, + {"missing kopia", Options{}, nil, ms, acct, nil, assert.Error}, + {"missing modelstore", Options{}, kw, nil, acct, nil, assert.Error}, } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { @@ -105,6 +109,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { context.Background(), Options{}, test.kw, + test.ms, test.acct, "restore-point-id", nil) diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 5428e3fd5..faef2ebc2 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -20,9 +20,10 @@ type Repository struct { CreatedAt time.Time Version string // in case of future breaking changes - Account account.Account // the user's m365 account connection details - Storage storage.Storage // the storage provider details and configuration - dataLayer *kopia.Wrapper + Account account.Account // the user's m365 account connection details + Storage storage.Storage // the storage provider details and configuration + dataLayer *kopia.Wrapper + modelStore *kopia.ModelStore } // Initialize will: @@ -42,7 +43,7 @@ func Initialize( if err := kopiaRef.Initialize(ctx); err != nil { return nil, err } - // kopiaRef comes with a count of 1 and NewWrapper bumps it again so safe + // kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe // to close here. defer kopiaRef.Close(ctx) @@ -51,12 +52,18 @@ func Initialize( return nil, err } + ms, err := kopia.NewModelStore(kopiaRef) + if err != nil { + return nil, err + } + r := Repository{ - ID: uuid.New(), - Version: "v1", - Account: acct, - Storage: storage, - dataLayer: w, + ID: uuid.New(), + Version: "v1", + Account: acct, + Storage: storage, + dataLayer: w, + modelStore: ms, } return &r, nil } @@ -75,7 +82,7 @@ func Connect( if err := kopiaRef.Connect(ctx); err != nil { return nil, err } - // kopiaRef comes with a count of 1 and NewWrapper bumps it again so safe + // kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe // to close here. defer kopiaRef.Close(ctx) @@ -84,29 +91,37 @@ func Connect( return nil, err } + ms, err := kopia.NewModelStore(kopiaRef) + if err != nil { + return nil, err + } + // todo: ID and CreatedAt should get retrieved from a stored kopia config. r := Repository{ - Version: "v1", - Account: acct, - Storage: storage, - dataLayer: w, + Version: "v1", + Account: acct, + Storage: storage, + dataLayer: w, + modelStore: ms, } return &r, nil } func (r *Repository) Close(ctx context.Context) error { - if r.dataLayer == nil { + if r.dataLayer != nil { + err := r.dataLayer.Close(ctx) + r.dataLayer = nil + if err != nil { + return errors.Wrap(err, "closing corso DataLayer") + } + } + + if r.modelStore == nil { return nil } - - err := r.dataLayer.Close(ctx) - r.dataLayer = nil - - if err != nil { - return errors.Wrap(err, "closing corso Repository") - } - - return nil + err := r.modelStore.Close(ctx) + r.modelStore = nil + return errors.Wrap(err, "closing corso ModelStore") } // NewBackup generates a backupOperation runner. @@ -115,6 +130,7 @@ func (r Repository) NewBackup(ctx context.Context, selector selectors.Selector) ctx, operations.Options{}, r.dataLayer, + r.modelStore, r.Account, selector) } @@ -125,6 +141,7 @@ func (r Repository) NewRestore(ctx context.Context, restorePointID string, targe ctx, operations.Options{}, r.dataLayer, + r.modelStore, r.Account, restorePointID, targets)