Create RestorePoint and RestorePointDetails (#290)

Update the backup operation to create RestorePoint and RestorePointDetails models in the repository

Add modelstore to the operation to allow backup/restore operations to update/query for corso models

Closes #268
This commit is contained in:
Vaibhav Kamra 2022-07-07 18:41:40 -07:00 committed by GitHub
parent 9d21d65bc7
commit 6e9bd634e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 300 additions and 61 deletions

View File

@ -0,0 +1,30 @@
package exchange
import (
"time"
"github.com/alcionai/corso/pkg/restorepoint"
"github.com/microsoftgraph/msgraph-sdk-go/models"
)
func MessageInfo(msg models.Messageable) *restorepoint.ExchangeInfo {
sender := ""
subject := ""
received := time.Time{}
if msg.GetSender() != nil &&
msg.GetSender().GetEmailAddress() != nil &&
msg.GetSender().GetEmailAddress().GetAddress() != nil {
sender = *msg.GetSender().GetEmailAddress().GetAddress()
}
if msg.GetSubject() != nil {
subject = *msg.GetSubject()
}
if msg.GetReceivedDateTime() != nil {
received = *msg.GetReceivedDateTime()
}
return &restorepoint.ExchangeInfo{
Sender: sender,
Subject: subject,
Received: received,
}
}

View File

@ -0,0 +1,85 @@
package exchange
import (
"testing"
"time"
"github.com/alcionai/corso/pkg/restorepoint"
"github.com/microsoftgraph/msgraph-sdk-go/models"
"github.com/stretchr/testify/suite"
)
type MessageSuite struct {
suite.Suite
}
func TestMessageSuite(t *testing.T) {
suite.Run(t, &MessageSuite{})
}
func (suite *MessageSuite) TestMessageInfo() {
tests := []struct {
name string
msgAndRP func() (models.Messageable, *restorepoint.ExchangeInfo)
}{
{
name: "Empty message",
msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) {
return models.NewMessage(), &restorepoint.ExchangeInfo{}
},
},
{
name: "Just sender",
msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) {
sender := "foo@bar.com"
sr := models.NewRecipient()
sea := models.NewEmailAddress()
msg := models.NewMessage()
sea.SetAddress(&sender)
sr.SetEmailAddress(sea)
msg.SetSender(sr)
return msg, &restorepoint.ExchangeInfo{Sender: sender}
},
},
{
name: "Just subject",
msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) {
subject := "Hello world"
msg := models.NewMessage()
msg.SetSubject(&subject)
return msg, &restorepoint.ExchangeInfo{Subject: subject}
},
},
{
name: "Just receivedtime",
msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) {
now := time.Now()
msg := models.NewMessage()
msg.SetReceivedDateTime(&now)
return msg, &restorepoint.ExchangeInfo{Received: now}
},
},
{
name: "All fields",
msgAndRP: func() (models.Messageable, *restorepoint.ExchangeInfo) {
sender := "foo@bar.com"
subject := "Hello world"
now := time.Now()
sr := models.NewRecipient()
sea := models.NewEmailAddress()
msg := models.NewMessage()
sea.SetAddress(&sender)
sr.SetEmailAddress(sea)
msg.SetSender(sr)
msg.SetSubject(&subject)
msg.SetReceivedDateTime(&now)
return msg, &restorepoint.ExchangeInfo{Sender: sender, Subject: subject, Received: now}
},
}}
for _, tt := range tests {
suite.T().Run(tt.name, func(t *testing.T) {
msg, expected := tt.msgAndRP()
suite.Equal(expected, MessageInfo(msg))
})
}
}

View File

