diff --git a/src/cli/restore/exchange.go b/src/cli/restore/exchange.go index 396364c48..e3658e455 100644 --- a/src/cli/restore/exchange.go +++ b/src/cli/restore/exchange.go @@ -11,6 +11,7 @@ import ( "github.com/alcionai/corso/cli/utils" "github.com/alcionai/corso/pkg/logger" "github.com/alcionai/corso/pkg/repository" + "github.com/alcionai/corso/pkg/selectors" ) // exchange bucket info from flags @@ -85,7 +86,7 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error { } defer utils.CloseRepo(ctx, r) - ro, err := r.NewRestore(ctx, backupID, []string{m365.TenantID, user, "mail", folder, mail}) + ro, err := r.NewRestore(ctx, backupID, exchangeRestoreSelectors(user, folder, mail)) if err != nil { return errors.Wrap(err, "Failed to initialize Exchange restore") } @@ -98,9 +99,32 @@ func restoreExchangeCmd(cmd *cobra.Command, args []string) error { return nil } -func validateRestoreFlags(u, f, m, bID string) error { - if len(bID) == 0 { - return errors.New("a backup ID is requried") +func exchangeRestoreSelectors(u, f, m string) selectors.Selector { + sel := selectors.NewExchangeRestore() + if u == "*" { + u = selectors.All + } + if f == "*" { + f = selectors.All + } + if m == "*" { + m = selectors.All + } + if len(m) > 0 { + sel.Include(sel.Mails(u, f, m)) + } + if len(f) > 0 && len(m) == 0 { + sel.Include(sel.MailFolders(u, f)) + } + if len(f) == 0 && len(m) == 0 { + sel.Include(sel.Users(u)) + } + return sel.Selector +} + +func validateRestoreFlags(u, f, m, rpid string) error { + if len(rpid) == 0 { + return errors.New("a restore point ID is requried") } lu, lf, lm := len(u), len(f), len(m) if (lu == 0 || u == "*") && (lf+lm > 0) { diff --git a/src/go.mod b/src/go.mod index 912960bda..693d67fc9 100644 --- a/src/go.mod +++ b/src/go.mod @@ -9,7 +9,7 @@ require ( github.com/kopia/kopia v0.11.1 github.com/microsoft/kiota-abstractions-go v0.8.1 github.com/microsoft/kiota-authentication-azure-go v0.3.0 - github.com/microsoft/kiota-serialization-json-go v0.5.4 + github.com/microsoft/kiota-serialization-json-go v0.5.5 github.com/microsoftgraph/msgraph-sdk-go v0.28.0 github.com/microsoftgraph/msgraph-sdk-go-core v0.26.1 github.com/pkg/errors v0.9.1 @@ -45,7 +45,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/chmduquesne/rollinghash v4.0.0+incompatible // indirect - github.com/cjlapao/common-go v0.0.21 // indirect + github.com/cjlapao/common-go v0.0.22 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect diff --git a/src/go.sum b/src/go.sum index 5ca88f6ef..a846d1a86 100644 --- a/src/go.sum +++ b/src/go.sum @@ -69,8 +69,8 @@ github.com/chmduquesne/rollinghash v4.0.0+incompatible/go.mod h1:Uc2I36RRfTAf7Dg github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/cjlapao/common-go v0.0.21 h1:z4PDFLQG4pJxHEmM8ecmnjDgTcR0Xr/30WZiNZF2oYM= -github.com/cjlapao/common-go v0.0.21/go.mod h1:QHUcl8KX3RgNVonFJ1WpW4mlr9NyWOHmzqxaRbwooPo= +github.com/cjlapao/common-go v0.0.22 h1:hQQ4mMupPp47eZRb5D8mxjrp0VyRSWNk/FOld7wxMsQ= +github.com/cjlapao/common-go v0.0.22/go.mod h1:RZuwsymEIdwSubzUBpNYmmGfeITqHDV5iTgnr6zYwSc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -256,8 +256,8 @@ github.com/microsoft/kiota-authentication-azure-go v0.3.0 h1:iLyy5qldAjBiYMGMk1r github.com/microsoft/kiota-authentication-azure-go v0.3.0/go.mod h1:qyZWSCug2eG1zrRnCSacyFHGsgQa4aSCWn3EOkY9Z1M= github.com/microsoft/kiota-http-go v0.5.2 h1:BS/bK2xHLT8TT+p0uZKxwu+lkXDAPByugYP2n1nV0Uo= github.com/microsoft/kiota-http-go v0.5.2/go.mod h1:WqEFNw3rMEatymG4Xh3rLSTxaKq80rJdQ/CSSh7m6jI= -github.com/microsoft/kiota-serialization-json-go v0.5.4 h1:BpkTYq1AeZPCnSsp3zpzfNL9hx3xb1/LPFteV6tbhMQ= -github.com/microsoft/kiota-serialization-json-go v0.5.4/go.mod h1:GI9vrssO1EvqzDtvMKuhjALn40phZOWkeeaMgtCk6xE= +github.com/microsoft/kiota-serialization-json-go v0.5.5 h1:B0iKBKOdi+9NKFlormLRqduQ1+77MPGRsZ7xnd74EqQ= +github.com/microsoft/kiota-serialization-json-go v0.5.5/go.mod h1:GI9vrssO1EvqzDtvMKuhjALn40phZOWkeeaMgtCk6xE= github.com/microsoft/kiota-serialization-text-go v0.4.1 h1:6QPH7+geUPCpaSZkKCQw0Scngx2IF0vKodrvvWWiu2A= github.com/microsoft/kiota-serialization-text-go v0.4.1/go.mod h1:DsriFnVBDCc4D84qxG3j8q/1Sxu16JILfhxMZm3kdfw= github.com/microsoftgraph/msgraph-sdk-go v0.28.0 h1:BolP/vNW7gsNXivg/qikcdftOicLMgMm3Z/6PpSFDvU= diff --git a/src/internal/connector/graph_connector.go b/src/internal/connector/graph_connector.go index 0238e3db7..2701141df 100644 --- a/src/internal/connector/graph_connector.go +++ b/src/internal/connector/graph_connector.go @@ -5,6 +5,7 @@ package connector import ( "bytes" "context" + "strings" az "github.com/Azure/azure-sdk-for-go/sdk/azidentity" ka "github.com/microsoft/kiota-authentication-azure-go" @@ -187,59 +188,77 @@ func (gc *GraphConnector) ExchangeDataCollection(ctx context.Context, selector s // RestoreMessages: Utility function to connect to M365 backstore // and upload messages from DataCollection. // FullPath: tenantId, userId, , FolderId -func (gc *GraphConnector) RestoreMessages(ctx context.Context, dc DataCollection) error { - var errs error - // must be user.GetId(), PrimaryName no longer works 6-15-2022 - user := dc.FullPath()[1] - items := dc.Items() +func (gc *GraphConnector) RestoreMessages(ctx context.Context, dcs []DataCollection) error { + var ( + pathCounter = map[string]bool{} + attempts, successes int + errs error + ) - for { - select { - case <-ctx.Done(): - return support.WrapAndAppend("context cancelled", ctx.Err(), errs) - case data, ok := <-items: - if !ok { - return errs - } + for _, dc := range dcs { + // must be user.GetId(), PrimaryName no longer works 6-15-2022 + user := dc.FullPath()[1] + items := dc.Items() + pathCounter[strings.Join(dc.FullPath(), "")] = true - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(data.ToReader()) - if err != nil { - errs = support.WrapAndAppend(data.UUID(), err, errs) - continue - } - message, err := support.CreateMessageFromBytes(buf.Bytes()) - if err != nil { - errs = support.WrapAndAppend(data.UUID(), err, errs) - continue - } - clone := support.ToMessage(message) - address := dc.FullPath()[3] - // details on valueId settings: https://docs.microsoft.com/en-us/openspecs/exchange_server_protocols/ms-oxprops/77844470-22ca-43fb-993d-c53e96cf9cd6 - valueId := "Integer 0x0E07" - enableValue := "4" - sv := models.NewSingleValueLegacyExtendedProperty() - sv.SetId(&valueId) - sv.SetValue(&enableValue) - svlep := []models.SingleValueLegacyExtendedPropertyable{sv} - clone.SetSingleValueExtendedProperties(svlep) - draft := false - clone.SetIsDraft(&draft) - sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone) - if err != nil { - errs = support.WrapAndAppend(data.UUID()+": "+ - support.ConnectorStackErrorTrace(err), err, errs) - continue - // TODO: Add to retry Handler for the for failure - } + var exit bool + for !exit { + select { + case <-ctx.Done(): + return support.WrapAndAppend("context cancelled", ctx.Err(), errs) + case data, ok := <-items: + if !ok { + exit = true + break + } + attempts++ - if sentMessage == nil && err == nil { - errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs) + buf := &bytes.Buffer{} + _, err := buf.ReadFrom(data.ToReader()) + if err != nil { + errs = support.WrapAndAppend(data.UUID(), err, errs) + continue + } + message, err := support.CreateMessageFromBytes(buf.Bytes()) + if err != nil { + errs = support.WrapAndAppend(data.UUID(), err, errs) + continue + } + clone := support.ToMessage(message) + address := dc.FullPath()[3] + valueId := "Integer 0x0E07" + enableValue := "4" + sv := models.NewSingleValueLegacyExtendedProperty() + sv.SetId(&valueId) + sv.SetValue(&enableValue) + svlep := []models.SingleValueLegacyExtendedPropertyable{sv} + clone.SetSingleValueExtendedProperties(svlep) + draft := false + clone.SetIsDraft(&draft) + sentMessage, err := gc.client.UsersById(user).MailFoldersById(address).Messages().Post(clone) + if err != nil { + errs = support.WrapAndAppend( + data.UUID()+": "+support.ConnectorStackErrorTrace(err), + err, errs) + continue + // TODO: Add to retry Handler for the for failure + } + if sentMessage == nil && err == nil { + errs = support.WrapAndAppend(data.UUID(), errors.New("Message not Sent: Blocked by server"), errs) + } + if err != nil { + successes++ + } + // This completes the restore loop for a message.. } - // This completes the restore loop for a message.. } } + + status := support.CreateStatus(ctx, support.Restore, attempts, successes, len(pathCounter), errs) + gc.SetStatus(*status) + logger.Ctx(ctx).Debug(gc.PrintableStatus()) + return errs } // serializeMessages: Temp Function as place Holder until Collections have been added @@ -299,11 +318,9 @@ func (gc *GraphConnector) serializeMessages(ctx context.Context, user string) ([ success += edc.Length() collections = append(collections, &edc) } - status, err := support.CreateStatus(support.Backup, attemptedItems, success, len(tasklist), errs) - if err == nil { - gc.SetStatus(*status) - logger.Ctx(ctx).Debugw(gc.PrintableStatus()) - } + status := support.CreateStatus(ctx, support.Backup, attemptedItems, success, len(tasklist), errs) + gc.SetStatus(*status) + logger.Ctx(ctx).Debugw(gc.PrintableStatus()) return collections, errs } diff --git a/src/internal/connector/graph_connector_test.go b/src/internal/connector/graph_connector_test.go index 9a30c99a8..edd73c3e3 100644 --- a/src/internal/connector/graph_connector_test.go +++ b/src/internal/connector/graph_connector_test.go @@ -83,7 +83,7 @@ func (suite *GraphConnectorIntegrationSuite) TestGraphConnector_restoreMessages( edc := NewExchangeDataCollection("tenant", []string{"tenantId", evs[user], mailCategory, "Inbox"}) edc.PopulateCollection(&ds) edc.FinishPopulation() - err = suite.connector.RestoreMessages(context.Background(), &edc) + err = suite.connector.RestoreMessages(context.Background(), []DataCollection{&edc}) assert.NoError(suite.T(), err) } @@ -165,9 +165,11 @@ func (suite *DisconnectedGraphConnectorSuite) TestInterfaceAlignment() { func (suite *DisconnectedGraphConnectorSuite) TestGraphConnector_Status() { gc := GraphConnector{} suite.Equal(len(gc.PrintableStatus()), 0) - status, err := support.CreateStatus(support.Restore, 12, 9, 8, + status := support.CreateStatus( + context.Background(), + support.Restore, + 12, 9, 8, support.WrapAndAppend("tres", errors.New("three"), support.WrapAndAppend("arc376", errors.New("one"), errors.New("two")))) - assert.NoError(suite.T(), err) gc.SetStatus(*status) suite.Greater(len(gc.PrintableStatus()), 0) suite.Greater(gc.Status().ObjectCount, 0) diff --git a/src/internal/connector/mockconnector/mock_data_collection.go b/src/internal/connector/mockconnector/mock_data_collection.go index 4a31697d9..1169b15cc 100644 --- a/src/internal/connector/mockconnector/mock_data_collection.go +++ b/src/internal/connector/mockconnector/mock_data_collection.go @@ -15,6 +15,8 @@ import ( type MockExchangeDataCollection struct { fullPath []string messageCount int + Data [][]byte + Names []string } var ( @@ -26,11 +28,19 @@ var ( // NewMockExchangeDataCollection creates an data collection that will return the specified number of // mock messages when iterated func NewMockExchangeDataCollection(pathRepresentation []string, numMessagesToReturn int) *MockExchangeDataCollection { - collection := &MockExchangeDataCollection{ + c := &MockExchangeDataCollection{ fullPath: pathRepresentation, messageCount: numMessagesToReturn, + Data: [][]byte{}, + Names: []string{}, } - return collection + + for i := 0; i < c.messageCount; i++ { + // We can plug in whatever data we want here (can be an io.Reader to a test data file if needed) + c.Data = append(c.Data, []byte("test message")) + c.Names = append(c.Names, uuid.NewString()) + } + return c } func (medc *MockExchangeDataCollection) FullPath() []string { @@ -44,11 +54,11 @@ func (medc *MockExchangeDataCollection) Items() <-chan connector.DataStream { go func() { defer close(res) - for i := 0; i < medc.messageCount; i++ { - // We can plug in whatever data we want here (can be an io.Reader to a test data file if needed) - m := []byte("test message") - res <- &MockExchangeData{uuid.NewString(), io.NopCloser(bytes.NewReader(m))} + res <- &MockExchangeData{ + medc.Names[i], + io.NopCloser(bytes.NewReader(medc.Data[i])), + } } }() diff --git a/src/internal/connector/support/status.go b/src/internal/connector/support/status.go index bb2e2c18c..fbb1b338b 100644 --- a/src/internal/connector/support/status.go +++ b/src/internal/connector/support/status.go @@ -1,8 +1,10 @@ package support import ( - "errors" + "context" "fmt" + + "github.com/alcionai/corso/pkg/logger" ) type ConnectorOperationStatus struct { @@ -25,25 +27,30 @@ const ( ) // Constructor for ConnectorOperationStatus. If the counts do not agree, an error is returned. -func CreateStatus(op Operation, objects, success, folders int, err error) (*ConnectorOperationStatus, error) { +func CreateStatus(ctx context.Context, op Operation, objects, success, folders int, err error) *ConnectorOperationStatus { hasErrors := err != nil var reason string if err != nil { reason = err.Error() } + numErr := GetNumberOfErrors(err) status := ConnectorOperationStatus{ lastOperation: op, ObjectCount: objects, folderCount: folders, successful: success, - errorCount: GetNumberOfErrors(err), + errorCount: numErr, incomplete: hasErrors, incompleteReason: reason, } if status.ObjectCount != status.errorCount+status.successful { - return nil, errors.New("incorrect total on initialization") + logger.Ctx(ctx).DPanicw( + "status object count does not match errors + successes", + "objects", objects, + "successes", success, + "errors", numErr) } - return &status, nil + return &status } func (cos *ConnectorOperationStatus) String() string { diff --git a/src/internal/connector/support/status_test.go b/src/internal/connector/support/status_test.go index 734b4157d..26812533f 100644 --- a/src/internal/connector/support/status_test.go +++ b/src/internal/connector/support/status_test.go @@ -1,6 +1,7 @@ package support import ( + "context" "errors" "testing" @@ -28,38 +29,37 @@ type statusParams struct { func (suite *GCStatusTestSuite) TestCreateStatus() { table := []struct { - name string - params statusParams - expected bool - checkError assert.ValueAssertionFunc + name string + params statusParams + expect assert.BoolAssertionFunc }{ { - name: "Test: Status Success", - params: statusParams{Backup, 12, 12, 3, nil}, - expected: false, - checkError: assert.Nil, + name: "Test: Status Success", + params: statusParams{Backup, 12, 12, 3, nil}, + expect: assert.False, }, { - name: "Test: Status Failed", - params: statusParams{Restore, 12, 9, 8, WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two")))}, - expected: true, - checkError: assert.Nil, + name: "Test: Status Failed", + params: statusParams{Restore, 12, 9, 8, WrapAndAppend("tres", errors.New("three"), WrapAndAppend("arc376", errors.New("one"), errors.New("two")))}, + expect: assert.True, }, { - name: "Invalid status", - params: statusParams{Backup, 9, 3, 12, errors.New("invalidcl")}, - expected: false, - checkError: assert.NotNil, + name: "Invalid status", + // todo: expect panic once logger.DPanicw identifies dev mode. + params: statusParams{Backup, 9, 3, 13, errors.New("invalidcl")}, + expect: assert.True, }, } for _, test := range table { suite.T().Run(test.name, func(t *testing.T) { - result, err := CreateStatus(test.params.operationType, test.params.objects, - test.params.success, test.params.folders, test.params.err) - test.checkError(t, err) - if err == nil { - suite.Equal(result.incomplete, test.expected) - } + result := CreateStatus( + context.Background(), + test.params.operationType, + test.params.objects, + test.params.success, + test.params.folders, + test.params.err) + test.expect(t, result.incomplete, "status is incomplete") }) } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index dd910a9ac..1a6228247 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -321,13 +321,12 @@ func (w Wrapper) getEntry( return e, nil } -// collectItems is a generic helper function that pulls data from kopia for the -// given item in the snapshot with ID snapshotID. If isDirectory is true, it -// returns a slice of DataCollections with data from directories in the subtree -// rooted at itemPath. If isDirectory is false it returns a DataCollection (in a -// slice) with a single item corresponding to the requested item. If the item -// does not exist or a file is found when a directory is expected (or the -// opposite) it returns an error. +// CollectItems pulls data from kopia for the given items in the snapshot with +// ID snapshotID. If isDirectory is true, it returns a slice of DataCollections +// with data from directories in the subtree rooted at itemPath. If isDirectory +// is false it returns a DataCollection (in a slice) with a single item for each +// requested item. If the item does not exist or a file is found when a directory +// is expected (or the opposite) it returns an error. func (w Wrapper) collectItems( ctx context.Context, snapshotID string, @@ -528,3 +527,30 @@ func (w Wrapper) RestoreDirectory( ) ([]connector.DataCollection, error) { return w.collectItems(ctx, snapshotID, basePath, true) } + +// RestoreSingleItem looks up all paths- assuming each is an item declaration, +// not a directory- in the snapshot with id snapshotID. The path should be the +// full path of the item from the root. Returns the results as a slice of single- +// item DataCollections, where the DataCollection.FullPath() matches the path. +// If the item does not exist in kopia or is not a file an error is returned. +// The UUID of the returned DataStreams will be the name of the kopia file the +// data is sourced from. +func (w Wrapper) RestoreMultipleItems( + ctx context.Context, + snapshotID string, + paths [][]string, +) ([]connector.DataCollection, error) { + var ( + dcs = []connector.DataCollection{} + errs *multierror.Error + ) + for _, path := range paths { + dc, err := w.RestoreSingleItem(ctx, snapshotID, path) + if err != nil { + errs = multierror.Append(errs, err) + } else { + dcs = append(dcs, dc) + } + } + return dcs, errs.ErrorOrNil() +} diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 22d5f4a15..452078fdd 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -8,6 +8,7 @@ import ( "path" "testing" + "github.com/google/uuid" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/repo/manifest" @@ -72,17 +73,12 @@ func testForFiles( fullPath := path.Join(append(c.FullPath(), s.UUID())...) expected, ok := expected[fullPath] - require.True( - t, - ok, - "unexpected file with path %q", - path.Join(append(c.FullPath(), fullPath)...), - ) + require.True(t, ok, "unexpected file with path %q", fullPath) buf, err := ioutil.ReadAll(s.ToReader()) - require.NoError(t, err) + require.NoError(t, err, "reading collection item: %s", fullPath) - assert.Equal(t, expected, buf) + assert.Equal(t, expected, buf, "comparing collection item: %s", fullPath) } } @@ -674,3 +670,79 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupRestoreDirectory_Errors( }) } } + +func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { + t := suite.T() + ctx := context.Background() + + k, err := openKopiaRepo(t, ctx) + require.NoError(t, err) + + w := &Wrapper{k} + + tid := uuid.NewString() + p1 := []string{tid, "uid", "emails", "fid"} + p2 := []string{tid, "uid2", "emails", "fid"} + dc1 := mockconnector.NewMockExchangeDataCollection(p1, 1) + dc2 := mockconnector.NewMockExchangeDataCollection(p2, 1) + fp1 := append(p1, dc1.Names[0]) + fp2 := append(p2, dc2.Names[0]) + + stats, _, err := w.BackupCollections(ctx, []connector.DataCollection{dc1, dc2}) + require.NoError(t, err) + + expected := map[string][]byte{ + path.Join(fp1...): dc1.Data[0], + path.Join(fp2...): dc2.Data[0], + } + + result, err := w.RestoreMultipleItems( + ctx, + string(stats.SnapshotID), + [][]string{fp1, fp2}) + + require.NoError(t, err) + assert.Equal(t, 2, len(result)) + + testForFiles(t, expected, result) +} + +func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems_Errors() { + table := []struct { + name string + snapshotID string + paths [][]string + }{ + { + "EmptyPaths", + string(suite.snapshotID), + [][]string{{}}, + }, + { + "NoSnapshot", + "foo", + [][]string{append(testPath, testFileName)}, + }, + { + "TargetNotAFile", + string(suite.snapshotID), + [][]string{testPath[:2]}, + }, + { + "NonExistentFile", + string(suite.snapshotID), + [][]string{append(testPath, "subdir", "foo")}, + }, + } + + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + _, err := suite.w.RestoreMultipleItems( + suite.ctx, + test.snapshotID, + test.paths, + ) + require.Error(t, err) + }) + } +} diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 7ea973a85..942bc1d4c 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -9,6 +9,7 @@ import ( "github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/kopia" + "github.com/alcionai/corso/internal/model" "github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/backup" "github.com/alcionai/corso/pkg/selectors" @@ -29,7 +30,7 @@ type BackupOperation struct { type BackupResults struct { summary metrics - // todo: Backup ID + BackupID model.ID `json:"backupID"` } // NewBackupOperation constructs and validates a backup operation. @@ -113,10 +114,15 @@ func (op *BackupOperation) createBackupModels(ctx context.Context, snapID string return errors.Wrap(err, "creating backupdetails model") } - err = op.modelStore.Put(ctx, kopia.BackupModel, backup.New(snapID, string(details.ModelStoreID))) + rp := backup.New(snapID, string(details.ModelStoreID)) + + err = op.modelStore.Put(ctx, kopia.BackupModel, rp) if err != nil { return errors.Wrap(err, "creating backup model") } + + op.Results.BackupID = rp.StableID + return nil } diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index da37ae485..eacfa49db 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -75,7 +75,10 @@ type BackupOpIntegrationSuite struct { } func TestBackupOpIntegrationSuite(t *testing.T) { - if err := ctesting.RunOnAny(ctesting.CorsoCITests); err != nil { + if err := ctesting.RunOnAny( + ctesting.CorsoCITests, + ctesting.CorsoOperationTests, + ); err != nil { t.Skip(err) } suite.Run(t, new(BackupOpIntegrationSuite)) @@ -128,10 +131,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { t := suite.T() ctx := context.Background() - // m365User := "lidiah@8qzvrj.onmicrosoft.com" - // not the user we want to use, but all the others are - // suffering from JsonParseNode syndrome - m365User := "george.martinez@8qzvrj.onmicrosoft.com" + m365User := "lidiah@8qzvrj.onmicrosoft.com" acct, err := ctesting.NewM365Account() require.NoError(t, err) @@ -168,6 +168,7 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() { require.NoError(t, bo.Run(ctx)) require.NotEmpty(t, bo.Results) + require.NotEmpty(t, bo.Results.BackupID) assert.Equal(t, bo.Status, Successful) assert.Greater(t, bo.Results.ItemsRead, 0) assert.Greater(t, bo.Results.ItemsWritten, 0) diff --git a/src/internal/operations/restore.go b/src/internal/operations/restore.go index c38e2dac7..b0591448c 100644 --- a/src/internal/operations/restore.go +++ b/src/internal/operations/restore.go @@ -4,22 +4,26 @@ import ( "context" "time" + "github.com/kopia/kopia/repo/manifest" "github.com/pkg/errors" "github.com/alcionai/corso/internal/connector" "github.com/alcionai/corso/internal/connector/support" "github.com/alcionai/corso/internal/kopia" + "github.com/alcionai/corso/internal/model" "github.com/alcionai/corso/pkg/account" + "github.com/alcionai/corso/pkg/backup" + "github.com/alcionai/corso/pkg/selectors" ) // RestoreOperation wraps an operation with restore-specific props. type RestoreOperation struct { operation - BackupID string `json:"backupID"` - Results RestoreResults `json:"results"` - Targets []string `json:"selectors"` // todo: replace with Selectors - Version string `json:"bersion"` + BackupID model.ID `json:"backupID"` + Results RestoreResults `json:"results"` + Selectors selectors.Selector `json:"selectors"` // todo: replace with Selectors + Version string `json:"version"` account account.Account } @@ -37,13 +41,13 @@ func NewRestoreOperation( kw *kopia.Wrapper, ms *kopia.ModelStore, acct account.Account, - backupID string, - targets []string, + backupID model.ID, + sel selectors.Selector, ) (RestoreOperation, error) { op := RestoreOperation{ operation: newOperation(opts, kw, ms), BackupID: backupID, - Targets: targets, + Selectors: sel, Version: "v0", account: acct, } @@ -77,23 +81,48 @@ func (op *RestoreOperation) Run(ctx context.Context) error { stats := restoreStats{} defer op.persistResults(time.Now(), &stats) - dc, err := op.kopia.RestoreSingleItem(ctx, op.BackupID, op.Targets) + // retrieve the restore point details + rp := backup.Backup{} + err := op.modelStore.Get(ctx, kopia.BackupModel, op.BackupID, &rp) + if err != nil { + stats.readErr = errors.Wrap(err, "retrieving restore point") + return stats.readErr + } + + rpd := backup.Details{} + err = op.modelStore.GetWithModelStoreID(ctx, kopia.BackupDetailsModel, manifest.ID(rp.DetailsID), &rpd) + if err != nil { + stats.readErr = errors.Wrap(err, "retrieving restore point details") + return stats.readErr + } + + er, err := op.Selectors.ToExchangeRestore() if err != nil { stats.readErr = err - return errors.Wrap(err, "retrieving service data") + return err } - stats.cs = []connector.DataCollection{dc} + // format the details and retrieve the items from kopia + fds := er.FilterDetails(&rpd) + dcs, err := op.kopia.RestoreMultipleItems(ctx, rp.SnapshotID, fds) + if err != nil { + stats.readErr = errors.Wrap(err, "retrieving service data") + return stats.readErr + } + stats.cs = dcs + + // restore those collections using graph gc, err := connector.NewGraphConnector(op.account) if err != nil { - stats.writeErr = err - return errors.Wrap(err, "connecting to graph api") + stats.writeErr = errors.Wrap(err, "connecting to graph api") + return stats.writeErr } - if err := gc.RestoreMessages(ctx, dc); err != nil { - stats.writeErr = err - return errors.Wrap(err, "restoring service data") + if err := gc.RestoreMessages(ctx, dcs); err != nil { + stats.writeErr = errors.Wrap(err, "restoring service data") + return stats.writeErr } + stats.gc = gc.Status() op.Status = Successful return nil diff --git a/src/internal/operations/restore_test.go b/src/internal/operations/restore_test.go index 25500bb0e..ba87c7ea5 100644 --- a/src/internal/operations/restore_test.go +++ b/src/internal/operations/restore_test.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/internal/kopia" ctesting "github.com/alcionai/corso/internal/testing" "github.com/alcionai/corso/pkg/account" + "github.com/alcionai/corso/pkg/selectors" ) // --------------------------------------------------------------------------- @@ -50,7 +51,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() { } ) - op, err := NewRestoreOperation(ctx, Options{}, kw, ms, acct, "foo", nil) + op, err := NewRestoreOperation(ctx, Options{}, kw, ms, acct, "foo", selectors.Selector{}) require.NoError(t, err) op.persistResults(now, &stats) @@ -73,7 +74,10 @@ type RestoreOpIntegrationSuite struct { } func TestRestoreOpIntegrationSuite(t *testing.T) { - if err := ctesting.RunOnAny(ctesting.CorsoCITests); err != nil { + if err := ctesting.RunOnAny( + ctesting.CorsoCITests, + ctesting.CorsoOperationTests, + ); err != nil { t.Skip(err) } suite.Run(t, new(RestoreOpIntegrationSuite)) @@ -112,8 +116,69 @@ func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() { test.ms, test.acct, "backup-id", - nil) + selectors.Selector{}) test.errCheck(t, err) }) } } + +func (suite *RestoreOpIntegrationSuite) TestRestore_Run() { + t := suite.T() + ctx := context.Background() + + m365User := "lidiah@8qzvrj.onmicrosoft.com" + acct, err := ctesting.NewM365Account() + require.NoError(t, err) + + // need to initialize the repository before we can test connecting to it. + st, err := ctesting.NewPrefixedS3Storage(t) + require.NoError(t, err) + + k := kopia.NewConn(st) + require.NoError(t, k.Initialize(ctx)) + defer k.Close(ctx) + + w, err := kopia.NewWrapper(k) + require.NoError(t, err) + defer w.Close(ctx) + + ms, err := kopia.NewModelStore(k) + require.NoError(t, err) + defer ms.Close(ctx) + + bsel := selectors.NewExchangeBackup() + bsel.Include(bsel.Users(m365User)) + + bo, err := NewBackupOperation( + ctx, + Options{}, + w, + ms, + acct, + bsel.Selector) + require.NoError(t, err) + require.NoError(t, bo.Run(ctx)) + require.NotEmpty(t, bo.Results.BackupID) + + rsel := selectors.NewExchangeRestore() + rsel.Include(rsel.Users(m365User)) + + ro, err := NewRestoreOperation( + ctx, + Options{}, + w, + ms, + acct, + bo.Results.BackupID, + rsel.Selector) + require.NoError(t, err) + + require.NoError(t, ro.Run(ctx), "restoreOp.Run()") + require.NotEmpty(t, ro.Results, "restoreOp results") + assert.Equal(t, ro.Status, Successful, "restoreOp status") + assert.Greater(t, ro.Results.ItemsRead, 0, "restore items read") + assert.Greater(t, ro.Results.ItemsWritten, 0, "restored items written") + assert.Zero(t, ro.Results.ReadErrors, "errors while reading restore data") + assert.Zero(t, ro.Results.WriteErrors, "errors while writing restore data") + assert.Equal(t, bo.Results.ItemsWritten, ro.Results.ItemsWritten, "backup and restore wrote the same num of items") +} diff --git a/src/internal/testing/integration_runners.go b/src/internal/testing/integration_runners.go index 10575338c..412ff5a07 100644 --- a/src/internal/testing/integration_runners.go +++ b/src/internal/testing/integration_runners.go @@ -14,6 +14,7 @@ const ( CorsoGraphConnectorTests = "CORSO_GRAPH_CONNECTOR_TESTS" CorsoKopiaWrapperTests = "CORSO_KOPIA_WRAPPER_TESTS" CorsoModelStoreTests = "CORSO_MODEL_STORE_TESTS" + CorsoOperationTests = "CORSO_OPERATION_TESTS" CorsoRepositoryTests = "CORSO_REPOSITORY_TESTS" ) diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 447e01212..16ca63f91 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/internal/kopia" + "github.com/alcionai/corso/internal/model" "github.com/alcionai/corso/internal/operations" "github.com/alcionai/corso/pkg/account" "github.com/alcionai/corso/pkg/backup" @@ -138,15 +139,15 @@ func (r Repository) NewBackup(ctx context.Context, selector selectors.Selector) } // NewRestore generates a restoreOperation runner. -func (r Repository) NewRestore(ctx context.Context, backupID string, targets []string) (operations.RestoreOperation, error) { +func (r Repository) NewRestore(ctx context.Context, backupID string, sel selectors.Selector) (operations.RestoreOperation, error) { return operations.NewRestoreOperation( ctx, operations.Options{}, r.dataLayer, r.modelStore, r.Account, - backupID, - targets) + model.ID(backupID), + sel) } // backups lists backups in a respository diff --git a/src/pkg/repository/repository_test.go b/src/pkg/repository/repository_test.go index 1ee61e6f5..5525b6766 100644 --- a/src/pkg/repository/repository_test.go +++ b/src/pkg/repository/repository_test.go @@ -190,7 +190,7 @@ func (suite *RepositoryIntegrationSuite) TestNewRestore() { r, err := repository.Initialize(ctx, acct, st) require.NoError(t, err) - ro, err := r.NewRestore(ctx, "backup-id", []string{}) + ro, err := r.NewRestore(ctx, "backup-id", selectors.Selector{}) require.NoError(t, err) require.NotNil(t, ro) } diff --git a/src/pkg/selectors/exchange.go b/src/pkg/selectors/exchange.go index e6db1c761..911b34a14 100644 --- a/src/pkg/selectors/exchange.go +++ b/src/pkg/selectors/exchange.go @@ -36,7 +36,7 @@ type ( func NewExchangeBackup() *ExchangeBackup { src := ExchangeBackup{ exchange{ - newSelector(ServiceExchange, ""), + newSelector(ServiceExchange), }, } return &src @@ -53,10 +53,10 @@ func (s Selector) ToExchangeBackup() (*ExchangeBackup, error) { } // NewExchangeRestore produces a new Selector with the service set to ServiceExchange. -func NewExchangeRestore(backupID string) *ExchangeRestore { +func NewExchangeRestore() *ExchangeRestore { src := ExchangeRestore{ exchange{ - newSelector(ServiceExchange, backupID), + newSelector(ServiceExchange), }, } return &src @@ -406,15 +406,15 @@ func idPath(cat exchangeCategory, path []string) map[exchangeCategory]string { // FilterDetails reduces the entries in a backupDetails struct to only // those that match the inclusions and exclusions in the selector. -func (s *ExchangeRestore) FilterDetails(deets *backup.Details) []string { +func (s *ExchangeRestore) FilterDetails(deets *backup.Details) [][]string { if deets == nil { - return []string{} + return nil } entIncs := exchangeScopesByCategory(s.Includes) entExcs := exchangeScopesByCategory(s.Excludes) - refs := []string{} + refs := [][]string{} for _, ent := range deets.Entries { path := strings.Split(ent.RepoRef, "/") @@ -438,7 +438,7 @@ func (s *ExchangeRestore) FilterDetails(deets *backup.Details) []string { entIncs[cat.String()], entExcs[cat.String()]) if matched { - refs = append(refs, ent.RepoRef) + refs = append(refs, path) } } diff --git a/src/pkg/selectors/exchange_test.go b/src/pkg/selectors/exchange_test.go index 8acbd6545..a6c2f8f66 100644 --- a/src/pkg/selectors/exchange_test.go +++ b/src/pkg/selectors/exchange_test.go @@ -1,6 +1,7 @@ package selectors import ( + "strings" "testing" "github.com/alcionai/corso/pkg/backup" @@ -21,7 +22,6 @@ func (suite *ExchangeSourceSuite) TestNewExchangeBackup() { t := suite.T() eb := NewExchangeBackup() assert.Equal(t, eb.Service, ServiceExchange) - assert.Zero(t, eb.BackupID) assert.NotZero(t, eb.Scopes()) } @@ -32,26 +32,23 @@ func (suite *ExchangeSourceSuite) TestToExchangeBackup() { eb, err := s.ToExchangeBackup() require.NoError(t, err) assert.Equal(t, eb.Service, ServiceExchange) - assert.Zero(t, eb.BackupID) assert.NotZero(t, eb.Scopes()) } func (suite *ExchangeSourceSuite) TestNewExchangeRestore() { t := suite.T() - er := NewExchangeRestore("backupID") + er := NewExchangeRestore() assert.Equal(t, er.Service, ServiceExchange) - assert.Equal(t, er.BackupID, "backupID") assert.NotZero(t, er.Scopes()) } func (suite *ExchangeSourceSuite) TestToExchangeRestore() { t := suite.T() - eb := NewExchangeRestore("rpid") + eb := NewExchangeRestore() s := eb.Selector eb, err := s.ToExchangeRestore() require.NoError(t, err) assert.Equal(t, eb.Service, ServiceExchange) - assert.Equal(t, eb.BackupID, "rpid") assert.NotZero(t, eb.Scopes()) } @@ -486,7 +483,7 @@ func (suite *ExchangeSourceSuite) TestExchangeScope_IncludesPath() { ) var ( path = []string{"tid", usr, "mail", fld, mail} - es = NewExchangeRestore("rpid") + es = NewExchangeRestore() ) table := []struct { @@ -526,7 +523,7 @@ func (suite *ExchangeSourceSuite) TestExchangeScope_ExcludesPath() { ) var ( path = []string{"tid", usr, "mail", fld, mail} - es = NewExchangeRestore("rpid") + es = NewExchangeRestore() ) table := []struct { @@ -622,124 +619,131 @@ func (suite *ExchangeSourceSuite) TestExchangeRestore_FilterDetails() { event = "tid/uid/event/eid" mail = "tid/uid/mail/mfld/mid" ) + split := func(s ...string) [][]string { + r := [][]string{} + for _, ss := range s { + r = append(r, strings.Split(ss, "/")) + } + return r + } table := []struct { name string deets *backup.Details makeSelector func() *ExchangeRestore - expect []string + expect [][]string }{ { "no refs", makeDeets(), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) return er }, - []string{}, + [][]string{}, }, { "contact only", makeDeets(contact), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) return er }, - []string{contact}, + split(contact), }, { "event only", makeDeets(event), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) return er }, - []string{event}, + split(event), }, { "mail only", makeDeets(mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) return er }, - []string{mail}, + split(mail), }, { "all", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) return er }, - []string{contact, event, mail}, + split(contact, event, mail), }, { "only match contact", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Contacts("uid", "cfld", "cid")) return er }, - []string{contact}, + split(contact), }, { "only match event", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Events("uid", "eid")) return er }, - []string{event}, + split(event), }, { "only match mail", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Mails("uid", "mfld", "mid")) return er }, - []string{mail}, + split(mail), }, { "exclude contact", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) er.Exclude(er.Contacts("uid", "cfld", "cid")) return er }, - []string{event, mail}, + split(event, mail), }, { "exclude event", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) er.Exclude(er.Events("uid", "eid")) return er }, - []string{contact, mail}, + split(contact, mail), }, { "exclude mail", makeDeets(contact, event, mail), func() *ExchangeRestore { - er := NewExchangeRestore("rpid") + er := NewExchangeRestore() er.Include(er.Users(All)) er.Exclude(er.Mails("uid", "mfld", "mid")) return er }, - []string{contact, event}, + split(contact, event), }, } for _, test := range table { @@ -753,7 +757,7 @@ func (suite *ExchangeSourceSuite) TestExchangeRestore_FilterDetails() { func (suite *ExchangeSourceSuite) TestExchangeScopesByCategory() { var ( - es = NewExchangeRestore("rpid") + es = NewExchangeRestore() users = es.Users(All) contacts = es.ContactFolders(All, All) events = es.Events(All, All) @@ -798,7 +802,7 @@ func (suite *ExchangeSourceSuite) TestMatchExchangeEntry() { return extendExchangeScopeValues(None, exchangeScope(s)) } var ( - es = NewExchangeRestore("rpid") + es = NewExchangeRestore() inAll = include(es.Users(All)) inNone = include(es.Users(None)) inMail = include(es.Mails(All, All, mail)) diff --git a/src/pkg/selectors/selectors.go b/src/pkg/selectors/selectors.go index b4c8867ee..a9f76759e 100644 --- a/src/pkg/selectors/selectors.go +++ b/src/pkg/selectors/selectors.go @@ -44,16 +44,14 @@ const ( // The core selector. Has no api for setting or retrieving data. // Is only used to pass along more specific selector instances. type Selector struct { - BackupID string `json:"backupID,omitempty"` // A backup id, used only by restore operations. Service service `json:"service,omitempty"` // The service scope of the data. Exchange, Teams, Sharepoint, etc. Excludes []map[string]string `json:"exclusions,omitempty"` // A slice of exclusions. Each exclusion applies to all inclusions. Includes []map[string]string `json:"scopes,omitempty"` // A slice of inclusions. Expected to get cast to a service wrapper within each service handler. } // helper for specific selector instance constructors. -func newSelector(s service, backupID string) Selector { +func newSelector(s service) Selector { return Selector{ - BackupID: backupID, Service: s, Excludes: []map[string]string{}, Includes: []map[string]string{}, diff --git a/src/pkg/selectors/selectors_test.go b/src/pkg/selectors/selectors_test.go index ae72f6cd0..c31154bdc 100644 --- a/src/pkg/selectors/selectors_test.go +++ b/src/pkg/selectors/selectors_test.go @@ -17,10 +17,9 @@ func TestSelectorSuite(t *testing.T) { func (suite *SelectorSuite) TestNewSelector() { t := suite.T() - s := newSelector(ServiceUnknown, "backupID") + s := newSelector(ServiceUnknown) assert.NotNil(t, s) assert.Equal(t, s.Service, ServiceUnknown) - assert.Equal(t, s.BackupID, "backupID") assert.NotNil(t, s.Includes) }