From 8678c0b1591eee5fbaeac9c072544bfb7cdca27e Mon Sep 17 00:00:00 2001 From: ashmrtn Date: Tue, 20 Sep 2022 08:09:05 -0700 Subject: [PATCH] Disable kopia retention policy (#904) ## Description Disable the kopia retention policy that it uses by default. All snapshot deletions in corso must be done manually. There should really be a follow up change to this after making a method in kopia public. Right now this PR is going off [some code](https://github.com/kopia/kopia/blob/7bda16ab337d162c61b1565becf02a421b7fd599/snapshot/policy/retention_policy.go#L117) that shows the effective number of "latest" snapshots will default to intMAX if all of the other fields are set. We really should have a regression test or something for this to ensure that's actually the case. I've manually tested this patch, but that's not a scalable solution ## Type of change - [ ] :sunflower: Feature - [x] :bug: Bugfix - [ ] :world_map: Documentation - [ ] :robot: Test - [ ] :computer: CI/Deployment - [ ] :hamster: Trivial/Minor ## Issue(s) * closes #889 ## Test Plan - [x] :muscle: Manual - [x] :zap: Unit test - [ ] :green_heart: E2E --- src/internal/kopia/conn.go | 130 +++++++++++++++++++++++++++----- src/internal/kopia/conn_test.go | 78 +++++++++++++------ 2 files changed, 169 insertions(+), 39 deletions(-) diff --git a/src/internal/kopia/conn.go b/src/internal/kopia/conn.go index ac483d7f3..522472c01 100644 --- a/src/internal/kopia/conn.go +++ b/src/internal/kopia/conn.go @@ -23,11 +23,26 @@ const ( defaultCompressor = "s2-default" ) +const defaultConfigErrTmpl = "setting default repo config values" + var ( errInit = errors.New("initializing repo") errConnect = errors.New("connecting repo") ) +// Having all fields set to 0 causes it to keep max-int versions of snapshots. +var ( + zeroOpt = policy.OptionalInt(0) + defaultRetention = policy.RetentionPolicy{ + KeepLatest: &zeroOpt, + KeepHourly: &zeroOpt, + KeepWeekly: &zeroOpt, + KeepDaily: &zeroOpt, + KeepMonthly: &zeroOpt, + KeepAnnual: &zeroOpt, + } +) + type ErrorRepoAlreadyExists struct { common.Err } @@ -139,7 +154,7 @@ func (w *conn) commonConnect( return err } - return w.Compression(ctx, compressor) + return w.setDefaultConfigValues(ctx) } func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) { @@ -207,40 +222,92 @@ func (w *conn) wrap() error { return nil } +func (w *conn) setDefaultConfigValues(ctx context.Context) error { + p, err := w.getGlobalPolicyOrEmpty(ctx) + if err != nil { + return errors.Wrap(err, defaultConfigErrTmpl) + } + + changed, err := updateCompressionOnPolicy(defaultCompressor, p) + if err != nil { + return errors.Wrap(err, defaultConfigErrTmpl) + } + + if updateRetentionOnPolicy(defaultRetention, p) { + changed = true + } + + if !changed { + return nil + } + + return errors.Wrap( + w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p), + "updating global policy with defaults", + ) +} + // Compression attempts to set the global compression policy for the kopia repo // to the given compressor. func (w *conn) Compression(ctx context.Context, compressor string) error { + // Redo this check so we can exit without looking up a policy if a bad + // compressor was given. comp := compression.Name(compressor) - if err := checkCompressor(comp); err != nil { return err } - si := policy.GlobalPolicySourceInfo - - p, err := w.getPolicyOrEmpty(ctx, si) + p, err := w.getGlobalPolicyOrEmpty(ctx) if err != nil { return err } - if compressor == string(p.CompressionPolicy.CompressorName) { + changed, err := updateCompressionOnPolicy(compressor, p) + if err != nil { + return err + } + + if !changed { return nil } - p.CompressionPolicy = policy.CompressionPolicy{ - CompressorName: compression.Name(comp), + return errors.Wrap( + w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p), + "updating global compression policy", + ) +} + +func updateCompressionOnPolicy(compressor string, p *policy.Policy) (bool, error) { + comp := compression.Name(compressor) + + if err := checkCompressor(comp); err != nil { + return false, err } - err = repo.WriteSession( - ctx, - w.Repository, - repo.WriteSessionOptions{Purpose: "UpdateGlobalCompressionPolicy"}, - func(innerCtx context.Context, rw repo.RepositoryWriter) error { - return policy.SetPolicy(ctx, rw, si, p) - }, - ) + if comp == p.CompressionPolicy.CompressorName { + return false, nil + } - return errors.Wrap(err, "updating global compression policy") + p.CompressionPolicy = policy.CompressionPolicy{ + CompressorName: comp, + } + + return true, nil +} + +func updateRetentionOnPolicy(retention policy.RetentionPolicy, p *policy.Policy) bool { + if retention == p.RetentionPolicy { + return false + } + + p.RetentionPolicy = retention + + return true +} + +func (w *conn) getGlobalPolicyOrEmpty(ctx context.Context) (*policy.Policy, error) { + si := policy.GlobalPolicySourceInfo + return w.getPolicyOrEmpty(ctx, si) } func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) { @@ -250,12 +317,39 @@ func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*p return &policy.Policy{}, nil } - return nil, errors.Wrap(err, "getting global backup policy") + return nil, errors.Wrapf(err, "getting backup policy for %+v", si) } return p, nil } +func (w *conn) writeGlobalPolicy( + ctx context.Context, + purpose string, + p *policy.Policy, +) error { + si := policy.GlobalPolicySourceInfo + return w.writePolicy(ctx, purpose, si, p) +} + +func (w *conn) writePolicy( + ctx context.Context, + purpose string, + si snapshot.SourceInfo, + p *policy.Policy, +) error { + err := repo.WriteSession( + ctx, + w.Repository, + repo.WriteSessionOptions{Purpose: purpose}, + func(innerCtx context.Context, rw repo.RepositoryWriter) error { + return policy.SetPolicy(ctx, rw, si, p) + }, + ) + + return errors.Wrapf(err, "updating policy for %+v", si) +} + func checkCompressor(compressor compression.Name) error { for c := range compression.ByName { if c == compressor { diff --git a/src/internal/kopia/conn_test.go b/src/internal/kopia/conn_test.go index a4f325ead..5e96f9cde 100644 --- a/src/internal/kopia/conn_test.go +++ b/src/internal/kopia/conn_test.go @@ -185,35 +185,71 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() { ) } -func (suite *WrapperIntegrationSuite) TestCompressorSetOnInitAndConnect() { - ctx := context.Background() - t := suite.T() - tmpComp := "pgzip" +func (suite *WrapperIntegrationSuite) TestConfigDefaultsSetOnInitAndConnect() { + table := []struct { + name string + checkFunc func(*testing.T, *policy.Policy) + mutator func(context.Context, *policy.Policy) error + }{ + { + name: "Compression", + checkFunc: func(t *testing.T, p *policy.Policy) { + t.Helper() + require.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName)) + }, + mutator: func(innerCtx context.Context, p *policy.Policy) error { + _, res := updateCompressionOnPolicy("pgzip", p) + return res + }, + }, + { + name: "Retention", + checkFunc: func(t *testing.T, p *policy.Policy) { + t.Helper() + require.Equal( + t, + defaultRetention, + p.RetentionPolicy, + ) + }, + mutator: func(innerCtx context.Context, p *policy.Policy) error { + newRetentionDaily := policy.OptionalInt(42) + newRetention := policy.RetentionPolicy{KeepDaily: &newRetentionDaily} + updateRetentionOnPolicy(newRetention, p) - k, err := openKopiaRepo(t, ctx) - require.NoError(t, err) + return nil + }, + }, + } - // Check the policy was actually created and has the right compressor. - p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) - require.NoError(t, err) + for _, test := range table { + suite.T().Run(test.name, func(t *testing.T) { + ctx := context.Background() - require.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName)) + k, err := openKopiaRepo(t, ctx) + require.NoError(t, err) - // Change the compressor to something else. - require.NoError(t, k.Compression(ctx, tmpComp)) - require.NoError(t, k.Close(ctx)) + p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) + require.NoError(t, err) - // Re-open with Connect to see if the compressor changed back. - require.NoError(t, k.Connect(ctx)) + test.checkFunc(t, p) - defer func() { - assert.NoError(t, k.Close(ctx)) - }() + require.NoError(t, test.mutator(ctx, p)) + require.NoError(t, k.writeGlobalPolicy(ctx, "TestDefaultPolicyConfigSet", p)) + require.NoError(t, k.Close(ctx)) - p, err = k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) - require.NoError(t, err) + require.NoError(t, k.Connect(ctx)) - assert.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName)) + defer func() { + assert.NoError(t, k.Close(ctx)) + }() + + p, err = k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) + require.NoError(t, err) + + test.checkFunc(t, p) + }) + } } func (suite *WrapperIntegrationSuite) TestInitAndConnWithTempDirectory() {