@ -3,6 +3,8 @@ package connector
import ( import (
"bytes" "bytes"
"io" "io"
"github.com/alcionai/corso/pkg/restorepoint"
) )
const ( const (
@ -35,6 +37,16 @@ type DataStream interface {
UUID() string UUID() string
} }
// DataStreamInfo is used to provide service specific
// information about the DataStream
type DataStreamInfo interface {
Info() restorepoint.ItemInfo
}
var _ DataCollection = &ExchangeDataCollection{}
var _ DataStream = &ExchangeData{}
var _ DataStreamInfo = &ExchangeData{}
// ExchangeDataCollection represents exchange mailbox // ExchangeDataCollection represents exchange mailbox
// data for a single user. // data for a single user.
// //
@ -89,6 +101,7 @@ type ExchangeData struct {
// going forward. Using []byte for now but I assume we'll have // going forward. Using []byte for now but I assume we'll have
// some structured type in here (serialization to []byte can be done in `Read`) // some structured type in here (serialization to []byte can be done in `Read`)
message []byte message []byte
info *restorepoint.ExchangeInfo
} }
func (ed *ExchangeData) UUID() string { func (ed *ExchangeData) UUID() string {
@ -98,3 +111,7 @@ func (ed *ExchangeData) UUID() string {
func (ed *ExchangeData) ToReader() io.ReadCloser { func (ed *ExchangeData) ToReader() io.ReadCloser {
return io.NopCloser(bytes.NewReader(ed.message)) return io.NopCloser(bytes.NewReader(ed.message))
} }
func (ed *ExchangeData) Info() restorepoint.ItemInfo {
return restorepoint.ItemInfo{Exchange: ed.info}
}

View File

@ -17,6 +17,7 @@ import (
msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders" msfolder "github.com/microsoftgraph/msgraph-sdk-go/users/item/mailfolders"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector/exchange"
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/logger" "github.com/alcionai/corso/pkg/logger"
@ -396,7 +397,7 @@ func (gc *GraphConnector) serializeMessageIteratorCallback(
} }
if byteArray != nil { if byteArray != nil {
edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray}) edc.PopulateCollection(&ExchangeData{id: *message.GetId(), message: byteArray, info: exchange.MessageInfo(message)})
} }
return true return true

View File

@ -3,10 +3,12 @@ package mockconnector
import ( import (
"bytes" "bytes"
"io" "io"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/pkg/restorepoint"
) )
// MockExchangeDataCollection represents a mock exchange mailbox // MockExchangeDataCollection represents a mock exchange mailbox
@ -18,6 +20,7 @@ type MockExchangeDataCollection struct {
var ( var (
_ connector.DataCollection = &MockExchangeDataCollection{} _ connector.DataCollection = &MockExchangeDataCollection{}
_ connector.DataStream = &MockExchangeData{} _ connector.DataStream = &MockExchangeData{}
_ connector.DataStreamInfo = &MockExchangeData{}
) )
// NewMockExchangeDataCollection creates an data collection that will return the specified number of // NewMockExchangeDataCollection creates an data collection that will return the specified number of
@ -65,3 +68,7 @@ func (med *MockExchangeData) UUID() string {
func (med *MockExchangeData) ToReader() io.ReadCloser { func (med *MockExchangeData) ToReader() io.ReadCloser {
return med.Reader return med.Reader
} }
func (med *MockExchangeData) Info() restorepoint.ItemInfo {
return restorepoint.ItemInfo{Exchange: &restorepoint.ExchangeInfo{Sender: "foo@bar.com", Subject: "Hello world!", Received: time.Now()}}
}

View File

@ -2,6 +2,7 @@ package kopia
import ( import (
"context" "context"
"path"
"github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/fs/virtualfs"
@ -13,6 +14,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/pkg/restorepoint"
) )
const ( const (
@ -77,6 +79,7 @@ func (w *Wrapper) Close(ctx context.Context) error {
// DataCollection. // DataCollection.
func getStreamItemFunc( func getStreamItemFunc(
collection connector.DataCollection, collection connector.DataCollection,
details *restorepoint.Details,
) func(context.Context, func(context.Context, fs.Entry) error) error { ) func(context.Context, func(context.Context, fs.Entry) error) error {
return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error { return func(ctx context.Context, cb func(context.Context, fs.Entry) error) error {
items := collection.Items() items := collection.Items()
@ -88,11 +91,19 @@ func getStreamItemFunc(
if !ok { if !ok {
return nil return nil
} }
ei, ok := e.(connector.DataStreamInfo)
if !ok {
return errors.New("item does not implement DataStreamInfo")
}
entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader()) entry := virtualfs.StreamingFileFromReader(e.UUID(), e.ToReader())
if err := cb(ctx, entry); err != nil { if err := cb(ctx, entry); err != nil {
return errors.Wrap(err, "executing callback") return errors.Wrap(err, "executing callback")
} }
// Populate RestorePointDetails
ep := append(collection.FullPath(), e.UUID())
details.Add(path.Join(ep...), ei.Info())
} }
} }
} }
@ -101,7 +112,7 @@ func getStreamItemFunc(
// buildKopiaDirs recursively builds a directory hierarchy from the roots up. // buildKopiaDirs recursively builds a directory hierarchy from the roots up.
// Returned directories are either virtualfs.StreamingDirectory or // Returned directories are either virtualfs.StreamingDirectory or
// virtualfs.staticDirectory. // virtualfs.staticDirectory.
func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) { func buildKopiaDirs(dirName string, dir *treeMap, details *restorepoint.Details) (fs.Directory, error) {
// Don't support directories that have both a DataCollection and a set of // Don't support directories that have both a DataCollection and a set of
// static child directories. // static child directories.
if dir.collection != nil && len(dir.childDirs) > 0 { if dir.collection != nil && len(dir.childDirs) > 0 {
@ -109,7 +120,7 @@ func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) {
} }
if dir.collection != nil { if dir.collection != nil {
return virtualfs.NewStreamingDirectory(dirName, getStreamItemFunc(dir.collection)), nil return virtualfs.NewStreamingDirectory(dirName, getStreamItemFunc(dir.collection, details)), nil
} }
// Need to build the directory tree from the leaves up because intermediate // Need to build the directory tree from the leaves up because intermediate
@ -117,7 +128,7 @@ func buildKopiaDirs(dirName string, dir *treeMap) (fs.Directory, error) {
childDirs := []fs.Entry{} childDirs := []fs.Entry{}
for childName, childDir := range dir.childDirs { for childName, childDir := range dir.childDirs {
child, err := buildKopiaDirs(childName, childDir) child, err := buildKopiaDirs(childName, childDir, details)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -143,7 +154,7 @@ func newTreeMap() *treeMap {
// ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes // ancestor of the streams and uses virtualfs.StaticDirectory for internal nodes
// in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given // in the hierarchy. Leaf nodes are virtualfs.StreamingDirectory with the given
// DataCollections. // DataCollections.
func inflateDirTree(ctx context.Context, collections []connector.DataCollection) (fs.Directory, error) { func inflateDirTree(ctx context.Context, collections []connector.DataCollection, details *restorepoint.Details) (fs.Directory, error) {
roots := make(map[string]*treeMap) roots := make(map[string]*treeMap)
for _, s := range collections { for _, s := range collections {
@ -202,7 +213,7 @@ func inflateDirTree(ctx context.Context, collections []connector.DataCollection)
var res fs.Directory var res fs.Directory
for dirName, dir := range roots { for dirName, dir := range roots {
tmp, err := buildKopiaDirs(dirName, dir) tmp, err := buildKopiaDirs(dirName, dir, details)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -216,27 +227,30 @@ func inflateDirTree(ctx context.Context, collections []connector.DataCollection)
func (w Wrapper) BackupCollections( func (w Wrapper) BackupCollections(
ctx context.Context, ctx context.Context,
collections []connector.DataCollection, collections []connector.DataCollection,
) (*BackupStats, error) { ) (*BackupStats, *restorepoint.Details, error) {
if w.c == nil { if w.c == nil {
return nil, errNotConnected return nil, nil, errNotConnected
} }
dirTree, err := inflateDirTree(ctx, collections) details := &restorepoint.Details{}
dirTree, err := inflateDirTree(ctx, collections, details)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "building kopia directories") return nil, nil, errors.Wrap(err, "building kopia directories")
} }
stats, err := w.makeSnapshotWithRoot(ctx, dirTree) stats, err := w.makeSnapshotWithRoot(ctx, dirTree, details)
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
return stats, nil return stats, details, nil
} }
func (w Wrapper) makeSnapshotWithRoot( func (w Wrapper) makeSnapshotWithRoot(
ctx context.Context, ctx context.Context,
root fs.Directory, root fs.Directory,
details *restorepoint.Details,
) (*BackupStats, error) { ) (*BackupStats, error) {
si := snapshot.SourceInfo{ si := snapshot.SourceInfo{
Host: kTestHost, Host: kTestHost,
@ -261,6 +275,9 @@ func (w Wrapper) makeSnapshotWithRoot(
return nil, errors.Wrap(err, "uploading data") return nil, errors.Wrap(err, "uploading data")
} }
// TODO: Persist RestorePointDetails here
// Create and store RestorePoint
if _, err := snapshot.SaveSnapshot(ctx, rw, man); err != nil { if _, err := snapshot.SaveSnapshot(ctx, rw, man); err != nil {
return nil, errors.Wrap(err, "saving snapshot") return nil, errors.Wrap(err, "saving snapshot")
} }
@ -269,6 +286,8 @@ func (w Wrapper) makeSnapshotWithRoot(
return nil, errors.Wrap(err, "flushing writer") return nil, errors.Wrap(err, "flushing writer")
} }
// TODO: Return RestorePoint ID in stats
res := manifestToStats(man) res := manifestToStats(man)
return &res, nil return &res, nil
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/mockconnector" "github.com/alcionai/corso/internal/connector/mockconnector"
ctesting "github.com/alcionai/corso/internal/testing" ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/restorepoint"
) )
const ( const (
@ -72,6 +73,8 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
user2: 42, user2: 42,
} }
details := &restorepoint.Details{}
collections := []connector.DataCollection{ collections := []connector.DataCollection{
mockconnector.NewMockExchangeDataCollection( mockconnector.NewMockExchangeDataCollection(
[]string{tenant, user1, emails}, []string{tenant, user1, emails},
@ -91,7 +94,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
// - user2 // - user2
// - emails // - emails
// - 42 separate files // - 42 separate files
dirTree, err := inflateDirTree(ctx, collections) dirTree, err := inflateDirTree(ctx, collections, details)
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
assert.Equal(suite.T(), dirTree.Name(), tenant) assert.Equal(suite.T(), dirTree.Name(), tenant)
@ -116,6 +119,13 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() {
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
assert.Len(suite.T(), emailFiles, expectedFileCount[entry.Name()]) assert.Len(suite.T(), emailFiles, expectedFileCount[entry.Name()])
} }
totalFileCount := 0
for _, c := range expectedFileCount {
totalFileCount += c
}
assert.Len(suite.T(), details.Entries, totalFileCount)
} }
func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
@ -126,6 +136,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
expectedFileCount := 42 expectedFileCount := 42
details := &restorepoint.Details{}
collections := []connector.DataCollection{ collections := []connector.DataCollection{
mockconnector.NewMockExchangeDataCollection( mockconnector.NewMockExchangeDataCollection(
[]string{emails}, []string{emails},
@ -136,7 +147,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() {
// Returned directory structure should look like: // Returned directory structure should look like:
// - emails // - emails
// - 42 separate files // - 42 separate files
dirTree, err := inflateDirTree(ctx, collections) dirTree, err := inflateDirTree(ctx, collections, details)
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
assert.Equal(suite.T(), dirTree.Name(), emails) assert.Equal(suite.T(), dirTree.Name(), emails)
@ -204,7 +215,8 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
ctx := context.Background() ctx := context.Background()
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
_, err := inflateDirTree(ctx, test.layout) details := &restorepoint.Details{}
_, err := inflateDirTree(ctx, test.layout, details)
assert.Error(t, err) assert.Error(t, err)
}) })
} }
@ -260,13 +272,14 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
), ),
} }
stats, err := suite.w.BackupCollections(suite.ctx, collections) stats, rp, err := suite.w.BackupCollections(suite.ctx, collections)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, stats.TotalFileCount, 47) assert.Equal(t, stats.TotalFileCount, 47)
assert.Equal(t, stats.TotalDirectoryCount, 5) assert.Equal(t, stats.TotalDirectoryCount, 5)
assert.Equal(t, stats.IgnoredErrorCount, 0) assert.Equal(t, stats.IgnoredErrorCount, 0)
assert.Equal(t, stats.ErrorCount, 0) assert.Equal(t, stats.ErrorCount, 0)
assert.False(t, stats.Incomplete) assert.False(t, stats.Incomplete)
assert.Len(t, rp.Entries, 47)
} }
type KopiaSimpleRepoIntegrationSuite struct { type KopiaSimpleRepoIntegrationSuite struct {
@ -312,13 +325,14 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
}, },
} }
stats, err := suite.w.BackupCollections(suite.ctx, collections) stats, rp, err := suite.w.BackupCollections(suite.ctx, collections)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, stats.ErrorCount, 0) require.Equal(t, stats.ErrorCount, 0)
require.Equal(t, stats.TotalFileCount, 1) require.Equal(t, stats.TotalFileCount, 1)
require.Equal(t, stats.TotalDirectoryCount, 3) require.Equal(t, stats.TotalDirectoryCount, 3)
require.Equal(t, stats.IgnoredErrorCount, 0) require.Equal(t, stats.IgnoredErrorCount, 0)
require.False(t, stats.Incomplete) require.False(t, stats.Incomplete)
assert.Len(t, rp.Entries, 1)
suite.snapshotID = manifest.ID(stats.SnapshotID) suite.snapshotID = manifest.ID(stats.SnapshotID)
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia" "github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/restorepoint"
"github.com/alcionai/corso/pkg/selectors" "github.com/alcionai/corso/pkg/selectors"
) )
@ -36,11 +37,12 @@ func NewBackupOperation(
ctx context.Context, ctx context.Context,
opts Options, opts Options,
kw *kopia.Wrapper, kw *kopia.Wrapper,
ms *kopia.ModelStore,
acct account.Account, acct account.Account,
selector selectors.Selector, selector selectors.Selector,
) (BackupOperation, error) { ) (BackupOperation, error) {
op := BackupOperation{ op := BackupOperation{
operation: newOperation(opts, kw), operation: newOperation(opts, kw, ms),
Selectors: selector, Selectors: selector,
Version: "v0", Version: "v0",
account: acct, account: acct,
@ -90,12 +92,33 @@ func (op *BackupOperation) Run(ctx context.Context) error {
stats.gc = gc.Status() stats.gc = gc.Status()
// hand the results to the consumer // hand the results to the consumer
stats.k, err = op.kopia.BackupCollections(ctx, cs) var details *restorepoint.Details
stats.k, details, err = op.kopia.BackupCollections(ctx, cs)
if err != nil { if err != nil {
stats.writeErr = err stats.writeErr = err
return errors.Wrap(err, "backing up service data") return errors.Wrap(err, "backing up service data")
} }
err = op.createRestorePoint(ctx, stats.k.SnapshotID, details)
if err != nil {
stats.writeErr = err
return err
}
return nil
}
func (op *BackupOperation) createRestorePoint(ctx context.Context, snapID string, details *restorepoint.Details) error {
err := op.modelStore.Put(ctx, kopia.RestorePointDetailsModel, details)
if err != nil {
return errors.Wrap(err, "creating restorepointdetails model")
}
err = op.modelStore.Put(ctx, kopia.RestorePointModel,
restorepoint.New(snapID, string(details.ModelStoreID)))
if err != nil {
return errors.Wrap(err, "creating restorepoint model")
}
return nil return nil
} }

View File

@ -37,6 +37,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
var ( var (
kw = &kopia.Wrapper{} kw = &kopia.Wrapper{}
ms = &kopia.ModelStore{}
acct = account.Account{} acct = account.Account{}
now = time.Now() now = time.Now()
stats = backupStats{ stats = backupStats{
@ -51,7 +52,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
} }
) )
op, err := NewBackupOperation(ctx, Options{}, kw, acct, selectors.Selector{}) op, err := NewBackupOperation(ctx, Options{}, kw, ms, acct, selectors.Selector{})
require.NoError(t, err) require.NoError(t, err)
op.persistResults(now, &stats) op.persistResults(now, &stats)
@ -92,6 +93,7 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
kw := &kopia.Wrapper{} kw := &kopia.Wrapper{}
ms := &kopia.ModelStore{}
acct, err := ctesting.NewM365Account() acct, err := ctesting.NewM365Account()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -99,12 +101,14 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
name string name string
opts Options opts Options
kw *kopia.Wrapper kw *kopia.Wrapper
ms *kopia.ModelStore
acct account.Account acct account.Account
targets []string targets []string
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
}{ }{
{"good", Options{}, kw, acct, nil, assert.NoError}, {"good", Options{}, kw, ms, acct, nil, assert.NoError},
{"missing kopia", Options{}, nil, acct, nil, assert.Error}, {"missing kopia", Options{}, nil, ms, acct, nil, assert.Error},
{"missing modelstore", Options{}, kw, nil, acct, nil, assert.Error},
} }
for _, test := range table { for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
@ -112,6 +116,7 @@ func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
context.Background(), context.Background(),
Options{}, Options{},
test.kw, test.kw,
test.ms,
test.acct, test.acct,
selectors.Selector{}) selectors.Selector{})
test.errCheck(t, err) test.errCheck(t, err)
@ -143,6 +148,11 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
w, err := kopia.NewWrapper(k) w, err := kopia.NewWrapper(k)
require.NoError(t, err) require.NoError(t, err)
defer w.Close(ctx)
ms, err := kopia.NewModelStore(k)
require.NoError(t, err)
defer ms.Close(ctx)
sel := selectors.NewExchangeBackup() sel := selectors.NewExchangeBackup()
sel.Include(sel.Users(m365User)) sel.Include(sel.Users(m365User))
@ -151,6 +161,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
ctx, ctx,
Options{}, Options{},
w, w,
ms,
acct, acct,
sel.Selector) sel.Selector)
require.NoError(t, err) require.NoError(t, err)

