Persist and enable kopia compression options (#577)
* Skip updating policy if no changes needed * Set default compressor on repo connection and initialization * Factor out common code for connecting to a repo * New unit test ensuring compressor is set properly Test compressor is set both through Initialize and Connect because client code doesn't have to call Connect if it just did Initialize.
This commit is contained in:
parent
f870667856
commit
24b40cf601
@ -7,6 +7,7 @@ import (
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/compression"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@ -16,6 +17,7 @@ import (
|
||||
|
||||
const (
|
||||
defaultKopiaConfigFilePath = "/tmp/repository.config"
|
||||
defaultCompressor = "s2-default"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -69,18 +71,13 @@ func (w *conn) Initialize(ctx context.Context) error {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be a storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
return w.commonConnect(
|
||||
ctx,
|
||||
defaultKopiaConfigFilePath,
|
||||
bst,
|
||||
cfg.CorsoPassword,
|
||||
nil,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
}
|
||||
|
||||
return w.open(ctx, cfg.CorsoPassword)
|
||||
defaultCompressor,
|
||||
)
|
||||
}
|
||||
|
||||
func (w *conn) Connect(ctx context.Context) error {
|
||||
@ -95,18 +92,37 @@ func (w *conn) Connect(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
return w.commonConnect(
|
||||
ctx,
|
||||
defaultKopiaConfigFilePath,
|
||||
bst,
|
||||
cfg.CorsoPassword,
|
||||
defaultCompressor,
|
||||
)
|
||||
}
|
||||
|
||||
func (w *conn) commonConnect(
|
||||
ctx context.Context,
|
||||
configPath string,
|
||||
bst blob.Storage,
|
||||
password, compressor string,
|
||||
) error {
|
||||
// todo - issue #75: nil here should be storage.ConnectOptions()
|
||||
if err := repo.Connect(
|
||||
ctx,
|
||||
configPath,
|
||||
bst,
|
||||
password,
|
||||
nil,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
}
|
||||
|
||||
return w.open(ctx, cfg.CorsoPassword)
|
||||
if err := w.open(ctx, configPath, password); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.Compression(ctx, compressor)
|
||||
}
|
||||
|
||||
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
||||
@ -144,14 +160,14 @@ func (w *conn) close(ctx context.Context) error {
|
||||
return errors.Wrap(err, "closing repository connection")
|
||||
}
|
||||
|
||||
func (w *conn) open(ctx context.Context, password string) error {
|
||||
func (w *conn) open(ctx context.Context, configPath, password string) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
w.refCount++
|
||||
|
||||
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
|
||||
rep, err := repo.Open(ctx, defaultKopiaConfigFilePath, password, nil)
|
||||
rep, err := repo.Open(ctx, configPath, password, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "opening repository connection")
|
||||
}
|
||||
@ -183,13 +199,13 @@ func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||
|
||||
si := policy.GlobalPolicySourceInfo
|
||||
|
||||
p, err := policy.GetDefinedPolicy(ctx, w.Repository, si)
|
||||
p, err := w.getPolicyOrEmpty(ctx, si)
|
||||
if err != nil {
|
||||
if !errors.Is(err, policy.ErrPolicyNotFound) {
|
||||
return errors.Wrap(err, "getting global compression policy")
|
||||
return err
|
||||
}
|
||||
|
||||
p = &policy.Policy{}
|
||||
if compressor == string(p.CompressionPolicy.CompressorName) {
|
||||
return nil
|
||||
}
|
||||
|
||||
p.CompressionPolicy = policy.CompressionPolicy{
|
||||
@ -199,7 +215,7 @@ func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||
err = repo.WriteSession(
|
||||
ctx,
|
||||
w.Repository,
|
||||
repo.WriteSessionOptions{Purpose: "UpdateGlobalCompression"},
|
||||
repo.WriteSessionOptions{Purpose: "UpdateGlobalCompressionPolicy"},
|
||||
func(innerCtx context.Context, rw repo.RepositoryWriter) error {
|
||||
return policy.SetPolicy(ctx, rw, si, p)
|
||||
},
|
||||
@ -208,6 +224,19 @@ func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||
return errors.Wrap(err, "updating global compression policy")
|
||||
}
|
||||
|
||||
func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) {
|
||||
p, err := policy.GetDefinedPolicy(ctx, w.Repository, si)
|
||||
if err != nil {
|
||||
if errors.Is(err, policy.ErrPolicyNotFound) {
|
||||
return &policy.Policy{}, nil
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "getting global backup policy")
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func checkCompressor(compressor compression.Name) error {
|
||||
for c := range compression.ByName {
|
||||
if c == compressor {
|
||||
|
||||
@ -124,10 +124,33 @@ func (suite *WrapperIntegrationSuite) TestBadCompressorType() {
|
||||
assert.Error(t, k.Compression(ctx, "not-a-compressor"))
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestGetPolicyOrDefault_GetsDefault() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
}()
|
||||
|
||||
si := snapshot.SourceInfo{
|
||||
Host: corsoHost,
|
||||
UserName: corsoUser,
|
||||
Path: "test-path-root",
|
||||
}
|
||||
|
||||
p, err := k.getPolicyOrEmpty(ctx, si)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, policy.Policy{}, *p)
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestSetCompressor() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
compressor := "s2-default"
|
||||
compressor := "pgzip"
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
@ -139,7 +162,7 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() {
|
||||
assert.NoError(t, k.Compression(ctx, compressor))
|
||||
|
||||
// Check the policy was actually created and has the right compressor.
|
||||
p, err := policy.GetDefinedPolicy(ctx, k.Repository, policy.GlobalPolicySourceInfo)
|
||||
p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, compressor, string(p.CompressionPolicy.CompressorName))
|
||||
@ -161,3 +184,34 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() {
|
||||
string(policyTree.EffectivePolicy().CompressionPolicy.CompressorName),
|
||||
)
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestCompressorSetOnInitAndConnect() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
tmpComp := "pgzip"
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check the policy was actually created and has the right compressor.
|
||||
p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName))
|
||||
|
||||
// Change the compressor to something else.
|
||||
require.NoError(t, k.Compression(ctx, tmpComp))
|
||||
require.NoError(t, k.Close(ctx))
|
||||
|
||||
// Re-open with Connect to see if the compressor changed back.
|
||||
require.NoError(t, k.Connect(ctx))
|
||||
|
||||
defer func() {
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
}()
|
||||
|
||||
p, err = k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user