Allow configuring kopia compression (#544)
* Function that allows adjusting global compression Takes the name of a kopia compressor and sets the default compressor to be that in the repo. * Basic unit tests for setting compression Ensures that the policy is actually applied to future snapshots that kopia will take. * Integration test for changing compression types Make sure that data is still available even if the global compression policy changes from what it was when the backup was made.
This commit is contained in:
parent
2f7d8420ae
commit
9fc5cbb9d2
@ -6,6 +6,8 @@ import (
|
||||
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/compression"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/internal/common"
|
||||
@ -169,3 +171,49 @@ func (w *conn) wrap() error {
|
||||
w.refCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compression attempts to set the global compression policy for the kopia repo
|
||||
// to the given compressor.
|
||||
func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||
comp := compression.Name(compressor)
|
||||
|
||||
if err := checkCompressor(comp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
si := policy.GlobalPolicySourceInfo
|
||||
|
||||
p, err := policy.GetDefinedPolicy(ctx, w.Repository, si)
|
||||
if err != nil {
|
||||
if !errors.Is(err, policy.ErrPolicyNotFound) {
|
||||
return errors.Wrap(err, "getting global compression policy")
|
||||
}
|
||||
|
||||
p = &policy.Policy{}
|
||||
}
|
||||
|
||||
p.CompressionPolicy = policy.CompressionPolicy{
|
||||
CompressorName: compression.Name(comp),
|
||||
}
|
||||
|
||||
err = repo.WriteSession(
|
||||
ctx,
|
||||
w.Repository,
|
||||
repo.WriteSessionOptions{Purpose: "UpdateGlobalCompression"},
|
||||
func(innerCtx context.Context, rw repo.RepositoryWriter) error {
|
||||
return policy.SetPolicy(ctx, rw, si, p)
|
||||
},
|
||||
)
|
||||
|
||||
return errors.Wrap(err, "updating global compression policy")
|
||||
}
|
||||
|
||||
func checkCompressor(compressor compression.Name) error {
|
||||
for c := range compression.ByName {
|
||||
if c == compressor {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Errorf("unknown compressor type %s", compressor)
|
||||
}
|
||||
|
||||
@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/kopia/kopia/snapshot/policy"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
@ -107,3 +109,55 @@ func (suite *WrapperIntegrationSuite) TestOpenAfterClose() {
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
assert.Error(t, k.wrap())
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestBadCompressorType() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
}()
|
||||
|
||||
assert.Error(t, k.Compression(ctx, "not-a-compressor"))
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestSetCompressor() {
|
||||
ctx := context.Background()
|
||||
t := suite.T()
|
||||
compressor := "s2-default"
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
assert.NoError(t, k.Close(ctx))
|
||||
}()
|
||||
|
||||
assert.NoError(t, k.Compression(ctx, compressor))
|
||||
|
||||
// Check the policy was actually created and has the right compressor.
|
||||
p, err := policy.GetDefinedPolicy(ctx, k.Repository, policy.GlobalPolicySourceInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, compressor, string(p.CompressionPolicy.CompressorName))
|
||||
|
||||
// Check the global policy will be the effective policy in future snapshots
|
||||
// for some source info.
|
||||
si := snapshot.SourceInfo{
|
||||
Host: corsoHost,
|
||||
UserName: corsoUser,
|
||||
Path: "test-path-root",
|
||||
}
|
||||
|
||||
policyTree, err := policy.TreeForSource(ctx, k, si)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(
|
||||
t,
|
||||
compressor,
|
||||
string(policyTree.EffectivePolicy().CompressionPolicy.CompressorName),
|
||||
)
|
||||
}
|
||||
|
||||
@ -486,6 +486,47 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
||||
assert.Len(t, rp.Entries, 47)
|
||||
}
|
||||
|
||||
func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
|
||||
t := suite.T()
|
||||
ctx := context.Background()
|
||||
|
||||
k, err := openKopiaRepo(t, ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, k.Compression(ctx, "s2-default"))
|
||||
|
||||
w := &Wrapper{k}
|
||||
|
||||
tid := uuid.NewString()
|
||||
p1 := []string{tid, "uid", "emails", "fid"}
|
||||
p2 := []string{tid, "uid2", "emails", "fid"}
|
||||
dc1 := mockconnector.NewMockExchangeCollection(p1, 1)
|
||||
dc2 := mockconnector.NewMockExchangeCollection(p2, 1)
|
||||
|
||||
fp1 := append(p1, dc1.Names[0])
|
||||
fp2 := append(p2, dc2.Names[0])
|
||||
|
||||
stats, _, err := w.BackupCollections(ctx, []data.Collection{dc1, dc2})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, k.Compression(ctx, "gzip"))
|
||||
|
||||
expected := map[string][]byte{
|
||||
path.Join(fp1...): dc1.Data[0],
|
||||
path.Join(fp2...): dc2.Data[0],
|
||||
}
|
||||
|
||||
result, err := w.RestoreMultipleItems(
|
||||
ctx,
|
||||
string(stats.SnapshotID),
|
||||
[][]string{fp1, fp2})
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, len(result))
|
||||
|
||||
testForFiles(t, expected, result)
|
||||
}
|
||||
|
||||
type KopiaSimpleRepoIntegrationSuite struct {
|
||||
suite.Suite
|
||||
w *Wrapper
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user