From 39a9e0945d2070c4fbb18b85b6588d3967ed056b Mon Sep 17 00:00:00 2001 From: Keepers <104464746+ryanfkeepers@users.noreply.github.com> Date: Mon, 6 Jun 2022 14:11:54 -0600 Subject: [PATCH] add backup to operations (#132) * add backup to operations Extend the operations set with a backupOperation. New backup operations are created through an existing repository and, once created, can be Run synchronously. Integration tests have a skeleton, but the meat of them is on hold while dependencies undergo implementation. * remove operation progress --- src/internal/operations/backup.go | 57 ++++++++++++ src/internal/operations/backup_test.go | 48 +++++++++++ src/internal/operations/operation_progress.go | 51 ----------- .../operations/operation_progress_test.go | 86 ------------------- src/pkg/repository/repository.go | 10 +++ 5 files changed, 115 insertions(+), 137 deletions(-) create mode 100644 src/internal/operations/backup.go create mode 100644 src/internal/operations/backup_test.go delete mode 100644 src/internal/operations/operation_progress.go delete mode 100644 src/internal/operations/operation_progress_test.go diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go new file mode 100644 index 000000000..f128d1d51 --- /dev/null +++ b/src/internal/operations/backup.go @@ -0,0 +1,57 @@ +package operations + +import ( + "context" + + "github.com/alcionai/corso/internal/kopia" +) + +// BackupOperation wraps an operation with backup-specific props. +type BackupOperation struct { + operation + Version string + Targets []string // something for targets/filter/source/app&users/etc + Work []string // something to reference the artifacts created, or at least their count + + // todo - graphConnector data streams + // dataStreams []*DataStream +} + +// NewBackupOperation constructs and validates a backup operation. +func NewBackupOperation( + ctx context.Context, + opts OperationOpts, + kw *kopia.KopiaWrapper, + targets []string, +) (BackupOperation, error) { + // todo - initialize a graphConnector + // gc, err := graphConnector.Connect(bo.account) + + bo := BackupOperation{ + operation: newOperation(opts, kw), + Version: "v0", + Targets: targets, + Work: []string{}, + } + if err := bo.validate(); err != nil { + return BackupOperation{}, err + } + + return bo, nil +} + +func (bo BackupOperation) validate() error { + return bo.operation.validate() +} + +// Run begins a synchronous backup operation. +func (bo BackupOperation) Run(ctx context.Context) error { + // todo - use the graphConnector to create datastreams + // dStreams, err := bo.gc.BackupOp(bo.Targets) + + // todo - send backup write request to BackupWriter + // err = kopia.BackupWriter(ctx, bo.gc.TenantID, wg, prog, dStreams...) + + bo.Status = Successful + return nil +} diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go new file mode 100644 index 000000000..8f91075b1 --- /dev/null +++ b/src/internal/operations/backup_test.go @@ -0,0 +1,48 @@ +package operations_test + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/alcionai/corso/internal/kopia" + "github.com/alcionai/corso/internal/operations" +) + +type BackupOpIntegrationSuite struct { + suite.Suite +} + +func TestBackupOpIntegrationSuite(t *testing.T) { + if len(os.Getenv("CORSO_INTEGRATION_TESTS")) == 0 { + t.Skip() + } + suite.Run(t, new(BackupOpIntegrationSuite)) +} + +func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() { + table := []struct { + name string + opts operations.OperationOpts + kw *kopia.KopiaWrapper + targets []string + }{ + {"good", operations.OperationOpts{}, new(kopia.KopiaWrapper), nil}, + {"missing kopia", operations.OperationOpts{}, nil, nil}, + } + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + _, err := operations.NewBackupOperation( + context.Background(), + operations.OperationOpts{}, + new(kopia.KopiaWrapper), + nil) + assert.NoError(t, err) + }) + } +} + +// todo (rkeepers) - TestBackup_Run() diff --git a/src/internal/operations/operation_progress.go b/src/internal/operations/operation_progress.go deleted file mode 100644 index 9a67a5349..000000000 --- a/src/internal/operations/operation_progress.go +++ /dev/null @@ -1,51 +0,0 @@ -package operations - -type ( - progressChan chan string - errorChan chan error -) - -// opProgress allows downstream writers to communicate async progress and -// errors to the operation. Per-process wrappers of operation are required -// to implement receivers for each channel. -// Operations should not write to opProgress themselves. -type opProgress struct { - progressChan - errorChan -} - -func newOpProgress() *opProgress { - return &opProgress{ - progressChan: make(progressChan), - errorChan: make(errorChan), - } -} - -// Report transmits a progress report to the operation. -// Cannot be called concurrently with Close() -func (rch progressChan) Report(rpt string) { - if rch != nil { - rch <- rpt - } -} - -// Error transmits an error report to the operation. -// Cannot be called concurrently with Close() -func (ech errorChan) Error(err error) { - if ech != nil { - ech <- err - } -} - -// Close closes all communication channels used by opProgress. -// Should only be called by whichever component writes to opProgress. -func (op *opProgress) Close() { - if op.progressChan != nil { - close(op.progressChan) - op.progressChan = nil - } - if op.errorChan != nil { - close(op.errorChan) - op.errorChan = nil - } -} diff --git a/src/internal/operations/operation_progress_test.go b/src/internal/operations/operation_progress_test.go deleted file mode 100644 index 84026b868..000000000 --- a/src/internal/operations/operation_progress_test.go +++ /dev/null @@ -1,86 +0,0 @@ -package operations - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" -) - -type OpProgressSuite struct { - suite.Suite -} - -func TestOpProgressSuite(t *testing.T) { - suite.Run(t, new(OpProgressSuite)) -} - -func (suite *OpProgressSuite) TestNewOpProgress() { - t := suite.T() - - op := newOpProgress() - assert.NotNil(t, op.progressChan) - assert.NotNil(t, op.errorChan) - - op.Close() - assert.Nil(t, op.progressChan) - assert.Nil(t, op.errorChan) -} - -func (suite *OpProgressSuite) TestOpProgress_Report() { - t := suite.T() - op := newOpProgress() - go func() { - for range op.progressChan { - } - }() - - assert.NotPanics(t, - func() { - op.Report("test") - }) - - ch := op.progressChan - op.progressChan = nil - assert.NotPanics(t, - func() { - op.Report("test") - }) - - op.progressChan = ch - op.Close() - - assert.NotPanics(t, - func() { - op.Report("test") - }) -} - -func (suite *OpProgressSuite) TestOpProgress_Error() { - t := suite.T() - op := newOpProgress() - go func() { - for range op.errorChan { - } - }() - - assert.NotPanics(t, - func() { - op.Error(assert.AnError) - }) - - ch := op.errorChan - op.errorChan = nil - assert.NotPanics(t, - func() { - op.Error(assert.AnError) - }) - - op.errorChan = ch - op.Close() - - assert.NotPanics(t, - func() { - op.Error(assert.AnError) - }) -} diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index c705ed1ed..117c2cd3b 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" "github.com/alcionai/corso/internal/kopia" + "github.com/alcionai/corso/internal/operations" "github.com/alcionai/corso/pkg/storage" ) @@ -102,3 +103,12 @@ func (r *Repository) Close(ctx context.Context) error { return nil } + +// NewBackup generates a backupOperation runner. +func (r Repository) NewBackup(ctx context.Context, targets []string) (operations.BackupOperation, error) { + return operations.NewBackupOperation( + ctx, + operations.OperationOpts{}, + r.dataLayer, + targets) +}