* Split KopiaWrapper into repo handle and logic With ModelStore, multiple structs need a reference to the kopia repo. Make a small wrapper class (conn) that can open and initialize a repo. The wrapper handles concurrent closes and opens and does ref counting to ensure it only drops the kopia handle when the last reference is closed. Rename KopiaWrapper to Wrapper and keep backup/restore functionality in it.
163 lines
3.1 KiB
Go
163 lines
3.1 KiB
Go
package kopia
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/kopia/kopia/repo"
|
|
"github.com/kopia/kopia/repo/blob"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/alcionai/corso/pkg/storage"
|
|
)
|
|
|
|
const (
|
|
defaultKopiaConfigFilePath = "/tmp/repository.config"
|
|
)
|
|
|
|
var (
|
|
errInit = errors.New("initializing repo")
|
|
errConnect = errors.New("connecting repo")
|
|
)
|
|
|
|
type conn struct {
|
|
storage storage.Storage
|
|
repo.Repository
|
|
mu sync.Mutex
|
|
refCount int
|
|
}
|
|
|
|
func NewConn(s storage.Storage) *conn {
|
|
return &conn{
|
|
storage: s,
|
|
}
|
|
}
|
|
|
|
func (w *conn) Initialize(ctx context.Context) error {
|
|
bst, err := blobStoreByProvider(ctx, w.storage)
|
|
if err != nil {
|
|
return errors.Wrap(err, errInit.Error())
|
|
}
|
|
defer bst.Close(ctx)
|
|
|
|
cfg, err := w.storage.CommonConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// todo - issue #75: nil here should be a storage.NewRepoOptions()
|
|
if err = repo.Initialize(ctx, bst, nil, cfg.CorsoPassword); err != nil {
|
|
return errors.Wrap(err, errInit.Error())
|
|
}
|
|
|
|
// todo - issue #75: nil here should be a storage.ConnectOptions()
|
|
if err := repo.Connect(
|
|
ctx,
|
|
defaultKopiaConfigFilePath,
|
|
bst,
|
|
cfg.CorsoPassword,
|
|
nil,
|
|
); err != nil {
|
|
return errors.Wrap(err, errConnect.Error())
|
|
}
|
|
|
|
if err := w.open(ctx, cfg.CorsoPassword); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *conn) Connect(ctx context.Context) error {
|
|
bst, err := blobStoreByProvider(ctx, w.storage)
|
|
if err != nil {
|
|
return errors.Wrap(err, errInit.Error())
|
|
}
|
|
defer bst.Close(ctx)
|
|
|
|
cfg, err := w.storage.CommonConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// todo - issue #75: nil here should be storage.ConnectOptions()
|
|
if err := repo.Connect(
|
|
ctx,
|
|
defaultKopiaConfigFilePath,
|
|
bst,
|
|
cfg.CorsoPassword,
|
|
nil,
|
|
); err != nil {
|
|
return errors.Wrap(err, errConnect.Error())
|
|
}
|
|
|
|
if err := w.open(ctx, cfg.CorsoPassword); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
|
switch s.Provider {
|
|
case storage.ProviderS3:
|
|
return s3BlobStorage(ctx, s)
|
|
default:
|
|
return nil, errors.New("storage provider details are required")
|
|
}
|
|
}
|
|
|
|
func (w *conn) Close(ctx context.Context) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.refCount == 0 {
|
|
return nil
|
|
}
|
|
|
|
w.refCount--
|
|
|
|
if w.refCount > 0 {
|
|
return nil
|
|
}
|
|
|
|
return w.close(ctx)
|
|
}
|
|
|
|
// close closes the kopia handle. Safe to run without the mutex because other
|
|
// functions check only the refCount variable.
|
|
func (w *conn) close(ctx context.Context) error {
|
|
err := w.Repository.Close(ctx)
|
|
w.Repository = nil
|
|
|
|
return errors.Wrap(err, "closing repository connection")
|
|
}
|
|
|
|
func (w *conn) open(ctx context.Context, password string) error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
w.refCount++
|
|
|
|
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
|
|
rep, err := repo.Open(ctx, defaultKopiaConfigFilePath, password, nil)
|
|
if err != nil {
|
|
return errors.Wrap(err, "opening repository connection")
|
|
}
|
|
|
|
w.Repository = rep
|
|
return nil
|
|
}
|
|
|
|
func (w *conn) wrap() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.refCount == 0 {
|
|
return errors.New("conn already closed")
|
|
}
|
|
|
|
w.refCount++
|
|
return nil
|
|
}
|