View File

@ -31,7 +31,8 @@ type operation struct {
Options Options `json:"options"` Options Options `json:"options"`
Status opStatus `json:"status"` Status opStatus `json:"status"`
kopia *kopia.Wrapper kopia *kopia.Wrapper
modelStore *kopia.ModelStore
} }
// Options configure some parameters of the operation // Options configure some parameters of the operation
@ -43,13 +44,15 @@ type Options struct {
func newOperation( func newOperation(
opts Options, opts Options,
kw *kopia.Wrapper, kw *kopia.Wrapper,
ms *kopia.ModelStore,
) operation { ) operation {
return operation{ return operation{
ID: uuid.New(), ID: uuid.New(),
CreatedAt: time.Now(), CreatedAt: time.Now(),
Options: opts, Options: opts,
kopia: kw, kopia: kw,
Status: InProgress, modelStore: ms,
Status: InProgress,
} }
} }
@ -57,6 +60,9 @@ func (op operation) validate() error {
if op.kopia == nil { if op.kopia == nil {
return errors.New("missing kopia connection") return errors.New("missing kopia connection")
} }
if op.modelStore == nil {
return errors.New("missing modelstore")
}
return nil return nil
} }

View File

@ -19,23 +19,26 @@ func TestOperationSuite(t *testing.T) {
func (suite *OperationSuite) TestNewOperation() { func (suite *OperationSuite) TestNewOperation() {
t := suite.T() t := suite.T()
op := newOperation(Options{}, nil) op := newOperation(Options{}, nil, nil)
assert.NotNil(t, op.ID) assert.NotNil(t, op.ID)
} }
func (suite *OperationSuite) TestOperation_Validate() { func (suite *OperationSuite) TestOperation_Validate() {
kwStub := &kopia.Wrapper{} kwStub := &kopia.Wrapper{}
msStub := &kopia.ModelStore{}
table := []struct { table := []struct {
name string name string
kw *kopia.Wrapper kw *kopia.Wrapper
ms *kopia.ModelStore
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
}{ }{
{"good", kwStub, assert.NoError}, {"good", kwStub, msStub, assert.NoError},
{"missing kopia", nil, assert.Error}, {"missing kopia wrapper", nil, msStub, assert.Error},
{"missing kopia modelstore", kwStub, nil, assert.Error},
} }
for _, test := range table { for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
op := newOperation(Options{}, test.kw) op := newOperation(Options{}, test.kw, test.ms)
test.errCheck(t, op.validate()) test.errCheck(t, op.validate())
}) })
} }

