add e2e backup-restore integration test (#311)

* add e2e backup-restore integration test

Adds an e2e integration test that starts by backing up
data, and ends with restoring it.  Also makes various
amendments to other code where necessary to
facilitate this exercise.
This commit is contained in:
Keepers 2022-07-14 09:00:55 -06:00 committed by GitHub
parent 76a5d6bba3
commit 105fd7383a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 448 additions and 186 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/alcionai/corso/cli/utils"
"github.com/alcionai/corso/pkg/logger"
"github.com/alcionai/corso/pkg/repository"
"github.com/alcionai/corso/pkg/selectors"
)
// exchange bucket info from flags
@ -85,7 +86,7 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error {
}
defer utils.CloseRepo(ctx, r)
ro, err := r.NewRestore(ctx, backupID, []string{m365.TenantID, user, "mail", folder, mail})
ro, err := r.NewRestore(ctx, backupID, exchangeRestoreSelectors(user, folder, mail))
if err != nil {
return errors.Wrap(err, "Failed to initialize Exchange restore")
}
@ -98,9 +99,32 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error {
return nil
}
func validateRestoreFlags(u, f, m, bID string) error {
if len(bID) == 0 {
return errors.New("a backup ID is requried")
func exchangeRestoreSelectors(u, f, m string) selectors.Selector {
sel := selectors.NewExchangeRestore()
if u == "*" {
u = selectors.All
}
if f == "*" {
f = selectors.All
}
if m == "*" {
m = selectors.All
}
if len(m) > 0 {
sel.Include(sel.Mails(u, f, m))
}
if len(f) > 0 && len(m) == 0 {
sel.Include(sel.MailFolders(u, f))
}
if len(f) == 0 && len(m) == 0 {
sel.Include(sel.Users(u))
}
return sel.Selector
}
func validateRestoreFlags(u, f, m, rpid string) error {
if len(rpid) == 0 {
return errors.New("a restore point ID is requried")
}
lu, lf, lm := len(u), len(f), len(m)
if (lu == 0 || u == "*") && (lf+lm > 0) {

View File

@ -9,7 +9,7 @@ require (
github.com/kopia/kopia v0.11.1
github.com/microsoft/kiota-abstractions-go v0.8.1
github.com/microsoft/kiota-authentication-azure-go v0.3.0
github.com/microsoft/kiota-serialization-json-go v0.5.4
github.com/microsoft/kiota-serialization-json-go v0.5.5
github.com/microsoftgraph/msgraph-sdk-go v0.28.0
github.com/microsoftgraph/msgraph-sdk-go-core v0.26.1
github.com/pkg/errors v0.9.1
@ -45,7 +45,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/chmduquesne/rollinghash v4.0.0+incompatible // indirect
github.com/cjlapao/common-go v0.0.21 // indirect
github.com/cjlapao/common-go v0.0.22 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect

View File

@ -69,8 +69,8 @@ github.com/chmduquesne/rollinghash v4.0.0+incompatible/go.mod h1:Uc2I36RRfTAf7Dg
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cjlapao/common-go v0.0.21 h1:z4PDFLQG4pJxHEmM8ecmnjDgTcR0Xr/30WZiNZF2oYM=
github.com/cjlapao/common-go v0.0.21/go.mod h1:QHUcl8KX3RgNVonFJ1WpW4mlr9NyWOHmzqxaRbwooPo=
github.com/cjlapao/common-go v0.0.22 h1:hQQ4mMupPp47eZRb5D8mxjrp0VyRSWNk/FOld7wxMsQ=
github.com/cjlapao/common-go v0.0.22/go.mod h1:RZuwsymEIdwSubzUBpNYmmGfeITqHDV5iTgnr6zYwSc=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@ -256,8 +256,8 @@ github.com/microsoft/kiota-authentication-azure-go v0.3.0 h1:iLyy5qldAjBiYMGMk1r
github.com/microsoft/kiota-authentication-azure-go v0.3.0/go.mod h1:qyZWSCug2eG1zrRnCSacyFHGsgQa4aSCWn3EOkY9Z1M=
github.com/microsoft/kiota-http-go v0.5.2 h1:BS/bK2xHLT8TT+p0uZKxwu+lkXDAPByugYP2n1nV0Uo=
github.com/microsoft/kiota-http-go v0.5.2/go.mod h1:WqEFNw3rMEatymG4Xh3rLSTxaKq80rJdQ/CSSh7m6jI=
github.com/microsoft/kiota-serialization-json-go v0.5.4 h1:BpkTYq1AeZPCnSsp3zpzfNL9hx3xb1/LPFteV6tbhMQ=
github.com/microsoft/kiota-serialization-json-go v0.5.4/go.mod h1:GI9vrssO1EvqzDtvMKuhjALn40phZOWkeeaMgtCk6xE=
github.com/microsoft/kiota-serialization-json-go v0.5.5 h1:B0iKBKOdi+9NKFlormLRqduQ1+77MPGRsZ7xnd74EqQ=
github.com/microsoft/kiota-serialization-json-go v0.5.5/go.mod h1:GI9vrssO1EvqzDtvMKuhjALn40phZOWkeeaMgtCk6xE=
github.com/microsoft/kiota-serialization-text-go v0.4.1 h1:6QPH7+geUPCpaSZkKCQw0Scngx2IF0vKodrvvWWiu2A=
github.com/microsoft/kiota-serialization-text-go v0.4.1/go.mod h1:DsriFnVBDCc4D84qxG3j8q/1Sxu16JILfhxMZm3kdfw=
github.com/microsoftgraph/msgraph-sdk-go v0.28.0 h1:BolP/vNW7gsNXivg/qikcdftOicLMgMm3Z/6PpSFDvU=

View File

@ -5,6 +5,7 @@ package connector
import (
"bytes"
"context"
"strings"
az "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
ka "github.com/microsoft/kiota-authentication-azure-go"
@ -187,59 +188,77 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s
// RestoreMessages: Utility function to connect to M365 backstore
// and upload messages from DataCollection.
// FullPath: tenantId, userId, <mailCategory>, FolderId
func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection) error {
var errs error
// must be user.GetId(), PrimaryName no longer works 6-15-2022
user := dc.FullPath()[1]
items := dc.Items()
func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollection) error {
var (
pathCounter = map[string]bool{}
attempts, successes int
errs error
)
for {
select {
case <-ctx.Done():
return support.WrapAndAppend("context cancelled", ctx.Err(), errs)
case data, ok := <-items:
if !ok {
return errs
}
for _, dc := range dcs {
// must be user.GetId(), PrimaryName no longer works 6-15-2022
user := dc.FullPath()[1]
items := dc.Items()
pathCounter[strings.Join(dc.FullPath(), "")] = true
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(data.ToReader())
if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
message, err := support.CreateMessageFromBytes(buf.Bytes())
if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
clone := support.ToMessage(message)
address := dc.FullPath()[3]
// details on valueId settings: https://docs.microsoft.com/en-us/openspecs/exchange_server_protocols/ms-oxprops/77844470-22ca-43fb-993d-c53e96cf9cd6
valueId := "Integer 0x0E07"
enableValue := "4"
sv := models.NewSingleValueLegacyExtendedProperty()
sv.SetId(&valueId)
sv.SetValue(&enableValue)
svlep := []models.SingleValueLegacyExtendedPropertyable{sv}
clone.SetSingleValueExtendedProperties(svlep)
draft := false
clone.SetIsDraft(&draft)
sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone)
if err != nil {
errs = support.WrapAndAppend(data.UUID()+": "+
support.ConnectorStackErrorTrace(err), err, errs)
continue
// TODO: Add to retry Handler for the for failure
}
var exit bool
for !exit {
select {
case <-ctx.Done():
return support.WrapAndAppend("context cancelled", ctx.Err(), errs)
case data, ok := <-items:
if !ok {
exit = true
break
}
attempts++
if sentMessage == nil && err == nil {
errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs)
buf := &bytes.Buffer{}
_, err := buf.ReadFrom(data.ToReader())
if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
message, err := support.CreateMessageFromBytes(buf.Bytes())
if err != nil {
errs = support.WrapAndAppend(data.UUID(), err, errs)
continue
}
clone := support.ToMessage(message)
address := dc.FullPath()[3]
valueId := "Integer 0x0E07"
enableValue := "4"
sv := models.NewSingleValueLegacyExtendedProperty()
sv.SetId(&valueId)
sv.SetValue(&enableValue)
svlep := []models.SingleValueLegacyExtendedPropertyable{sv}
clone.SetSingleValueExtendedProperties(svlep)
draft := false
clone.SetIsDraft(&draft)
sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone)
if err != nil {
errs = support.WrapAndAppend(
data.UUID()+": "+support.ConnectorStackErrorTrace(err),
err, errs)
continue
// TODO: Add to retry Handler for the for failure
}
if sentMessage == nil && err == nil {
errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs)
}
if err != nil {
successes++
}
// This completes the restore loop for a message..
}
// This completes the restore loop for a message..
}
}
status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs)
gc.SetStatus(*status)
logger.Ctx(ctx).Debug(gc.PrintableStatus())
return errs
}
// serializeMessages: Temp Function as place Holder until Collections have been added
@ -299,11 +318,9 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([
success += edc.Length()
collections = append(collections, &edc)
}
status, err := support.CreateStatus(support.Backup, attemptedItems, success, len(tasklist), errs)
if err == nil {
gc.SetStatus(*status)
logger.Ctx(ctx).Debugw(gc.PrintableStatus())
}
status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs)
gc.SetStatus(*status)
logger.Ctx(ctx).Debugw(gc.PrintableStatus())
return collections, errs
}

View File

@ -83,7 +83,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages(
edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"})
edc.PopulateCollection(&ds)
edc.FinishPopulation()
err = suite.connector.RestoreMessages(context.Background(), &edc)
err = suite.connector.RestoreMessages(context.Background(), []DataCollection{&edc})
assert.NoError(suite.T(), err)
}
@ -165,9 +165,11 @@ func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() {
func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() {
gc := GraphConnector{}
suite.Equal(len(gc.PrintableStatus()), 0)
status, err := support.CreateStatus(support.Restore, 12, 9, 8,
status := support.CreateStatus(
context.Background(),
support.Restore,
12, 9, 8,
support.WrapAndAppend("tres", errors.New("three"), support.WrapAndAppend("arc376", errors.New("one"), errors.New("two"))))
assert.NoError(suite.T(), err)
gc.SetStatus(*status)
suite.Greater(len(gc.PrintableStatus()), 0)
suite.Greater(gc.Status().ObjectCount, 0)

View File

@ -15,6 +15,8 @@ import (
type MockExchangeDataCollection struct {
fullPath []string
messageCount int
Data [][]byte
Names []string
}
var (
@ -26,11 +28,19 @@ var (
// NewMockExchangeDataCollection creates an data collection that will return the specified number of
// mock messages when iterated
func NewMockExchangeDataCollection(pathRepresentation []string, numMessagesToReturn int) *MockExchangeDataCollection {
collection := &MockExchangeDataCollection{
c := &MockExchangeDataCollection{
fullPath: pathRepresentation,
messageCount: numMessagesToReturn,
Data: [][]byte{},
Names: []string{},
}
return collection
for i := 0; i < c.messageCount; i++ {
// We can plug in whatever data we want here (can be an io.Reader to a test data file if needed)
c.Data = append(c.Data, []byte("test message"))
c.Names = append(c.Names, uuid.NewString())
}
return c
}
func (medc *MockExchangeDataCollection) FullPath() []string {
@ -44,11 +54,11 @@ func (medc *MockExchangeDataCollection) Items() <-chan connector.DataStream {
go func() {
defer close(res)
for i := 0; i < medc.messageCount; i++ {
// We can plug in whatever data we want here (can be an io.Reader to a test data file if needed)
m := []byte("test message")
res <- &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))}
res <- &MockExchangeData{
medc.Names[i],
io.NopCloser(bytes.NewReader(medc.Data[i])),
}
}
}()

View File

@ -1,8 +1,10 @@
package support
import (
"errors"
"context"
"fmt"
"github.com/alcionai/corso/pkg/logger"
)
type ConnectorOperationStatus struct {
@ -25,25 +27,30 @@ const (
)
// Constructor for ConnectorOperationStatus. If the counts do not agree, an error is returned.
func CreateStatus(op Operation, objects, success, folders int, err error) (*ConnectorOperationStatus, error) {
func CreateStatus(ctx context.Context, op Operation, objects, success, folders int, err error) *ConnectorOperationStatus {
hasErrors := err != nil
var reason string
if err != nil {
reason = err.Error()
}
numErr := GetNumberOfErrors(err)
status := ConnectorOperationStatus{
lastOperation: op,
ObjectCount: objects,
folderCount: folders,
successful: success,
errorCount: GetNumberOfErrors(err),
errorCount: numErr,
incomplete: hasErrors,
incompleteReason: reason,
}
if status.ObjectCount != status.errorCount+status.successful {
return nil, errors.New("incorrect total on initialization")
logger.Ctx(ctx).DPanicw(
"status object count does not match errors + successes",
"objects", objects,
"successes", success,
"errors", numErr)
}
return &status, nil
return &status
}
func (cos *ConnectorOperationStatus) String() string {

View File

@ -1,6 +1,7 @@
package support
import (
"context"
"errors"
"testing"
@ -28,38 +29,37 @@ type statusParams struct {
func (suite *GCStatusTestSuite) TestCreateStatus() {
table := []struct {
name string
params statusParams
expected bool
checkError assert.ValueAssertionFunc
name string
params statusParams
expect assert.BoolAssertionFunc
}{
{
name: "Test: Status Success",
params: statusParams{Backup, 12, 12, 3, nil},
expected: false,
checkError: assert.Nil,
name: "Test: Status Success",
params: statusParams{Backup, 12, 12, 3, nil},
expect: assert.False,
},
{
name: "Test: Status Failed",
params: statusParams{Restore, 12, 9, 8, WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two")))},
expected: true,
checkError: assert.Nil,
name: "Test: Status Failed",
params: statusParams{Restore, 12, 9, 8, WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two")))},
expect: assert.True,
},
{
name: "Invalid status",
params: statusParams{Backup, 9, 3, 12, errors.New("invalidcl")},
expected: false,
checkError: assert.NotNil,
name: "Invalid status",
// todo: expect panic once logger.DPanicw identifies dev mode.
params: statusParams{Backup, 9, 3, 13, errors.New("invalidcl")},
expect: assert.True,
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
result, err := CreateStatus(test.params.operationType, test.params.objects,
test.params.success, test.params.folders, test.params.err)
test.checkError(t, err)
if err == nil {
suite.Equal(result.incomplete, test.expected)
}
result := CreateStatus(
context.Background(),
test.params.operationType,
test.params.objects,
test.params.success,
test.params.folders,
test.params.err)
test.expect(t, result.incomplete, "status is incomplete")
})
}

View File

@ -321,13 +321,12 @@ func (w Wrapper) getEntry(
return e, nil
}
// collectItems is a generic helper function that pulls data from kopia for the
// given item in the snapshot with ID snapshotID. If isDirectory is true, it
// returns a slice of DataCollections with data from directories in the subtree
// rooted at itemPath. If isDirectory is false it returns a DataCollection (in a
// slice) with a single item corresponding to the requested item. If the item
// does not exist or a file is found when a directory is expected (or the
// opposite) it returns an error.
// CollectItems pulls data from kopia for the given items in the snapshot with
// ID snapshotID. If isDirectory is true, it returns a slice of DataCollections
// with data from directories in the subtree rooted at itemPath. If isDirectory
// is false it returns a DataCollection (in a slice) with a single item for each
// requested item. If the item does not exist or a file is found when a directory
// is expected (or the opposite) it returns an error.
func (w Wrapper) collectItems(
ctx context.Context,
snapshotID string,
@ -528,3 +527,30 @@ func (w Wrapper) RestoreDirectory(
) ([]connector.DataCollection, error) {
return w.collectItems(ctx, snapshotID, basePath, true)
}
// RestoreSingleItem looks up all paths- assuming each is an item declaration,
// not a directory- in the snapshot with id snapshotID. The path should be the
// full path of the item from the root. Returns the results as a slice of single-
// item DataCollections, where the DataCollection.FullPath() matches the path.
// If the item does not exist in kopia or is not a file an error is returned.
// The UUID of the returned DataStreams will be the name of the kopia file the
// data is sourced from.
func (w Wrapper) RestoreMultipleItems(
ctx context.Context,
snapshotID string,
paths [][]string,
) ([]connector.DataCollection, error) {
var (
dcs = []connector.DataCollection{}
errs *multierror.Error
)
for _, path := range paths {
dc, err := w.RestoreSingleItem(ctx, snapshotID, path)
if err != nil {
errs = multierror.Append(errs, err)
} else {
dcs = append(dcs, dc)
}
}
return dcs, errs.ErrorOrNil()
}

View File

@ -8,6 +8,7 @@ import (
"path"
"testing"
"github.com/google/uuid"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/repo/manifest"
@ -72,17 +73,12 @@ func testForFiles(
fullPath := path.Join(append(c.FullPath(), s.UUID())...)
expected, ok := expected[fullPath]
require.True(
t,
ok,
"unexpected file with path %q",
path.Join(append(c.FullPath(), fullPath)...),
)
require.True(t, ok, "unexpected file with path %q", fullPath)
buf, err := ioutil.ReadAll(s.ToReader())
require.NoError(t, err)
require.NoError(t, err, "reading collection item: %s", fullPath)
assert.Equal(t, expected, buf)
assert.Equal(t, expected, buf, "comparing collection item: %s", fullPath)
}
}
@ -674,3 +670,79 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory_Errors(
})
}
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() {
t := suite.T()
ctx := context.Background()
k, err := openKopiaRepo(t, ctx)
require.NoError(t, err)
w := &Wrapper{k}
tid := uuid.NewString()
p1 := []string{tid, "uid", "emails", "fid"}
p2 := []string{tid, "uid2", "emails", "fid"}
dc1 := mockconnector.NewMockExchangeDataCollection(p1, 1)
dc2 := mockconnector.NewMockExchangeDataCollection(p2, 1)
fp1 := append(p1, dc1.Names[0])
fp2 := append(p2, dc2.Names[0])
stats, _, err := w.BackupCollections(ctx, []connector.DataCollection{dc1, dc2})
require.NoError(t, err)
expected := map[string][]byte{
path.Join(fp1...): dc1.Data[0],
path.Join(fp2...): dc2.Data[0],
}
result, err := w.RestoreMultipleItems(
ctx,
string(stats.SnapshotID),
[][]string{fp1, fp2})
require.NoError(t, err)
assert.Equal(t, 2, len(result))
testForFiles(t, expected, result)
}
func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems_Errors() {
table := []struct {
name string
snapshotID string
paths [][]string
}{
{
"EmptyPaths",
string(suite.snapshotID),
[][]string{{}},
},
{
"NoSnapshot",
"foo",
[][]string{append(testPath, testFileName)},
},
{
"TargetNotAFile",
string(suite.snapshotID),
[][]string{testPath[:2]},
},
{
"NonExistentFile",
string(suite.snapshotID),
[][]string{append(testPath, "subdir", "foo")},
},
}
for _, test := range table {
suite.T().Run(test.name, func(t *testing.T) {
_, err := suite.w.RestoreMultipleItems(
suite.ctx,
test.snapshotID,
test.paths,
)
require.Error(t, err)
})
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/internal/model"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/backup"
"github.com/alcionai/corso/pkg/selectors"
@ -29,7 +30,7 @@ type BackupOperation struct {
type BackupResults struct {
summary
metrics
// todo: Backup ID
BackupID model.ID `json:"backupID"`
}
// NewBackupOperation constructs and validates a backup operation.
@ -113,10 +114,15 @@ func (op *BackupOperation) createBackupModels(ctx context.Context, snapID string
return errors.Wrap(err, "creating backupdetails model")
}
err = op.modelStore.Put(ctx, kopia.BackupModel, backup.New(snapID, string(details.ModelStoreID)))
rp := backup.New(snapID, string(details.ModelStoreID))
err = op.modelStore.Put(ctx, kopia.BackupModel, rp)
if err != nil {
return errors.Wrap(err, "creating backup model")
}
op.Results.BackupID = rp.StableID
return nil
}

View File

@ -75,7 +75,10 @@ type BackupOpIntegrationSuite struct {
}
func TestBackupOpIntegrationSuite(t *testing.T) {
if err := ctesting.RunOnAny(ctesting.CorsoCITests); err != nil {
if err := ctesting.RunOnAny(
ctesting.CorsoCITests,
ctesting.CorsoOperationTests,
); err != nil {
t.Skip(err)
}
suite.Run(t, new(BackupOpIntegrationSuite))
@ -128,10 +131,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
t := suite.T()
ctx := context.Background()
// m365User := "lidiah@8qzvrj.onmicrosoft.com"
// not the user we want to use, but all the others are
// suffering from JsonParseNode syndrome
m365User := "george.martinez@8qzvrj.onmicrosoft.com"
m365User := "lidiah@8qzvrj.onmicrosoft.com"
acct, err := ctesting.NewM365Account()
require.NoError(t, err)
@ -168,6 +168,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
require.NoError(t, bo.Run(ctx))
require.NotEmpty(t, bo.Results)
require.NotEmpty(t, bo.Results.BackupID)
assert.Equal(t, bo.Status, Successful)
assert.Greater(t, bo.Results.ItemsRead, 0)
assert.Greater(t, bo.Results.ItemsWritten, 0)

View File

@ -4,22 +4,26 @@ import (
"context"
"time"
"github.com/kopia/kopia/repo/manifest"
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/connector"
"github.com/alcionai/corso/internal/connector/support"
"github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/internal/model"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/backup"
"github.com/alcionai/corso/pkg/selectors"
)
// RestoreOperation wraps an operation with restore-specific props.
type RestoreOperation struct {
operation
BackupID string `json:"backupID"`
Results RestoreResults `json:"results"`
Targets []string `json:"selectors"` // todo: replace with Selectors
Version string `json:"bersion"`
BackupID model.ID `json:"backupID"`
Results RestoreResults `json:"results"`
Selectors selectors.Selector `json:"selectors"` // todo: replace with Selectors
Version string `json:"version"`
account account.Account
}
@ -37,13 +41,13 @@ func NewRestoreOperation(
kw *kopia.Wrapper,
ms *kopia.ModelStore,
acct account.Account,
backupID string,
targets []string,
backupID model.ID,
sel selectors.Selector,
) (RestoreOperation, error) {
op := RestoreOperation{
operation: newOperation(opts, kw, ms),
BackupID: backupID,
Targets: targets,
Selectors: sel,
Version: "v0",
account: acct,
}
@ -77,23 +81,48 @@ func (op *RestoreOperation) Run(ctx context.Context) error {
stats := restoreStats{}
defer op.persistResults(time.Now(), &stats)
dc, err := op.kopia.RestoreSingleItem(ctx, op.BackupID, op.Targets)
// retrieve the restore point details
rp := backup.Backup{}
err := op.modelStore.Get(ctx, kopia.BackupModel, op.BackupID, &rp)
if err != nil {
stats.readErr = errors.Wrap(err, "retrieving restore point")
return stats.readErr
}
rpd := backup.Details{}
err = op.modelStore.GetWithModelStoreID(ctx, kopia.BackupDetailsModel, manifest.ID(rp.DetailsID), &rpd)
if err != nil {
stats.readErr = errors.Wrap(err, "retrieving restore point details")
return stats.readErr
}
er, err := op.Selectors.ToExchangeRestore()
if err != nil {
stats.readErr = err
return errors.Wrap(err, "retrieving service data")
return err
}
stats.cs = []connector.DataCollection{dc}
// format the details and retrieve the items from kopia
fds := er.FilterDetails(&rpd)
dcs, err := op.kopia.RestoreMultipleItems(ctx, rp.SnapshotID, fds)
if err != nil {
stats.readErr = errors.Wrap(err, "retrieving service data")
return stats.readErr
}
stats.cs = dcs
// restore those collections using graph
gc, err := connector.NewGraphConnector(op.account)
if err != nil {
stats.writeErr = err
return errors.Wrap(err, "connecting to graph api")
stats.writeErr = errors.Wrap(err, "connecting to graph api")
return stats.writeErr
}
if err := gc.RestoreMessages(ctx, dc); err != nil {
stats.writeErr = err
return errors.Wrap(err, "restoring service data")
if err := gc.RestoreMessages(ctx, dcs); err != nil {
stats.writeErr = errors.Wrap(err, "restoring service data")
return stats.writeErr
}
stats.gc = gc.Status()
op.Status = Successful
return nil

View File

@ -15,6 +15,7 @@ import (
"github.com/alcionai/corso/internal/kopia"
ctesting "github.com/alcionai/corso/internal/testing"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/selectors"
)
// ---------------------------------------------------------------------------
@ -50,7 +51,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
}
)
op, err := NewRestoreOperation(ctx, Options{}, kw, ms, acct, "foo", nil)
op, err := NewRestoreOperation(ctx, Options{}, kw, ms, acct, "foo", selectors.Selector{})
require.NoError(t, err)
op.persistResults(now, &stats)
@ -73,7 +74,10 @@ type RestoreOpIntegrationSuite struct {
}
func TestRestoreOpIntegrationSuite(t *testing.T) {
if err := ctesting.RunOnAny(ctesting.CorsoCITests); err != nil {
if err := ctesting.RunOnAny(
ctesting.CorsoCITests,
ctesting.CorsoOperationTests,
); err != nil {
t.Skip(err)
}
suite.Run(t, new(RestoreOpIntegrationSuite))
@ -112,8 +116,69 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
test.ms,
test.acct,
"backup-id",
nil)
selectors.Selector{})
test.errCheck(t, err)
})
}
}
func (suite *RestoreOpIntegrationSuite) TestRestore_Run() {
t := suite.T()
ctx := context.Background()
m365User := "lidiah@8qzvrj.onmicrosoft.com"
acct, err := ctesting.NewM365Account()
require.NoError(t, err)
// need to initialize the repository before we can test connecting to it.
st, err := ctesting.NewPrefixedS3Storage(t)
require.NoError(t, err)
k := kopia.NewConn(st)
require.NoError(t, k.Initialize(ctx))
defer k.Close(ctx)
w, err := kopia.NewWrapper(k)
require.NoError(t, err)
defer w.Close(ctx)
ms, err := kopia.NewModelStore(k)
require.NoError(t, err)
defer ms.Close(ctx)
bsel := selectors.NewExchangeBackup()
bsel.Include(bsel.Users(m365User))
bo, err := NewBackupOperation(
ctx,
Options{},
w,
ms,
acct,
bsel.Selector)
require.NoError(t, err)
require.NoError(t, bo.Run(ctx))
require.NotEmpty(t, bo.Results.BackupID)
rsel := selectors.NewExchangeRestore()
rsel.Include(rsel.Users(m365User))
ro, err := NewRestoreOperation(
ctx,
Options{},
w,
ms,
acct,
bo.Results.BackupID,
rsel.Selector)
require.NoError(t, err)
require.NoError(t, ro.Run(ctx), "restoreOp.Run()")
require.NotEmpty(t, ro.Results, "restoreOp results")
assert.Equal(t, ro.Status, Successful, "restoreOp status")
assert.Greater(t, ro.Results.ItemsRead, 0, "restore items read")
assert.Greater(t, ro.Results.ItemsWritten, 0, "restored items written")
assert.Zero(t, ro.Results.ReadErrors, "errors while reading restore data")
assert.Zero(t, ro.Results.WriteErrors, "errors while writing restore data")
assert.Equal(t, bo.Results.ItemsWritten, ro.Results.ItemsWritten, "backup and restore wrote the same num of items")
}

View File

@ -14,6 +14,7 @@ const (
CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS"
CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS"
CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS"
CorsoOperationTests = "CORSO_OPERATION_TESTS"
CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS"
)

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/alcionai/corso/internal/kopia"
"github.com/alcionai/corso/internal/model"
"github.com/alcionai/corso/internal/operations"
"github.com/alcionai/corso/pkg/account"
"github.com/alcionai/corso/pkg/backup"
@ -138,15 +139,15 @@ func (r Repository) NewBackup(ctx context.Context, selector selectors.Selector)
}
// NewRestore generates a restoreOperation runner.
func (r Repository) NewRestore(ctx context.Context, backupID string, targets []string) (operations.RestoreOperation, error) {
func (r Repository) NewRestore(ctx context.Context, backupID string, sel selectors.Selector) (operations.RestoreOperation, error) {
return operations.NewRestoreOperation(
ctx,
operations.Options{},
r.dataLayer,
r.modelStore,
r.Account,
backupID,
targets)
model.ID(backupID),
sel)
}
// backups lists backups in a respository

View File

@ -190,7 +190,7 @@ func (suite *RepositoryIntegrationSuite) TestNewRestore() {
r, err := repository.Initialize(ctx, acct, st)
require.NoError(t, err)
ro, err := r.NewRestore(ctx, "backup-id", []string{})
ro, err := r.NewRestore(ctx, "backup-id", selectors.Selector{})
require.NoError(t, err)
require.NotNil(t, ro)
}

View File

@ -36,7 +36,7 @@ type (
func NewExchangeBackup() *ExchangeBackup {
src := ExchangeBackup{
exchange{
newSelector(ServiceExchange, ""),
newSelector(ServiceExchange),
},
}
return &src
@ -53,10 +53,10 @@ func (s Selector) ToExchangeBackup() (*ExchangeBackup, error) {
}
// NewExchangeRestore produces a new Selector with the service set to ServiceExchange.
func NewExchangeRestore(backupID string) *ExchangeRestore {
func NewExchangeRestore() *ExchangeRestore {
src := ExchangeRestore{
exchange{
newSelector(ServiceExchange, backupID),
newSelector(ServiceExchange),
},
}
return &src
@ -406,15 +406,15 @@ func idPath(cat exchangeCategory, path []string) map[exchangeCategory]string {
// FilterDetails reduces the entries in a backupDetails struct to only
// those that match the inclusions and exclusions in the selector.
func (s *ExchangeRestore) FilterDetails(deets *backup.Details) []string {
func (s *ExchangeRestore) FilterDetails(deets *backup.Details) [][]string {
if deets == nil {
return []string{}
return nil
}
entIncs := exchangeScopesByCategory(s.Includes)
entExcs := exchangeScopesByCategory(s.Excludes)
refs := []string{}
refs := [][]string{}
for _, ent := range deets.Entries {
path := strings.Split(ent.RepoRef, "/")
@ -438,7 +438,7 @@ func (s *ExchangeRestore) FilterDetails(deets *backup.Details) []string {
entIncs[cat.String()],
entExcs[cat.String()])
if matched {
refs = append(refs, ent.RepoRef)
refs = append(refs, path)
}
}

View File

@ -1,6 +1,7 @@
package selectors
import (
"strings"
"testing"
"github.com/alcionai/corso/pkg/backup"
@ -21,7 +22,6 @@ func (suite *ExchangeSourceSuite) TestNewExchangeBackup() {
t := suite.T()
eb := NewExchangeBackup()
assert.Equal(t, eb.Service, ServiceExchange)
assert.Zero(t, eb.BackupID)
assert.NotZero(t, eb.Scopes())
}
@ -32,26 +32,23 @@ func (suite *ExchangeSourceSuite) TestToExchangeBackup() {
eb, err := s.ToExchangeBackup()
require.NoError(t, err)
assert.Equal(t, eb.Service, ServiceExchange)
assert.Zero(t, eb.BackupID)
assert.NotZero(t, eb.Scopes())
}
func (suite *ExchangeSourceSuite) TestNewExchangeRestore() {
t := suite.T()
er := NewExchangeRestore("backupID")
er := NewExchangeRestore()
assert.Equal(t, er.Service, ServiceExchange)
assert.Equal(t, er.BackupID, "backupID")
assert.NotZero(t, er.Scopes())
}
func (suite *ExchangeSourceSuite) TestToExchangeRestore() {
t := suite.T()
eb := NewExchangeRestore("rpid")
eb := NewExchangeRestore()
s := eb.Selector
eb, err := s.ToExchangeRestore()
require.NoError(t, err)
assert.Equal(t, eb.Service, ServiceExchange)
assert.Equal(t, eb.BackupID, "rpid")
assert.NotZero(t, eb.Scopes())
}
@ -486,7 +483,7 @@ func (suite *ExchangeSourceSuite) TestExchangeScope_IncludesPath() {
)
var (
path = []string{"tid", usr, "mail", fld, mail}
es = NewExchangeRestore("rpid")
es = NewExchangeRestore()
)
table := []struct {
@ -526,7 +523,7 @@ func (suite *ExchangeSourceSuite) TestExchangeScope_ExcludesPath() {
)
var (
path = []string{"tid", usr, "mail", fld, mail}
es = NewExchangeRestore("rpid")
es = NewExchangeRestore()
)
table := []struct {
@ -622,124 +619,131 @@ func (suite *ExchangeSourceSuite) TestExchangeRestore_FilterDetails() {
event = "tid/uid/event/eid"
mail = "tid/uid/mail/mfld/mid"
)
split := func(s ...string) [][]string {
r := [][]string{}
for _, ss := range s {
r = append(r, strings.Split(ss, "/"))
}
return r
}
table := []struct {
name string
deets *backup.Details
makeSelector func() *ExchangeRestore
expect []string
expect [][]string
}{
{
"no refs",
makeDeets(),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
return er
},
[]string{},
[][]string{},
},
{
"contact only",
makeDeets(contact),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
return er
},
[]string{contact},
split(contact),
},
{
"event only",
makeDeets(event),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
return er
},
[]string{event},
split(event),
},
{
"mail only",
makeDeets(mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
return er
},
[]string{mail},
split(mail),
},
{
"all",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
return er
},
[]string{contact, event, mail},
split(contact, event, mail),
},
{
"only match contact",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Contacts("uid", "cfld", "cid"))
return er
},
[]string{contact},
split(contact),
},
{
"only match event",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Events("uid", "eid"))
return er
},
[]string{event},
split(event),
},
{
"only match mail",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Mails("uid", "mfld", "mid"))
return er
},
[]string{mail},
split(mail),
},
{
"exclude contact",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
er.Exclude(er.Contacts("uid", "cfld", "cid"))
return er
},
[]string{event, mail},
split(event, mail),
},
{
"exclude event",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
er.Exclude(er.Events("uid", "eid"))
return er
},
[]string{contact, mail},
split(contact, mail),
},
{
"exclude mail",
makeDeets(contact, event, mail),
func() *ExchangeRestore {
er := NewExchangeRestore("rpid")
er := NewExchangeRestore()
er.Include(er.Users(All))
er.Exclude(er.Mails("uid", "mfld", "mid"))
return er
},
[]string{contact, event},
split(contact, event),
},
}
for _, test := range table {
@ -753,7 +757,7 @@ func (suite *ExchangeSourceSuite) TestExchangeRestore_FilterDetails() {
func (suite *ExchangeSourceSuite) TestExchangeScopesByCategory() {
var (
es = NewExchangeRestore("rpid")
es = NewExchangeRestore()
users = es.Users(All)
contacts = es.ContactFolders(All, All)
events = es.Events(All, All)
@ -798,7 +802,7 @@ func (suite *ExchangeSourceSuite) TestMatchExchangeEntry() {
return extendExchangeScopeValues(None, exchangeScope(s))
}
var (
es = NewExchangeRestore("rpid")
es = NewExchangeRestore()
inAll = include(es.Users(All))
inNone = include(es.Users(None))
inMail = include(es.Mails(All, All, mail))

View File

@ -44,16 +44,14 @@ const (
// The core selector. Has no api for setting or retrieving data.
// Is only used to pass along more specific selector instances.
type Selector struct {
BackupID string `json:"backupID,omitempty"` // A backup id, used only by restore operations.
Service service `json:"service,omitempty"` // The service scope of the data. Exchange, Teams, Sharepoint, etc.
Excludes []map[string]string `json:"exclusions,omitempty"` // A slice of exclusions. Each exclusion applies to all inclusions.
Includes []map[string]string `json:"scopes,omitempty"` // A slice of inclusions. Expected to get cast to a service wrapper within each service handler.
}
// helper for specific selector instance constructors.
func newSelector(s service, backupID string) Selector {
func newSelector(s service) Selector {
return Selector{
BackupID: backupID,
Service: s,
Excludes: []map[string]string{},
Includes: []map[string]string{},

View File

@ -17,10 +17,9 @@ func TestSelectorSuite(t *testing.T) {
func (suite *SelectorSuite) TestNewSelector() {
t := suite.T()
s := newSelector(ServiceUnknown, "backupID")
s := newSelector(ServiceUnknown)
assert.NotNil(t, s)
assert.Equal(t, s.Service, ServiceUnknown)
assert.Equal(t, s.BackupID, "backupID")
assert.NotNil(t, s.Includes)
}