Keepers 6cdc691c6f
correct the corso module (#749)
swaps the corso go module from github.com/
alcionai/corso to github.com/alcionai/corso/src
to align with the location of the go.mod and
go.sum files inside the repo.

All other changes in the repository update the
package imports to the new module path.
2022-09-07 15:50:54 -06:00

268 lines
5.4 KiB
Go

package kopia
import (
"context"
"path/filepath"
"sync"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/common"
"github.com/alcionai/corso/src/pkg/storage"
)
// Defaults applied when connecting to a kopia repository.
const (
// defaultKopiaConfigDir is the directory commonConnect falls back to for the
// kopia config file when the caller supplies an empty config dir.
defaultKopiaConfigDir = "/tmp/"
// defaultKopiaConfigFile is the file name joined onto the config dir to form
// the path handed to repo.Connect and repo.Open.
defaultKopiaConfigFile = "repository.config"
// defaultCompressor is the compressor name that Initialize and Connect pass
// to commonConnect, which applies it through Compression.
defaultCompressor = "s2-default"
)
// Error values whose message text is used as the wrapping prefix for failures
// in this file (only their Error() string is consumed here).
var (
// errInit labels failures wrapped in Initialize and Connect.
errInit = errors.New("initializing repo")
// errConnect labels failures from repo.Connect in commonConnect.
errConnect = errors.New("connecting repo")
)
// ErrorRepoAlreadyExists is the error type Initialize returns when kopia
// reports repo.ErrAlreadyInitialized. It embeds common.Err, which carries the
// underlying error. Use IsRepoAlreadyExistsError to test for it.
type ErrorRepoAlreadyExists struct {
common.Err
}
func RepoAlreadyExistsError(e error) error {
return ErrorRepoAlreadyExists{*common.EncapsulateError(e)}
}
// IsRepoAlreadyExistsError reports whether any error in e's chain is an
// ErrorRepoAlreadyExists.
func IsRepoAlreadyExistsError(e error) bool {
	target := ErrorRepoAlreadyExists{}

	return errors.As(e, &target)
}
// conn is a reference-counted handle on a kopia repository.
type conn struct {
// storage is the storage configuration used to build the blob store and to
// read the common config (password, kopia config dir).
storage storage.Storage
// Repository is the embedded kopia handle; open sets it and close resets it
// to nil.
repo.Repository
// mu is held by Close, open, and wrap while they touch refCount.
mu sync.Mutex
// refCount is incremented by open and wrap and decremented by Close; the
// kopia handle is closed when it drops to zero.
refCount int
}
// NewConn builds a conn bound to the given storage configuration. No
// repository is opened here; call Initialize or Connect for that.
func NewConn(s storage.Storage) *conn {
	c := new(conn)
	c.storage = s

	return c
}
// Initialize creates a new kopia repository in the configured storage and then
// connects to it with the default compressor.
//
// When kopia reports that the repository was already initialized, the returned
// error satisfies IsRepoAlreadyExistsError. Other initialization failures are
// wrapped with the errInit message.
func (w *conn) Initialize(ctx context.Context) error {
	store, err := blobStoreByProvider(ctx, w.storage)
	if err != nil {
		return errors.Wrap(err, errInit.Error())
	}
	// The blob store is only needed for the duration of this call.
	defer store.Close(ctx)

	commonCfg, err := w.storage.CommonConfig()
	if err != nil {
		return err
	}

	// todo - issue #75: nil here should be a storage.NewRepoOptions()
	initErr := repo.Initialize(ctx, store, nil, commonCfg.CorsoPassword)

	switch {
	case initErr == nil:
		// Repository created; continue on to connect.
	case errors.Is(initErr, repo.ErrAlreadyInitialized):
		return RepoAlreadyExistsError(initErr)
	default:
		return errors.Wrap(initErr, errInit.Error())
	}

	return w.commonConnect(
		ctx,
		commonCfg.KopiaCfgDir,
		store,
		commonCfg.CorsoPassword,
		defaultCompressor,
	)
}
// Connect attaches to an existing kopia repository in the configured storage
// and applies the default compressor.
//
// Failures building the blob store are wrapped with the errConnect message so
// that errors from this path are labelled as connection errors rather than
// initialization errors. Errors reading the common config are returned as-is.
func (w *conn) Connect(ctx context.Context) error {
	bst, err := blobStoreByProvider(ctx, w.storage)
	if err != nil {
		return errors.Wrap(err, errConnect.Error())
	}
	// The blob store is only needed for the duration of this call.
	defer bst.Close(ctx)

	cfg, err := w.storage.CommonConfig()
	if err != nil {
		return err
	}

	return w.commonConnect(
		ctx,
		cfg.KopiaCfgDir,
		bst,
		cfg.CorsoPassword,
		defaultCompressor,
	)
}
// commonConnect performs the steps shared by Initialize and Connect: it
// connects kopia to the blob store (writing the config file), opens the
// repository, and sets the global compression policy.
//
// An empty configDir selects defaultKopiaConfigDir and connects with nil
// options; a non-empty configDir is also used as the cache directory.
func (w *conn) commonConnect(
	ctx context.Context,
	configDir string,
	bst blob.Storage,
	password, compressor string,
) error {
	var connectOpts *repo.ConnectOptions

	if configDir == "" {
		configDir = defaultKopiaConfigDir
	} else {
		connectOpts = &repo.ConnectOptions{
			CachingOptions: content.CachingOptions{
				CacheDirectory: configDir,
			},
		}
	}

	configPath := filepath.Join(configDir, defaultKopiaConfigFile)

	// todo - issue #75: nil here should be storage.ConnectOptions()
	err := repo.Connect(ctx, configPath, bst, password, connectOpts)
	if err != nil {
		return errors.Wrap(err, errConnect.Error())
	}

	err = w.open(ctx, configPath, password)
	if err != nil {
		return err
	}

	return w.Compression(ctx, compressor)
}
// blobStoreByProvider returns the kopia blob store matching the storage
// provider. Only storage.ProviderS3 is handled; any other provider value
// yields an error.
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
	if s.Provider == storage.ProviderS3 {
		return s3BlobStorage(ctx, s)
	}

	return nil, errors.New("storage provider details are required")
}
// Close releases one reference to the connection. The underlying kopia handle
// is closed only when the last reference is released; calling Close on a conn
// with no outstanding references is a no-op.
func (w *conn) Close(ctx context.Context) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Nothing is open, so there is nothing to release.
	if w.refCount == 0 {
		return nil
	}

	w.refCount--

	if w.refCount <= 0 {
		return w.close(ctx)
	}

	// Other holders still reference the handle.
	return nil
}
// close shuts down the kopia handle and clears it from the conn, whether or
// not the shutdown reported an error. Safe to run without the mutex because
// other functions check only the refCount variable.
func (w *conn) close(ctx context.Context) error {
	closeErr := w.Repository.Close(ctx)

	// Drop the handle regardless of the close result.
	w.Repository = nil

	if closeErr == nil {
		return nil
	}

	return errors.Wrap(closeErr, "closing repository connection")
}
// open opens the kopia repository described by the config file at configPath
// and stores the resulting handle on the conn.
//
// The reference count is incremented only after the repository has opened
// successfully. Counting a failed open would leave refCount > 0 with a nil
// Repository, so a later Close would call Close on a nil handle and wrap
// would treat a never-opened connection as live.
//
// Returns the repo.Open error wrapped with "opening repository connection".
func (w *conn) open(ctx context.Context, configPath, password string) error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
	rep, err := repo.Open(ctx, configPath, password, nil)
	if err != nil {
		return errors.Wrap(err, "opening repository connection")
	}

	w.Repository = rep
	w.refCount++

	return nil
}
// wrap registers one more reference to an already-open connection. It fails
// if the reference count is zero, i.e. the connection is not currently open.
func (w *conn) wrap() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	if w.refCount != 0 {
		w.refCount++
		return nil
	}

	return errors.New("conn already closed")
}
// Compression attempts to set the global compression policy for the kopia repo
// to the given compressor.
//
// The compressor name is validated with checkCompressor before the repo is
// touched. If the current global policy already names the requested
// compressor, no write is made. Otherwise the policy's CompressionPolicy is
// replaced with one naming the compressor and written inside a kopia write
// session.
//
// Returns the validation or policy-lookup error as-is; a write failure is
// wrapped with "updating global compression policy".
func (w *conn) Compression(ctx context.Context, compressor string) error {
	comp := compression.Name(compressor)
	if err := checkCompressor(comp); err != nil {
		return err
	}

	si := policy.GlobalPolicySourceInfo

	p, err := w.getPolicyOrEmpty(ctx, si)
	if err != nil {
		return err
	}

	// Already set as requested; skip the write session.
	if comp == p.CompressionPolicy.CompressorName {
		return nil
	}

	p.CompressionPolicy = policy.CompressionPolicy{
		CompressorName: comp,
	}

	err = repo.WriteSession(
		ctx,
		w.Repository,
		repo.WriteSessionOptions{Purpose: "UpdateGlobalCompressionPolicy"},
		func(innerCtx context.Context, rw repo.RepositoryWriter) error {
			// Use the context supplied by the write session, not the outer one.
			return policy.SetPolicy(innerCtx, rw, si, p)
		},
	)

	return errors.Wrap(err, "updating global compression policy")
}
// getPolicyOrEmpty fetches the policy defined for si. When no policy is
// defined it returns a fresh, empty policy instead of an error; any other
// lookup failure is wrapped and returned.
func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) {
	found, err := policy.GetDefinedPolicy(ctx, w.Repository, si)

	switch {
	case err == nil:
		return found, nil
	case errors.Is(err, policy.ErrPolicyNotFound):
		return &policy.Policy{}, nil
	default:
		return nil, errors.Wrap(err, "getting global backup policy")
	}
}
// checkCompressor reports an error unless compressor is a key of
// compression.ByName. The check is a direct keyed lookup rather than a scan
// over every entry of the map.
func checkCompressor(compressor compression.Name) error {
	if _, ok := compression.ByName[compressor]; ok {
		return nil
	}

	return errors.Errorf("unknown compressor type %s", compressor)
}