add backup to operations (#132)
* add backup to operations Extend the operations set with a backupOperation. New backup operations are created through an existing repository and, once created, can be Run synchronously. Integration tests have a skeleton, but the meat of them is on hold while dependencies undergo implementation. * remove operation progress
This commit is contained in:
parent
8a9c1acc9e
commit
39a9e0945d
57
src/internal/operations/backup.go
Normal file
57
src/internal/operations/backup.go
Normal file
@ -0,0 +1,57 @@
|
||||
package operations
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/alcionai/corso/internal/kopia"
|
||||
)
|
||||
|
||||
// BackupOperation wraps an operation with backup-specific props.
|
||||
type BackupOperation struct {
|
||||
operation
|
||||
Version string
|
||||
Targets []string // something for targets/filter/source/app&users/etc
|
||||
Work []string // something to reference the artifacts created, or at least their count
|
||||
|
||||
// todo - graphConnector data streams
|
||||
// dataStreams []*DataStream
|
||||
}
|
||||
|
||||
// NewBackupOperation constructs and validates a backup operation.
|
||||
func NewBackupOperation(
|
||||
ctx context.Context,
|
||||
opts OperationOpts,
|
||||
kw *kopia.KopiaWrapper,
|
||||
targets []string,
|
||||
) (BackupOperation, error) {
|
||||
// todo - initialize a graphConnector
|
||||
// gc, err := graphConnector.Connect(bo.account)
|
||||
|
||||
bo := BackupOperation{
|
||||
operation: newOperation(opts, kw),
|
||||
Version: "v0",
|
||||
Targets: targets,
|
||||
Work: []string{},
|
||||
}
|
||||
if err := bo.validate(); err != nil {
|
||||
return BackupOperation{}, err
|
||||
}
|
||||
|
||||
return bo, nil
|
||||
}
|
||||
|
||||
func (bo BackupOperation) validate() error {
|
||||
return bo.operation.validate()
|
||||
}
|
||||
|
||||
// Run begins a synchronous backup operation.
|
||||
func (bo BackupOperation) Run(ctx context.Context) error {
|
||||
// todo - use the graphConnector to create datastreams
|
||||
// dStreams, err := bo.gc.BackupOp(bo.Targets)
|
||||
|
||||
// todo - send backup write request to BackupWriter
|
||||
// err = kopia.BackupWriter(ctx, bo.gc.TenantID, wg, prog, dStreams...)
|
||||
|
||||
bo.Status = Successful
|
||||
return nil
|
||||
}
|
||||
48
src/internal/operations/backup_test.go
Normal file
48
src/internal/operations/backup_test.go
Normal file
@ -0,0 +1,48 @@
|
||||
package operations_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/internal/kopia"
|
||||
"github.com/alcionai/corso/internal/operations"
|
||||
)
|
||||
|
||||
type BackupOpIntegrationSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestBackupOpIntegrationSuite(t *testing.T) {
|
||||
if len(os.Getenv("CORSO_INTEGRATION_TESTS")) == 0 {
|
||||
t.Skip()
|
||||
}
|
||||
suite.Run(t, new(BackupOpIntegrationSuite))
|
||||
}
|
||||
|
||||
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
||||
table := []struct {
|
||||
name string
|
||||
opts operations.OperationOpts
|
||||
kw *kopia.KopiaWrapper
|
||||
targets []string
|
||||
}{
|
||||
{"good", operations.OperationOpts{}, new(kopia.KopiaWrapper), nil},
|
||||
{"missing kopia", operations.OperationOpts{}, nil, nil},
|
||||
}
|
||||
for _, test := range table {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
_, err := operations.NewBackupOperation(
|
||||
context.Background(),
|
||||
operations.OperationOpts{},
|
||||
new(kopia.KopiaWrapper),
|
||||
nil)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// todo (rkeepers) - TestBackup_Run()
|
||||
@ -1,51 +0,0 @@
|
||||
package operations
|
||||
|
||||
type (
|
||||
progressChan chan string
|
||||
errorChan chan error
|
||||
)
|
||||
|
||||
// opProgress allows downstream writers to communicate async progress and
|
||||
// errors to the operation. Per-process wrappers of operation are required
|
||||
// to implement receivers for each channel.
|
||||
// Operations should not write to opProgress themselves.
|
||||
type opProgress struct {
|
||||
progressChan
|
||||
errorChan
|
||||
}
|
||||
|
||||
func newOpProgress() *opProgress {
|
||||
return &opProgress{
|
||||
progressChan: make(progressChan),
|
||||
errorChan: make(errorChan),
|
||||
}
|
||||
}
|
||||
|
||||
// Report transmits a progress report to the operation.
|
||||
// Cannot be called concurrently with Close()
|
||||
func (rch progressChan) Report(rpt string) {
|
||||
if rch != nil {
|
||||
rch <- rpt
|
||||
}
|
||||
}
|
||||
|
||||
// Error transmits an error report to the operation.
|
||||
// Cannot be called concurrently with Close()
|
||||
func (ech errorChan) Error(err error) {
|
||||
if ech != nil {
|
||||
ech <- err
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes all communication channels used by opProgress.
|
||||
// Should only be called by whichever component writes to opProgress.
|
||||
func (op *opProgress) Close() {
|
||||
if op.progressChan != nil {
|
||||
close(op.progressChan)
|
||||
op.progressChan = nil
|
||||
}
|
||||
if op.errorChan != nil {
|
||||
close(op.errorChan)
|
||||
op.errorChan = nil
|
||||
}
|
||||
}
|
||||
@ -1,86 +0,0 @@
|
||||
package operations
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
type OpProgressSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestOpProgressSuite(t *testing.T) {
|
||||
suite.Run(t, new(OpProgressSuite))
|
||||
}
|
||||
|
||||
func (suite *OpProgressSuite) TestNewOpProgress() {
|
||||
t := suite.T()
|
||||
|
||||
op := newOpProgress()
|
||||
assert.NotNil(t, op.progressChan)
|
||||
assert.NotNil(t, op.errorChan)
|
||||
|
||||
op.Close()
|
||||
assert.Nil(t, op.progressChan)
|
||||
assert.Nil(t, op.errorChan)
|
||||
}
|
||||
|
||||
func (suite *OpProgressSuite) TestOpProgress_Report() {
|
||||
t := suite.T()
|
||||
op := newOpProgress()
|
||||
go func() {
|
||||
for range op.progressChan {
|
||||
}
|
||||
}()
|
||||
|
||||
assert.NotPanics(t,
|
||||
func() {
|
||||
op.Report("test")
|
||||
})
|
||||
|
||||
ch := op.progressChan
|
||||
op.progressChan = nil
|
||||
assert.NotPanics(t,
|
||||
func() {
|
||||
op.Report("test")
|
||||
})
|
||||
|
||||
op.progressChan = ch
|
||||
op.Close()
|
||||
|
||||
assert.NotPanics(t,
|
||||
func() {
|
||||
op.Report("test")
|
||||
})
|
||||
}
|
||||
|
||||
func (suite *OpProgressSuite) TestOpProgress_Error() {
|
||||
t := suite.T()
|
||||
op := newOpProgress()
|
||||
go func() {
|
||||
for range op.errorChan {
|
||||
}
|
||||
}()
|
||||
|
||||
assert.NotPanics(t,
|
||||
func() {
|
||||
op.Error(assert.AnError)
|
||||
})
|
||||
|
||||
ch := op.errorChan
|
||||
op.errorChan = nil
|
||||
assert.NotPanics(t,
|
||||
func() {
|
||||
op.Error(assert.AnError)
|
||||
})
|
||||
|
||||
op.errorChan = ch
|
||||
op.Close()
|
||||
|
||||
assert.NotPanics(t,
|
||||
func() {
|
||||
op.Error(assert.AnError)
|
||||
})
|
||||
}
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/internal/kopia"
|
||||
"github.com/alcionai/corso/internal/operations"
|
||||
"github.com/alcionai/corso/pkg/storage"
|
||||
)
|
||||
|
||||
@ -102,3 +103,12 @@ func (r *Repository) Close(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewBackup generates a backupOperation runner.
|
||||
func (r Repository) NewBackup(ctx context.Context, targets []string) (operations.BackupOperation, error) {
|
||||
return operations.NewBackupOperation(
|
||||
ctx,
|
||||
operations.OperationOpts{},
|
||||
r.dataLayer,
|
||||
targets)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user