Disable kopia retention policy (#904)
## Description
Disable the kopia retention policy that it uses by default. All snapshot deletions in corso must be done manually.
There should really be a follow up change to this after making a method in kopia public. Right now this PR is going off [some code](7bda16ab33/snapshot/policy/retention_policy.go (L117)) that shows the effective number of "latest" snapshots will default to intMAX if all of the other fields are set. We really should have a regression test or something for this to ensure that's actually the case. I've manually tested this patch, but that's not a scalable solution
## Type of change
<!--- Please check the type of change your PR introduces: --->
- [ ] 🌻 Feature
- [x] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Test
- [ ] 💻 CI/Deployment
- [ ] 🐹 Trivial/Minor
## Issue(s)
* closes #889
## Test Plan
<!-- How will this be tested prior to merging.-->
- [x] 💪 Manual
- [x] ⚡ Unit test
- [ ] 💚 E2E
This commit is contained in:
parent
01476561fd
commit
8678c0b159
@ -23,11 +23,26 @@ const (
|
|||||||
defaultCompressor = "s2-default"
|
defaultCompressor = "s2-default"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultConfigErrTmpl = "setting default repo config values"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errInit = errors.New("initializing repo")
|
errInit = errors.New("initializing repo")
|
||||||
errConnect = errors.New("connecting repo")
|
errConnect = errors.New("connecting repo")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Having all fields set to 0 causes it to keep max-int versions of snapshots.
|
||||||
|
var (
|
||||||
|
zeroOpt = policy.OptionalInt(0)
|
||||||
|
defaultRetention = policy.RetentionPolicy{
|
||||||
|
KeepLatest: &zeroOpt,
|
||||||
|
KeepHourly: &zeroOpt,
|
||||||
|
KeepWeekly: &zeroOpt,
|
||||||
|
KeepDaily: &zeroOpt,
|
||||||
|
KeepMonthly: &zeroOpt,
|
||||||
|
KeepAnnual: &zeroOpt,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
type ErrorRepoAlreadyExists struct {
|
type ErrorRepoAlreadyExists struct {
|
||||||
common.Err
|
common.Err
|
||||||
}
|
}
|
||||||
@ -139,7 +154,7 @@ func (w *conn) commonConnect(
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return w.Compression(ctx, compressor)
|
return w.setDefaultConfigValues(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
||||||
@ -207,40 +222,92 @@ func (w *conn) wrap() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *conn) setDefaultConfigValues(ctx context.Context) error {
|
||||||
|
p, err := w.getGlobalPolicyOrEmpty(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, defaultConfigErrTmpl)
|
||||||
|
}
|
||||||
|
|
||||||
|
changed, err := updateCompressionOnPolicy(defaultCompressor, p)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, defaultConfigErrTmpl)
|
||||||
|
}
|
||||||
|
|
||||||
|
if updateRetentionOnPolicy(defaultRetention, p) {
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !changed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Wrap(
|
||||||
|
w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p),
|
||||||
|
"updating global policy with defaults",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// Compression attempts to set the global compression policy for the kopia repo
|
// Compression attempts to set the global compression policy for the kopia repo
|
||||||
// to the given compressor.
|
// to the given compressor.
|
||||||
func (w *conn) Compression(ctx context.Context, compressor string) error {
|
func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||||
|
// Redo this check so we can exit without looking up a policy if a bad
|
||||||
|
// compressor was given.
|
||||||
comp := compression.Name(compressor)
|
comp := compression.Name(compressor)
|
||||||
|
|
||||||
if err := checkCompressor(comp); err != nil {
|
if err := checkCompressor(comp); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
si := policy.GlobalPolicySourceInfo
|
p, err := w.getGlobalPolicyOrEmpty(ctx)
|
||||||
|
|
||||||
p, err := w.getPolicyOrEmpty(ctx, si)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if compressor == string(p.CompressionPolicy.CompressorName) {
|
changed, err := updateCompressionOnPolicy(compressor, p)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !changed {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
p.CompressionPolicy = policy.CompressionPolicy{
|
return errors.Wrap(
|
||||||
CompressorName: compression.Name(comp),
|
w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p),
|
||||||
|
"updating global compression policy",
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = repo.WriteSession(
|
func updateCompressionOnPolicy(compressor string, p *policy.Policy) (bool, error) {
|
||||||
ctx,
|
comp := compression.Name(compressor)
|
||||||
w.Repository,
|
|
||||||
repo.WriteSessionOptions{Purpose: "UpdateGlobalCompressionPolicy"},
|
|
||||||
func(innerCtx context.Context, rw repo.RepositoryWriter) error {
|
|
||||||
return policy.SetPolicy(ctx, rw, si, p)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
return errors.Wrap(err, "updating global compression policy")
|
if err := checkCompressor(comp); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if comp == p.CompressionPolicy.CompressorName {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
p.CompressionPolicy = policy.CompressionPolicy{
|
||||||
|
CompressorName: comp,
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateRetentionOnPolicy(retention policy.RetentionPolicy, p *policy.Policy) bool {
|
||||||
|
if retention == p.RetentionPolicy {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
p.RetentionPolicy = retention
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *conn) getGlobalPolicyOrEmpty(ctx context.Context) (*policy.Policy, error) {
|
||||||
|
si := policy.GlobalPolicySourceInfo
|
||||||
|
return w.getPolicyOrEmpty(ctx, si)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) {
|
func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) {
|
||||||
@ -250,12 +317,39 @@ func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*p
|
|||||||
return &policy.Policy{}, nil
|
return &policy.Policy{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, errors.Wrap(err, "getting global backup policy")
|
return nil, errors.Wrapf(err, "getting backup policy for %+v", si)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *conn) writeGlobalPolicy(
|
||||||
|
ctx context.Context,
|
||||||
|
purpose string,
|
||||||
|
p *policy.Policy,
|
||||||
|
) error {
|
||||||
|
si := policy.GlobalPolicySourceInfo
|
||||||
|
return w.writePolicy(ctx, purpose, si, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *conn) writePolicy(
|
||||||
|
ctx context.Context,
|
||||||
|
purpose string,
|
||||||
|
si snapshot.SourceInfo,
|
||||||
|
p *policy.Policy,
|
||||||
|
) error {
|
||||||
|
err := repo.WriteSession(
|
||||||
|
ctx,
|
||||||
|
w.Repository,
|
||||||
|
repo.WriteSessionOptions{Purpose: purpose},
|
||||||
|
func(innerCtx context.Context, rw repo.RepositoryWriter) error {
|
||||||
|
return policy.SetPolicy(ctx, rw, si, p)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return errors.Wrapf(err, "updating policy for %+v", si)
|
||||||
|
}
|
||||||
|
|
||||||
func checkCompressor(compressor compression.Name) error {
|
func checkCompressor(compressor compression.Name) error {
|
||||||
for c := range compression.ByName {
|
for c := range compression.ByName {
|
||||||
if c == compressor {
|
if c == compressor {
|
||||||
|
|||||||
@ -185,25 +185,59 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *WrapperIntegrationSuite) TestCompressorSetOnInitAndConnect() {
|
func (suite *WrapperIntegrationSuite) TestConfigDefaultsSetOnInitAndConnect() {
|
||||||
|
table := []struct {
|
||||||
|
name string
|
||||||
|
checkFunc func(*testing.T, *policy.Policy)
|
||||||
|
mutator func(context.Context, *policy.Policy) error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Compression",
|
||||||
|
checkFunc: func(t *testing.T, p *policy.Policy) {
|
||||||
|
t.Helper()
|
||||||
|
require.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName))
|
||||||
|
},
|
||||||
|
mutator: func(innerCtx context.Context, p *policy.Policy) error {
|
||||||
|
_, res := updateCompressionOnPolicy("pgzip", p)
|
||||||
|
return res
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Retention",
|
||||||
|
checkFunc: func(t *testing.T, p *policy.Policy) {
|
||||||
|
t.Helper()
|
||||||
|
require.Equal(
|
||||||
|
t,
|
||||||
|
defaultRetention,
|
||||||
|
p.RetentionPolicy,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
mutator: func(innerCtx context.Context, p *policy.Policy) error {
|
||||||
|
newRetentionDaily := policy.OptionalInt(42)
|
||||||
|
newRetention := policy.RetentionPolicy{KeepDaily: &newRetentionDaily}
|
||||||
|
updateRetentionOnPolicy(newRetention, p)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range table {
|
||||||
|
suite.T().Run(test.name, func(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
t := suite.T()
|
|
||||||
tmpComp := "pgzip"
|
|
||||||
|
|
||||||
k, err := openKopiaRepo(t, ctx)
|
k, err := openKopiaRepo(t, ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Check the policy was actually created and has the right compressor.
|
|
||||||
p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName))
|
test.checkFunc(t, p)
|
||||||
|
|
||||||
// Change the compressor to something else.
|
require.NoError(t, test.mutator(ctx, p))
|
||||||
require.NoError(t, k.Compression(ctx, tmpComp))
|
require.NoError(t, k.writeGlobalPolicy(ctx, "TestDefaultPolicyConfigSet", p))
|
||||||
require.NoError(t, k.Close(ctx))
|
require.NoError(t, k.Close(ctx))
|
||||||
|
|
||||||
// Re-open with Connect to see if the compressor changed back.
|
|
||||||
require.NoError(t, k.Connect(ctx))
|
require.NoError(t, k.Connect(ctx))
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -213,7 +247,9 @@ func (suite *WrapperIntegrationSuite) TestCompressorSetOnInitAndConnect() {
|
|||||||
p, err = k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
p, err = k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName))
|
test.checkFunc(t, p)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (suite *WrapperIntegrationSuite) TestInitAndConnWithTempDirectory() {
|
func (suite *WrapperIntegrationSuite) TestInitAndConnWithTempDirectory() {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user