Split KopiaWrapper into handle and backup/restore logic (#281)
* Split KopiaWrapper into repo handle and logic With ModelStore, multiple structs need a reference to the kopia repo. Make a small wrapper class (conn) that can open and initialize a repo. The wrapper handles concurrent closes and opens and does ref counting to ensure it only drops the kopia handle when the last reference is closed. Rename KopiaWrapper to Wrapper and keep backup/restore functionality in it.
This commit is contained in:
parent
233aa43f4c
commit
70e7a1e2d3
162
src/internal/kopia/conn.go
Normal file
162
src/internal/kopia/conn.go
Normal file
@ -0,0 +1,162 @@
|
||||
package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/pkg/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultKopiaConfigFilePath = "/tmp/repository.config"
|
||||
)
|
||||
|
||||
var (
|
||||
errInit = errors.New("initializing repo")
|
||||
errConnect = errors.New("connecting repo")
|
||||
)
|
||||
|
||||
type conn struct {
|
||||
storage storage.Storage
|
||||
repo.Repository
|
||||
mu sync.Mutex
|
||||
refCount int
|
||||
}
|
||||
|
||||
func NewConn(s storage.Storage) *conn {
|
||||
return &conn{
|
||||
storage: s,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *conn) Initialize(ctx context.Context) error {
|
||||
bst, err := blobStoreByProvider(ctx, w.storage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
defer bst.Close(ctx)
|
||||
|
||||
cfg, err := w.storage.CommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be a storage.NewRepoOptions()
|
||||
if err = repo.Initialize(ctx, bst, nil, cfg.CorsoPassword); err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be a storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
ctx,
|
||||
defaultKopiaConfigFilePath,
|
||||
bst,
|
||||
cfg.CorsoPassword,
|
||||
nil,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
}
|
||||
|
||||
if err := w.open(ctx, cfg.CorsoPassword); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *conn) Connect(ctx context.Context) error {
|
||||
bst, err := blobStoreByProvider(ctx, w.storage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
defer bst.Close(ctx)
|
||||
|
||||
cfg, err := w.storage.CommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
ctx,
|
||||
defaultKopiaConfigFilePath,
|
||||
bst,
|
||||
cfg.CorsoPassword,
|
||||
nil,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
}
|
||||
|
||||
if err := w.open(ctx, cfg.CorsoPassword); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
||||
switch s.Provider {
|
||||
case storage.ProviderS3:
|
||||
return s3BlobStorage(ctx, s)
|
||||
default:
|
||||
return nil, errors.New("storage provider details are required")
|
||||
}
|
||||
}
|
||||
|
||||
func (w *conn) Close(ctx context.Context) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.refCount == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
w.refCount--
|
||||
|
||||
if w.refCount > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return w.close(ctx)
|
||||
}
|
||||
|
||||
// close closes the kopia handle. Safe to run without the mutex because other
|
||||
// functions check only the refCount variable.
|
||||
func (w *conn) close(ctx context.Context) error {
|
||||
err := w.Repository.Close(ctx)
|
||||
w.Repository = nil
|
||||
|
||||
return errors.Wrap(err, "closing repository connection")
|
||||
}
|
||||
|
||||
func (w *conn) open(ctx context.Context, password string) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
w.refCount++
|
||||
|
||||
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
|
||||
rep, err := repo.Open(ctx, defaultKopiaConfigFilePath, password, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "opening repository connection")
|
||||
}
|
||||
|
||||
w.Repository = rep
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *conn) wrap() error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.refCount == 0 {
|
||||
return errors.New("conn already closed")
|
||||
}
|
||||
|
||||
w.refCount++
|
||||
return nil
|
||||
}
|
||||
111
src/internal/kopia/conn_test.go
Normal file
111
src/internal/kopia/conn_test.go
Normal file
@ -0,0 +1,111 @@
|
||||
package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
ctesting "github.com/alcionai/corso/internal/testing"
|
||||
)
|
||||
|
||||
func openKopiaRepo(t *testing.T, ctx context.Context) (*conn, error) {
|
||||
storage, err := ctesting.NewPrefixedS3Storage(t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
k := NewConn(storage)
|
||||
if err = k.Initialize(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return k, nil
|
||||
}
|
||||
|
||||
// ---------------
|
||||
// unit tests
|
||||
// ---------------
|
||||
type WrapperUnitSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestWrapperUnitSuite(t *testing.T) {
|
||||
suite.Run(t, new(WrapperUnitSuite))
|
||||
}
|
||||
|
||||
func (suite *WrapperUnitSuite) TestCloseWithoutOpenDoesNotCrash() {
|
||||
ctx := context.Background()
|
||||
|
||||
k := conn{}
|
||||
assert.NotPanics(suite.T(), func() {
|
||||
k.Close(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------
|
||||
// integration tests that use kopia
|
||||
// ---------------
|
||||
type WrapperIntegrationSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestWrapperIntegrationSuite(t *testing.T) {
|
||||
if err := ctesting.RunOnAny(
|
||||
ctesting.CorsoCITests,
|
||||
ctesting.CorsoKopiaWrapperTests,
|
||||
); err != nil {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
suite.Run(t, new(WrapperIntegrationSuite))
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) SetupSuite() {
|
||||
_, err := ctesting.GetRequiredEnvVars(ctesting.AWSStorageCredEnvs...)
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestCloseTwiceDoesNotCrash() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
assert.Nil(t, k.Repository)
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestCloseAfterWrap() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
k.wrap()
|
||||
|
||||
assert.Equal(t, 2, k.refCount)
|
||||
|
||||
require.NoError(t, k.Close(ctx))
|
||||
assert.NotNil(t, k.Repository)
|
||||
assert.Equal(t, 1, k.refCount)
|
||||
|
||||
require.NoError(t, k.Close(ctx))
|
||||
assert.Nil(t, k.Repository)
|
||||
assert.Equal(t, 0, k.refCount)
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestOpenAfterClose() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
assert.Error(t, k.wrap())
|
||||
}
|
||||
@ -31,13 +31,27 @@ const (
|
||||
RestorePointDetailsModel
|
||||
)
|
||||
|
||||
func NewModelStore(kw *KopiaWrapper) *ModelStore {
|
||||
return &ModelStore{wrapper: kw}
|
||||
func NewModelStore(c *conn) (*ModelStore, error) {
|
||||
if err := c.wrap(); err != nil {
|
||||
return nil, errors.Wrap(err, "creating ModelStore")
|
||||
}
|
||||
return &ModelStore{c: c}, nil
|
||||
}
|
||||
|
||||
// ModelStore must not be accessed after the given KopiaWrapper is closed.
|
||||
type ModelStore struct {
|
||||
wrapper *KopiaWrapper
|
||||
c *conn
|
||||
}
|
||||
|
||||
func (ms *ModelStore) Close(ctx context.Context) error {
|
||||
if ms.c == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := ms.c.Close(ctx)
|
||||
ms.c = nil
|
||||
|
||||
return errors.Wrap(err, "closing ModelStore")
|
||||
}
|
||||
|
||||
// tagsForModel creates a copy of tags and adds a tag for the model type to it.
|
||||
@ -126,7 +140,7 @@ func (ms *ModelStore) Put(
|
||||
) error {
|
||||
err := repo.WriteSession(
|
||||
ctx,
|
||||
ms.wrapper.rep,
|
||||
ms.c,
|
||||
repo.WriteSessionOptions{Purpose: "ModelStorePut"},
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) error {
|
||||
err := putInner(innerCtx, w, t, m, true)
|
||||
@ -179,7 +193,7 @@ func (ms *ModelStore) GetIDsForType(
|
||||
return nil, errors.Wrap(err, "getting model metadata")
|
||||
}
|
||||
|
||||
metadata, err := ms.wrapper.rep.FindManifests(ctx, tmpTags)
|
||||
metadata, err := ms.c.FindManifests(ctx, tmpTags)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting model metadata")
|
||||
}
|
||||
@ -211,7 +225,7 @@ func (ms *ModelStore) getModelStoreID(
|
||||
}
|
||||
|
||||
tags := map[string]string{stableIDKey: string(id)}
|
||||
metadata, err := ms.wrapper.rep.FindManifests(ctx, tags)
|
||||
metadata, err := ms.c.FindManifests(ctx, tags)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "getting ModelStoreID")
|
||||
}
|
||||
@ -261,7 +275,7 @@ func (ms *ModelStore) GetWithModelStoreID(
|
||||
return errors.WithStack(errNoModelStoreID)
|
||||
}
|
||||
|
||||
metadata, err := ms.wrapper.rep.GetManifest(ctx, id, data)
|
||||
metadata, err := ms.c.GetManifest(ctx, id, data)
|
||||
// TODO(ashmrtnz): Should probably return some recognizable, non-kopia error
|
||||
// if not found. That way kopia doesn't need to be imported to higher layers.
|
||||
if err != nil {
|
||||
@ -295,7 +309,7 @@ func (ms *ModelStore) checkPrevModelVersion(
|
||||
}
|
||||
|
||||
// We actually got something back during our lookup.
|
||||
meta, err := ms.wrapper.rep.GetManifest(ctx, id, nil)
|
||||
meta, err := ms.c.GetManifest(ctx, id, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting previous model version")
|
||||
}
|
||||
@ -333,7 +347,7 @@ func (ms *ModelStore) Update(
|
||||
|
||||
err := repo.WriteSession(
|
||||
ctx,
|
||||
ms.wrapper.rep,
|
||||
ms.c,
|
||||
repo.WriteSessionOptions{Purpose: "ModelStoreUpdate"},
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) (innerErr error) {
|
||||
oldID := base.ModelStoreID
|
||||
@ -393,7 +407,7 @@ func (ms *ModelStore) DeleteWithModelStoreID(ctx context.Context, id manifest.ID
|
||||
|
||||
err := repo.WriteSession(
|
||||
ctx,
|
||||
ms.wrapper.rep,
|
||||
ms.c,
|
||||
repo.WriteSessionOptions{Purpose: "ModelStoreDelete"},
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) error {
|
||||
return w.DeleteManifest(innerCtx, id)
|
||||
|
||||
@ -21,10 +21,27 @@ type fooModel struct {
|
||||
}
|
||||
|
||||
func getModelStore(t *testing.T, ctx context.Context) *ModelStore {
|
||||
kw, err := openKopiaRepo(t, ctx)
|
||||
c, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
return NewModelStore(kw)
|
||||
return &ModelStore{c}
|
||||
}
|
||||
|
||||
// ---------------
|
||||
// unit tests
|
||||
// ---------------
|
||||
type ModelStoreUnitSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestModelStoreUnitSuite(t *testing.T) {
|
||||
suite.Run(t, new(ModelStoreUnitSuite))
|
||||
}
|
||||
func (suite *ModelStoreUnitSuite) TestCloseWithoutInitDoesNotPanic() {
|
||||
assert.NotPanics(suite.T(), func() {
|
||||
m := &ModelStore{}
|
||||
m.Close(context.Background())
|
||||
})
|
||||
}
|
||||
|
||||
// ---------------
|
||||
@ -58,7 +75,7 @@ func (suite *ModelStoreIntegrationSuite) SetupTest() {
|
||||
}
|
||||
|
||||
func (suite *ModelStoreIntegrationSuite) TearDownTest() {
|
||||
assert.NoError(suite.T(), suite.m.wrapper.Close(suite.ctx))
|
||||
assert.NoError(suite.T(), suite.m.Close(suite.ctx))
|
||||
}
|
||||
|
||||
func (suite *ModelStoreIntegrationSuite) TestBadTagsErrors() {
|
||||
@ -309,7 +326,7 @@ func (suite *ModelStoreIntegrationSuite) TestPutUpdate() {
|
||||
|
||||
m := getModelStore(t, ctx)
|
||||
defer func() {
|
||||
assert.NoError(t, m.wrapper.Close(ctx))
|
||||
assert.NoError(t, m.c.Close(ctx))
|
||||
}()
|
||||
|
||||
foo := &fooModel{Bar: uuid.NewString()}
|
||||
@ -376,7 +393,7 @@ func (suite *ModelStoreIntegrationSuite) TestPutUpdate_FailsNotMatchingPrev() {
|
||||
|
||||
m := getModelStore(t, ctx)
|
||||
defer func() {
|
||||
assert.NoError(t, m.wrapper.Close(ctx))
|
||||
assert.NoError(t, m.c.Close(ctx))
|
||||
}()
|
||||
|
||||
foo := &fooModel{Bar: uuid.NewString()}
|
||||
@ -446,7 +463,7 @@ func (suite *ModelStoreRegressionSuite) TestFailDuringWriteSessionHasNoVisibleEf
|
||||
|
||||
m := getModelStore(t, ctx)
|
||||
defer func() {
|
||||
assert.NoError(t, m.wrapper.Close(ctx))
|
||||
assert.NoError(t, m.c.Close(ctx))
|
||||
}()
|
||||
|
||||
foo := &fooModel{Bar: uuid.NewString()}
|
||||
@ -462,7 +479,7 @@ func (suite *ModelStoreRegressionSuite) TestFailDuringWriteSessionHasNoVisibleEf
|
||||
newID := manifest.ID("")
|
||||
err := repo.WriteSession(
|
||||
ctx,
|
||||
m.wrapper.rep,
|
||||
m.c,
|
||||
repo.WriteSessionOptions{Purpose: "WriteSessionFailureTest"},
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) (innerErr error) {
|
||||
base := foo.Base()
|
||||
|
||||
@ -6,7 +6,6 @@ import (
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/fs/virtualfs"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
@ -14,12 +13,9 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/internal/connector"
|
||||
"github.com/alcionai/corso/pkg/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultKopiaConfigFilePath = "/tmp/repository.config"
|
||||
|
||||
// TODO(ashmrtnz): These should be some values from upper layer corso,
|
||||
// possibly corresponding to who is making the backup.
|
||||
kTestHost = "a-test-machine"
|
||||
@ -27,8 +23,6 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
errInit = errors.New("initializing repo")
|
||||
errConnect = errors.New("connecting repo")
|
||||
errNotConnected = errors.New("not connected to repo")
|
||||
errUnsupportedDir = errors.New("unsupported static children in streaming directory")
|
||||
)
|
||||
@ -55,113 +49,26 @@ func manifestToStats(man *snapshot.Manifest) BackupStats {
|
||||
}
|
||||
}
|
||||
|
||||
type KopiaWrapper struct {
|
||||
storage storage.Storage
|
||||
rep repo.Repository
|
||||
func NewWrapper(c *conn) (*Wrapper, error) {
|
||||
if err := c.wrap(); err != nil {
|
||||
return nil, errors.Wrap(err, "creating Wrapper")
|
||||
}
|
||||
return &Wrapper{c}, nil
|
||||
}
|
||||
|
||||
func New(s storage.Storage) *KopiaWrapper {
|
||||
return &KopiaWrapper{storage: s}
|
||||
type Wrapper struct {
|
||||
c *conn
|
||||
}
|
||||
|
||||
func (kw *KopiaWrapper) Initialize(ctx context.Context) error {
|
||||
bst, err := blobStoreByProvider(ctx, kw.storage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
defer bst.Close(ctx)
|
||||
|
||||
cfg, err := kw.storage.CommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be a storage.NewRepoOptions()
|
||||
if err = repo.Initialize(ctx, bst, nil, cfg.CorsoPassword); err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be a storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
ctx,
|
||||
defaultKopiaConfigFilePath,
|
||||
bst,
|
||||
cfg.CorsoPassword,
|
||||
nil,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
}
|
||||
|
||||
if err := kw.open(ctx, cfg.CorsoPassword); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kw *KopiaWrapper) Connect(ctx context.Context) error {
|
||||
bst, err := blobStoreByProvider(ctx, kw.storage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
defer bst.Close(ctx)
|
||||
|
||||
cfg, err := kw.storage.CommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
ctx,
|
||||
defaultKopiaConfigFilePath,
|
||||
bst,
|
||||
cfg.CorsoPassword,
|
||||
nil,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
}
|
||||
|
||||
if err := kw.open(ctx, cfg.CorsoPassword); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
||||
switch s.Provider {
|
||||
case storage.ProviderS3:
|
||||
return s3BlobStorage(ctx, s)
|
||||
default:
|
||||
return nil, errors.New("storage provider details are required")
|
||||
}
|
||||
}
|
||||
|
||||
func (kw *KopiaWrapper) Close(ctx context.Context) error {
|
||||
if kw.rep == nil {
|
||||
func (w *Wrapper) Close(ctx context.Context) error {
|
||||
if w.c == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := kw.rep.Close(ctx)
|
||||
kw.rep = nil
|
||||
err := w.c.Close(ctx)
|
||||
w.c = nil
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "closing repository connection")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (kw *KopiaWrapper) open(ctx context.Context, password string) error {
|
||||
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
|
||||
rep, err := repo.Open(ctx, defaultKopiaConfigFilePath, password, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "opening repository connection")
|
||||
}
|
||||
|
||||
kw.rep = rep
|
||||
return nil
|
||||
return errors.Wrap(err, "closing Wrapper")
|
||||
}
|
||||
|
||||
// getStreamItemFunc returns a function that can be used by kopia's
|
||||
@ -306,11 +213,11 @@ func inflateDirTree(ctx context.Context, collections []connector.DataCollection)
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (kw KopiaWrapper) BackupCollections(
|
||||
func (w Wrapper) BackupCollections(
|
||||
ctx context.Context,
|
||||
collections []connector.DataCollection,
|
||||
) (*BackupStats, error) {
|
||||
if kw.rep == nil {
|
||||
if w.c == nil {
|
||||
return nil, errNotConnected
|
||||
}
|
||||
|
||||
@ -319,7 +226,7 @@ func (kw KopiaWrapper) BackupCollections(
|
||||
return nil, errors.Wrap(err, "building kopia directories")
|
||||
}
|
||||
|
||||
stats, err := kw.makeSnapshotWithRoot(ctx, dirTree)
|
||||
stats, err := w.makeSnapshotWithRoot(ctx, dirTree)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -327,7 +234,7 @@ func (kw KopiaWrapper) BackupCollections(
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (kw KopiaWrapper) makeSnapshotWithRoot(
|
||||
func (w Wrapper) makeSnapshotWithRoot(
|
||||
ctx context.Context,
|
||||
root fs.Directory,
|
||||
) (*BackupStats, error) {
|
||||
@ -337,12 +244,12 @@ func (kw KopiaWrapper) makeSnapshotWithRoot(
|
||||
// TODO(ashmrtnz): will this be something useful for snapshot lookups later?
|
||||
Path: root.Name(),
|
||||
}
|
||||
ctx, rw, err := kw.rep.NewWriter(ctx, repo.WriteSessionOptions{})
|
||||
ctx, rw, err := w.c.NewWriter(ctx, repo.WriteSessionOptions{})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get repo writer")
|
||||
}
|
||||
|
||||
policyTree, err := policy.TreeForSource(ctx, kw.rep, si)
|
||||
policyTree, err := policy.TreeForSource(ctx, w.c, si)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get policy tree")
|
||||
}
|
||||
@ -373,23 +280,23 @@ func (kw KopiaWrapper) makeSnapshotWithRoot(
|
||||
// split(dirname(itemPath), "/"). If the item does not exist in kopia or is not
|
||||
// a file an error is returned. The UUID of the returned DataStreams will be the
|
||||
// name of the kopia file the data is sourced from.
|
||||
func (kw KopiaWrapper) RestoreSingleItem(
|
||||
func (w Wrapper) RestoreSingleItem(
|
||||
ctx context.Context,
|
||||
snapshotID string,
|
||||
itemPath []string,
|
||||
) (connector.DataCollection, error) {
|
||||
manifest, err := snapshot.LoadSnapshot(ctx, kw.rep, manifest.ID(snapshotID))
|
||||
manifest, err := snapshot.LoadSnapshot(ctx, w.c, manifest.ID(snapshotID))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting snapshot handle")
|
||||
}
|
||||
|
||||
rootDirEntry, err := snapshotfs.SnapshotRoot(kw.rep, manifest)
|
||||
rootDirEntry, err := snapshotfs.SnapshotRoot(w.c, manifest)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting root directory")
|
||||
}
|
||||
|
||||
// Fine if rootDirEntry is nil, will be checked in called function.
|
||||
return kw.restoreSingleItem(ctx, rootDirEntry, itemPath[1:])
|
||||
return w.restoreSingleItem(ctx, rootDirEntry, itemPath[1:])
|
||||
}
|
||||
|
||||
// restoreSingleItem looks up the item at the given path starting from rootDir
|
||||
@ -399,7 +306,7 @@ func (kw KopiaWrapper) RestoreSingleItem(
|
||||
// does not exist in kopia or is not a file an error is returned. The UUID of
|
||||
// the returned DataStreams will be the name of the kopia file the data is
|
||||
// sourced from.
|
||||
func (kw KopiaWrapper) restoreSingleItem(
|
||||
func (w Wrapper) restoreSingleItem(
|
||||
ctx context.Context,
|
||||
rootDir fs.Entry,
|
||||
itemPath []string,
|
||||
@ -31,20 +31,6 @@ var (
|
||||
testFileData = []byte("abcdefghijklmnopqrstuvwxyz")
|
||||
)
|
||||
|
||||
func openKopiaRepo(t *testing.T, ctx context.Context) (*KopiaWrapper, error) {
|
||||
storage, err := ctesting.NewPrefixedS3Storage(t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
k := New(storage)
|
||||
if err = k.Initialize(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return k, nil
|
||||
}
|
||||
|
||||
func entriesToNames(entries []fs.Entry) []string {
|
||||
res := make([]string, 0, len(entries))
|
||||
|
||||
@ -66,13 +52,10 @@ func TestKopiaUnitSuite(t *testing.T) {
|
||||
suite.Run(t, new(KopiaUnitSuite))
|
||||
}
|
||||
|
||||
func (suite *KopiaUnitSuite) TestCloseWithoutOpenDoesNotCrash() {
|
||||
ctx := context.Background()
|
||||
ctesting.LogTimeOfTest(suite.T())
|
||||
|
||||
k := KopiaWrapper{}
|
||||
func (suite *KopiaUnitSuite) TestCloseWithoutInitDoesNotPanic() {
|
||||
assert.NotPanics(suite.T(), func() {
|
||||
k.Close(ctx)
|
||||
w := &Wrapper{}
|
||||
w.Close(context.Background())
|
||||
})
|
||||
}
|
||||
|
||||
@ -233,7 +216,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_Fails() {
|
||||
// ---------------
|
||||
type KopiaIntegrationSuite struct {
|
||||
suite.Suite
|
||||
k *KopiaWrapper
|
||||
w *Wrapper
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
@ -255,24 +238,13 @@ func (suite *KopiaIntegrationSuite) SetupSuite() {
|
||||
|
||||
func (suite *KopiaIntegrationSuite) SetupTest() {
|
||||
suite.ctx = context.Background()
|
||||
k, err := openKopiaRepo(suite.T(), suite.ctx)
|
||||
c, err := openKopiaRepo(suite.T(), suite.ctx)
|
||||
require.NoError(suite.T(), err)
|
||||
suite.k = k
|
||||
suite.w = &Wrapper{c}
|
||||
}
|
||||
|
||||
func (suite *KopiaIntegrationSuite) TearDownTest() {
|
||||
assert.NoError(suite.T(), suite.k.Close(suite.ctx))
|
||||
}
|
||||
|
||||
func (suite *KopiaIntegrationSuite) TestCloseTwiceDoesNotCrash() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
assert.Nil(t, k.rep)
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
assert.NoError(suite.T(), suite.w.Close(suite.ctx))
|
||||
}
|
||||
|
||||
func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
||||
@ -289,7 +261,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
||||
),
|
||||
}
|
||||
|
||||
stats, err := suite.k.BackupCollections(suite.ctx, collections)
|
||||
stats, err := suite.w.BackupCollections(suite.ctx, collections)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, stats.TotalFileCount, 47)
|
||||
assert.Equal(t, stats.TotalDirectoryCount, 5)
|
||||
@ -300,7 +272,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
||||
|
||||
type KopiaSimpleRepoIntegrationSuite struct {
|
||||
suite.Suite
|
||||
k *KopiaWrapper
|
||||
w *Wrapper
|
||||
ctx context.Context
|
||||
snapshotID manifest.ID
|
||||
}
|
||||
@ -324,10 +296,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupSuite() {
|
||||
func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
|
||||
t := suite.T()
|
||||
suite.ctx = context.Background()
|
||||
k, err := openKopiaRepo(t, suite.ctx)
|
||||
c, err := openKopiaRepo(t, suite.ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
suite.k = k
|
||||
suite.w = &Wrapper{c}
|
||||
|
||||
collections := []connector.DataCollection{
|
||||
&singleItemCollection{
|
||||
@ -339,7 +311,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
|
||||
},
|
||||
}
|
||||
|
||||
stats, err := suite.k.BackupCollections(suite.ctx, collections)
|
||||
stats, err := suite.w.BackupCollections(suite.ctx, collections)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, stats.TotalFileCount, 1)
|
||||
require.Equal(t, stats.TotalDirectoryCount, 3)
|
||||
@ -351,13 +323,13 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
|
||||
}
|
||||
|
||||
func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() {
|
||||
assert.NoError(suite.T(), suite.k.Close(suite.ctx))
|
||||
assert.NoError(suite.T(), suite.w.Close(suite.ctx))
|
||||
}
|
||||
|
||||
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem() {
|
||||
t := suite.T()
|
||||
|
||||
c, err := suite.k.RestoreSingleItem(
|
||||
c, err := suite.w.RestoreSingleItem(
|
||||
suite.ctx,
|
||||
string(suite.snapshotID),
|
||||
append(testPath, testFileUUID),
|
||||
@ -411,7 +383,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
|
||||
|
||||
for _, test := range table {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
_, err := suite.k.RestoreSingleItem(
|
||||
_, err := suite.w.RestoreSingleItem(
|
||||
suite.ctx,
|
||||
string(test.snapshotIDFunc(suite.snapshotID)),
|
||||
test.path,
|
||||
@ -427,19 +399,19 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
|
||||
func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Errors2() {
|
||||
table := []struct {
|
||||
name string
|
||||
rootDirFunc func(*testing.T, context.Context, *KopiaWrapper) fs.Entry
|
||||
rootDirFunc func(*testing.T, context.Context, *Wrapper) fs.Entry
|
||||
path []string
|
||||
}{
|
||||
{
|
||||
"FileAsRoot",
|
||||
func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
|
||||
func(t *testing.T, ctx context.Context, w *Wrapper) fs.Entry {
|
||||
return virtualfs.StreamingFileFromReader(testFileUUID, bytes.NewReader(testFileData))
|
||||
},
|
||||
append(testPath[1:], testFileUUID),
|
||||
},
|
||||
{
|
||||
"NoRootDir",
|
||||
func(t *testing.T, ctx context.Context, k *KopiaWrapper) fs.Entry {
|
||||
func(t *testing.T, ctx context.Context, w *Wrapper) fs.Entry {
|
||||
return nil
|
||||
},
|
||||
append(testPath[1:], testFileUUID),
|
||||
@ -448,9 +420,9 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem_Err
|
||||
|
||||
for _, test := range table {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
_, err := suite.k.restoreSingleItem(
|
||||
_, err := suite.w.restoreSingleItem(
|
||||
suite.ctx,
|
||||
test.rootDirFunc(t, suite.ctx, suite.k),
|
||||
test.rootDirFunc(t, suite.ctx, suite.w),
|
||||
test.path,
|
||||
)
|
||||
require.Error(t, err)
|
||||
@ -34,7 +34,7 @@ type BackupResults struct {
|
||||
func NewBackupOperation(
|
||||
ctx context.Context,
|
||||
opts Options,
|
||||
kw *kopia.KopiaWrapper,
|
||||
kw *kopia.Wrapper,
|
||||
acct account.Account,
|
||||
targets []string,
|
||||
) (BackupOperation, error) {
|
||||
|
||||
@ -35,7 +35,7 @@ func (suite *BackupOpSuite) TestBackupOperation_PersistResults() {
|
||||
ctx := context.Background()
|
||||
|
||||
var (
|
||||
kw = &kopia.KopiaWrapper{}
|
||||
kw = &kopia.Wrapper{}
|
||||
acct = account.Account{}
|
||||
now = time.Now()
|
||||
cs = []connector.DataCollection{&connector.ExchangeDataCollection{}}
|
||||
@ -86,14 +86,14 @@ func (suite *BackupOpIntegrationSuite) SetupSuite() {
|
||||
}
|
||||
|
||||
func (suite *BackupOpIntegrationSuite) TestNewBackupOperation() {
|
||||
kw := &kopia.KopiaWrapper{}
|
||||
kw := &kopia.Wrapper{}
|
||||
acct, err := ctesting.NewM365Account()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
opts Options
|
||||
kw *kopia.KopiaWrapper
|
||||
kw *kopia.Wrapper
|
||||
acct account.Account
|
||||
targets []string
|
||||
errCheck assert.ErrorAssertionFunc
|
||||
@ -129,13 +129,20 @@ func (suite *BackupOpIntegrationSuite) TestBackup_Run() {
|
||||
st, err := ctesting.NewPrefixedS3Storage(t)
|
||||
require.NoError(t, err)
|
||||
|
||||
k := kopia.New(st)
|
||||
k := kopia.NewConn(st)
|
||||
require.NoError(t, k.Initialize(ctx))
|
||||
|
||||
// kopiaRef comes with a count of 1 and Wrapper bumps it again so safe
|
||||
// to close here.
|
||||
defer k.Close(ctx)
|
||||
|
||||
w, err := kopia.NewWrapper(k)
|
||||
require.NoError(t, err)
|
||||
|
||||
bo, err := NewBackupOperation(
|
||||
ctx,
|
||||
Options{},
|
||||
k,
|
||||
w,
|
||||
acct,
|
||||
[]string{m365User})
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -31,7 +31,7 @@ type operation struct {
|
||||
Options Options `json:"options"`
|
||||
Status opStatus `json:"status"`
|
||||
|
||||
kopia *kopia.KopiaWrapper
|
||||
kopia *kopia.Wrapper
|
||||
}
|
||||
|
||||
// Options configure some parameters of the operation
|
||||
@ -42,7 +42,7 @@ type Options struct {
|
||||
|
||||
func newOperation(
|
||||
opts Options,
|
||||
kw *kopia.KopiaWrapper,
|
||||
kw *kopia.Wrapper,
|
||||
) operation {
|
||||
return operation{
|
||||
ID: uuid.New(),
|
||||
|
||||
@ -24,10 +24,10 @@ func (suite *OperationSuite) TestNewOperation() {
|
||||
}
|
||||
|
||||
func (suite *OperationSuite) TestOperation_Validate() {
|
||||
kwStub := &kopia.KopiaWrapper{}
|
||||
kwStub := &kopia.Wrapper{}
|
||||
table := []struct {
|
||||
name string
|
||||
kw *kopia.KopiaWrapper
|
||||
kw *kopia.Wrapper
|
||||
errCheck assert.ErrorAssertionFunc
|
||||
}{
|
||||
{"good", kwStub, assert.NoError},
|
||||
|
||||
@ -34,7 +34,7 @@ type RestoreResults struct {
|
||||
func NewRestoreOperation(
|
||||
ctx context.Context,
|
||||
opts Options,
|
||||
kw *kopia.KopiaWrapper,
|
||||
kw *kopia.Wrapper,
|
||||
acct account.Account,
|
||||
restorePointID string,
|
||||
targets []string,
|
||||
|
||||
@ -35,7 +35,7 @@ func (suite *RestoreOpSuite) TestRestoreOperation_PersistResults() {
|
||||
ctx := context.Background()
|
||||
|
||||
var (
|
||||
kw = &kopia.KopiaWrapper{}
|
||||
kw = &kopia.Wrapper{}
|
||||
acct = account.Account{}
|
||||
now = time.Now()
|
||||
cs = []connector.DataCollection{&connector.ExchangeDataCollection{}}
|
||||
@ -78,14 +78,14 @@ func (suite *RestoreOpIntegrationSuite) SetupSuite() {
|
||||
}
|
||||
|
||||
func (suite *RestoreOpIntegrationSuite) TestNewRestoreOperation() {
|
||||
kw := &kopia.KopiaWrapper{}
|
||||
kw := &kopia.Wrapper{}
|
||||
acct, err := ctesting.NewM365Account()
|
||||
require.NoError(suite.T(), err)
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
opts Options
|
||||
kw *kopia.KopiaWrapper
|
||||
kw *kopia.Wrapper
|
||||
acct account.Account
|
||||
targets []string
|
||||
errCheck assert.ErrorAssertionFunc
|
||||
|
||||
@ -21,7 +21,7 @@ type Repository struct {
|
||||
|
||||
Account account.Account // the user's m365 account connection details
|
||||
Storage storage.Storage // the storage provider details and configuration
|
||||
dataLayer *kopia.KopiaWrapper
|
||||
dataLayer *kopia.Wrapper
|
||||
}
|
||||
|
||||
// Initialize will:
|
||||
@ -37,16 +37,25 @@ func Initialize(
|
||||
acct account.Account,
|
||||
storage storage.Storage,
|
||||
) (*Repository, error) {
|
||||
k := kopia.New(storage)
|
||||
if err := k.Initialize(ctx); err != nil {
|
||||
kopiaRef := kopia.NewConn(storage)
|
||||
if err := kopiaRef.Initialize(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// kopiaRef comes with a count of 1 and NewWrapper bumps it again so safe
|
||||
// to close here.
|
||||
defer kopiaRef.Close(ctx)
|
||||
|
||||
w, err := kopia.NewWrapper(kopiaRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := Repository{
|
||||
ID: uuid.New(),
|
||||
Version: "v1",
|
||||
Account: acct,
|
||||
Storage: storage,
|
||||
dataLayer: k,
|
||||
dataLayer: w,
|
||||
}
|
||||
return &r, nil
|
||||
}
|
||||
@ -61,16 +70,25 @@ func Connect(
|
||||
acct account.Account,
|
||||
storage storage.Storage,
|
||||
) (*Repository, error) {
|
||||
k := kopia.New(storage)
|
||||
if err := k.Connect(ctx); err != nil {
|
||||
kopiaRef := kopia.NewConn(storage)
|
||||
if err := kopiaRef.Connect(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// kopiaRef comes with a count of 1 and NewWrapper bumps it again so safe
|
||||
// to close here.
|
||||
defer kopiaRef.Close(ctx)
|
||||
|
||||
w, err := kopia.NewWrapper(kopiaRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: ID and CreatedAt should get retrieved from a stored kopia config.
|
||||
r := Repository{
|
||||
Version: "v1",
|
||||
Account: acct,
|
||||
Storage: storage,
|
||||
dataLayer: k,
|
||||
dataLayer: w,
|
||||
}
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user