View File

@ -35,12 +35,13 @@ func NewRestoreOperation(
ctx context.Context, ctx context.Context,
opts Options, opts Options,
kw *kopia.Wrapper, kw *kopia.Wrapper,
ms *kopia.ModelStore,
acct account.Account, acct account.Account,
restorePointID string, restorePointID string,
targets []string, targets []string,
) (RestoreOperation, error) { ) (RestoreOperation, error) {
op := RestoreOperation{ op := RestoreOperation{
operation: newOperation(opts, kw), operation: newOperation(opts, kw, ms),
RestorePointID: restorePointID, RestorePointID: restorePointID,
Targets: targets, Targets: targets,
Version: "v0", Version: "v0",

View File

@ -37,6 +37,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
var ( var (
kw = &kopia.Wrapper{} kw = &kopia.Wrapper{}
ms = &kopia.ModelStore{}
acct = account.Account{} acct = account.Account{}
now = time.Now() now = time.Now()
stats = restoreStats{ stats = restoreStats{
@ -49,7 +50,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
} }
) )
op, err := NewRestoreOperation(ctx, Options{}, kw, acct, "foo", nil) op, err := NewRestoreOperation(ctx, Options{}, kw, ms, acct, "foo", nil)
require.NoError(t, err) require.NoError(t, err)
op.persistResults(now, &stats) op.persistResults(now, &stats)
@ -85,6 +86,7 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
kw := &kopia.Wrapper{} kw := &kopia.Wrapper{}
ms := &kopia.ModelStore{}
acct, err := ctesting.NewM365Account() acct, err := ctesting.NewM365Account()
require.NoError(suite.T(), err) require.NoError(suite.T(), err)
@ -92,12 +94,14 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
name string name string
opts Options opts Options
kw *kopia.Wrapper kw *kopia.Wrapper
ms *kopia.ModelStore
acct account.Account acct account.Account
targets []string targets []string
errCheck assert.ErrorAssertionFunc errCheck assert.ErrorAssertionFunc
}{ }{
{"good", Options{}, kw, acct, nil, assert.NoError}, {"good", Options{}, kw, ms, acct, nil, assert.NoError},
{"missing kopia", Options{}, nil, acct, nil, assert.Error}, {"missing kopia", Options{}, nil, ms, acct, nil, assert.Error},
{"missing modelstore", Options{}, kw, nil, acct, nil, assert.Error},
} }
for _, test := range table { for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) { suite.T().Run(test.name, func(t *testing.T) {
@ -105,6 +109,7 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
context.Background(), context.Background(),
Options{}, Options{},
test.kw, test.kw,
test.ms,
test.acct, test.acct,
"restore-point-id", "restore-point-id",
nil) nil)

View File

@ -20,9 +20,10 @@ type Repository struct {
CreatedAt time.Time CreatedAt time.Time
Version string // in case of future breaking changes Version string // in case of future breaking changes
Account account.Account // the user's m365 account connection details Account account.Account // the user's m365 account connection details
Storage storage.Storage // the storage provider details and configuration Storage storage.Storage // the storage provider details and configuration
dataLayer *kopia.Wrapper dataLayer *kopia.Wrapper
modelStore *kopia.ModelStore
} }
// Initialize will: // Initialize will:
@ -42,7 +43,7 @@ func Initialize(
if err := kopiaRef.Initialize(ctx); err != nil { if err := kopiaRef.Initialize(ctx); err != nil {
return nil, err return nil, err
} }
// kopiaRef comes with a count of 1 and NewWrapper bumps it again so safe // kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe
// to close here. // to close here.
defer kopiaRef.Close(ctx) defer kopiaRef.Close(ctx)
@ -51,12 +52,18 @@ func Initialize(
return nil, err return nil, err
} }
ms, err := kopia.NewModelStore(kopiaRef)
if err != nil {
return nil, err
}
r := Repository{ r := Repository{
ID: uuid.New(), ID: uuid.New(),
Version: "v1", Version: "v1",
Account: acct, Account: acct,
Storage: storage, Storage: storage,
dataLayer: w, dataLayer: w,
modelStore: ms,
} }
return &r, nil return &r, nil
} }
@ -75,7 +82,7 @@ func Connect(
if err := kopiaRef.Connect(ctx); err != nil { if err := kopiaRef.Connect(ctx); err != nil {
return nil, err return nil, err
} }
// kopiaRef comes with a count of 1 and NewWrapper bumps it again so safe // kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe
// to close here. // to close here.
defer kopiaRef.Close(ctx) defer kopiaRef.Close(ctx)
@ -84,29 +91,37 @@ func Connect(
return nil, err return nil, err
} }
ms, err := kopia.NewModelStore(kopiaRef)
if err != nil {
return nil, err
}
// todo: ID and CreatedAt should get retrieved from a stored kopia config. // todo: ID and CreatedAt should get retrieved from a stored kopia config.
r := Repository{ r := Repository{
Version: "v1", Version: "v1",
Account: acct, Account: acct,
Storage: storage, Storage: storage,
dataLayer: w, dataLayer: w,
modelStore: ms,
} }
return &r, nil return &r, nil
} }
func (r *Repository) Close(ctx context.Context) error { func (r *Repository) Close(ctx context.Context) error {
if r.dataLayer == nil { if r.dataLayer != nil {
err := r.dataLayer.Close(ctx)
r.dataLayer = nil
if err != nil {
return errors.Wrap(err, "closing corso DataLayer")
}
}
if r.modelStore == nil {
return nil return nil
} }
err := r.modelStore.Close(ctx)
err := r.dataLayer.Close(ctx) r.modelStore = nil
r.dataLayer = nil return errors.Wrap(err, "closing corso ModelStore")
if err != nil {
return errors.Wrap(err, "closing corso Repository")
}
return nil
} }
// NewBackup generates a backupOperation runner. // NewBackup generates a backupOperation runner.
@ -115,6 +130,7 @@ func (r Repository) NewBackup(ctx context.Context, selector selectors.Selector)
ctx, ctx,
operations.Options{}, operations.Options{},
r.dataLayer, r.dataLayer,
r.modelStore,
r.Account, r.Account,
selector) selector)
} }
@ -125,6 +141,7 @@ func (r Repository) NewRestore(ctx context.Context, restorePointID string, targe
ctx, ctx,
operations.Options{}, operations.Options{},
r.dataLayer, r.dataLayer,
r.modelStore,
r.Account, r.Account,
restorePointID, restorePointID,
targets) targets)