Cleanup figuring out kopia retention parameters (#3857)

Create a struct that handles:
* initialization from existing kopia config info
* in-memory updates to config info
* detecting which config info structs from kopia need updated
* returning kopia config info structs

Overall, this allows us to isolate the logic for
calculating the new retention configuration info
in kopia

Viewing by commit may help.
First commit just splits up existing code, moving it into
either conn.go (will be used later) or retention/opts.go.
Subsequent commits switch to using a struct, add tests,
and fixup existing logic

---

#### Does this PR need a docs update or release note?

- [ ]  Yes, it's included
- [ ] 🕐 Yes, but in a later PR
- [x]  No

#### Type of change

- [ ] 🌻 Feature
- [ ] 🐛 Bugfix
- [ ] 🗺️ Documentation
- [ ] 🤖 Supportability/Tests
- [ ] 💻 CI/Deployment
- [x] 🧹 Tech Debt/Cleanup

#### Test Plan

- [ ] 💪 Manual
- [x]  Unit test
- [ ] 💚 E2E
This commit is contained in:
ashmrtn 2023-07-24 11:43:27 -07:00 committed by GitHub
parent 0bb2c06a43
commit ab344422d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 463 additions and 205 deletions

View File

@ -12,12 +12,16 @@ import (
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/kopia/retention"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/storage"
)
@ -326,12 +330,12 @@ func updateCompressionOnPolicy(compressor string, p *policy.Policy) (bool, error
return true, nil
}
func updateRetentionOnPolicy(retention policy.RetentionPolicy, p *policy.Policy) bool {
if retention == p.RetentionPolicy {
func updateRetentionOnPolicy(retPolicy policy.RetentionPolicy, p *policy.Policy) bool {
if retPolicy == p.RetentionPolicy {
return false
}
p.RetentionPolicy = retention
p.RetentionPolicy = retPolicy
return true
}
@ -410,6 +414,118 @@ func checkCompressor(compressor compression.Name) error {
return clues.Stack(clues.New("unknown compressor type"), clues.New(string(compressor)))
}
func (w *conn) setRetentionParameters(
ctx context.Context,
rrOpts repository.Retention,
) error {
if rrOpts.Mode == nil && rrOpts.Duration == nil && rrOpts.Extend == nil {
return nil
}
// Somewhat confusing case, when we have no retention but a non-zero duration
// it acts like we passed in only the duration and returns an error about
// having to set both. Return a clearer error here instead.
if ptr.Val(rrOpts.Mode) == repository.NoRetention && ptr.Val(rrOpts.Duration) != 0 {
return clues.New("duration must be 0 if rrOpts is disabled").WithClues(ctx)
}
dr, ok := w.Repository.(repo.DirectRepository)
if !ok {
return clues.New("getting handle to repo").WithClues(ctx)
}
blobCfg, params, err := getRetentionConfigs(ctx, dr)
if err != nil {
return clues.Stack(err)
}
opts := retention.OptsFromConfigs(*blobCfg, *params)
if err := opts.Set(rrOpts); err != nil {
return clues.Stack(err).WithClues(ctx)
}
return clues.Stack(persistRetentionConfigs(ctx, dr, opts)).OrNil()
}
func getRetentionConfigs(
ctx context.Context,
dr repo.DirectRepository,
) (*format.BlobStorageConfiguration, *maintenance.Params, error) {
blobCfg, err := dr.FormatManager().BlobCfgBlob()
if err != nil {
return nil, nil, clues.Wrap(err, "getting storage config").WithClues(ctx)
}
params, err := maintenance.GetParams(ctx, dr)
if err != nil {
return nil, nil, clues.Wrap(err, "getting maintenance config").WithClues(ctx)
}
return &blobCfg, params, nil
}
func persistRetentionConfigs(
ctx context.Context,
dr repo.DirectRepository,
opts *retention.Opts,
) error {
// Persist changes.
if !opts.BlobChanged() && !opts.ParamsChanged() {
return nil
}
blobCfg, params, err := opts.AsConfigs(ctx)
if err != nil {
return clues.Stack(err)
}
mp, err := dr.FormatManager().GetMutableParameters()
if err != nil {
return clues.Wrap(err, "getting mutable parameters").WithClues(ctx)
}
requiredFeatures, err := dr.FormatManager().RequiredFeatures()
if err != nil {
return clues.Wrap(err, "getting required features").WithClues(ctx)
}
// Must be the case that only blob changed.
if !opts.ParamsChanged() {
return clues.Wrap(
dr.FormatManager().SetParameters(ctx, mp, blobCfg, requiredFeatures),
"persisting storage config",
).WithClues(ctx).OrNil()
}
// Both blob and maintenance changed. A DirectWriteSession is required to
// update the maintenance config but not the blob config.
err = repo.DirectWriteSession(
ctx,
dr,
repo.WriteSessionOptions{
Purpose: "Corso immutable backups config",
},
func(ctx context.Context, dw repo.DirectRepositoryWriter) error {
// Set the maintenance config first as we can bail out of the write
// session later.
if err := maintenance.SetParams(ctx, dw, &params); err != nil {
return clues.Wrap(err, "maintenance config").
WithClues(ctx)
}
if !opts.BlobChanged() {
return nil
}
return clues.Wrap(
dr.FormatManager().SetParameters(ctx, mp, blobCfg, requiredFeatures),
"storage config",
).WithClues(ctx).OrNil()
})
return clues.Wrap(err, "persisting config changes").WithClues(ctx).OrNil()
}
func (w *conn) LoadSnapshot(
ctx context.Context,
id manifest.ID,

View File

@ -0,0 +1,139 @@
package retention
import (
"context"
"time"
"github.com/alcionai/clues"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenance"
"github.com/alcionai/corso/src/pkg/control/repository"
)
type Opts struct {
blobCfg format.BlobStorageConfiguration
params maintenance.Params
blobChanged bool
paramsChanged bool
}
func NewOpts() *Opts {
return &Opts{}
}
func OptsFromConfigs(
blobCfg format.BlobStorageConfiguration,
params maintenance.Params,
) *Opts {
return &Opts{
blobCfg: blobCfg,
params: params,
}
}
func (r *Opts) AsConfigs(
ctx context.Context,
) (format.BlobStorageConfiguration, maintenance.Params, error) {
// Check the new config is valid.
if r.blobCfg.IsRetentionEnabled() {
if err := maintenance.CheckExtendRetention(ctx, r.blobCfg, &r.params); err != nil {
return format.BlobStorageConfiguration{}, maintenance.Params{}, clues.Wrap(
err,
"invalid retention config",
).WithClues(ctx)
}
}
return r.blobCfg, r.params, nil
}
func (r *Opts) BlobChanged() bool {
return r.blobChanged
}
func (r *Opts) ParamsChanged() bool {
return r.paramsChanged
}
func (r *Opts) Set(opts repository.Retention) error {
r.setMaintenanceParams(opts.Extend)
return clues.Wrap(
r.setBlobConfigParams(opts.Mode, opts.Duration),
"setting mode or duration",
).OrNil()
}
func (r *Opts) setMaintenanceParams(extend *bool) {
if extend != nil && r.params.ExtendObjectLocks != *extend {
r.params.ExtendObjectLocks = *extend
r.paramsChanged = true
}
}
func (r *Opts) setBlobConfigParams(
mode *repository.RetentionMode,
duration *time.Duration,
) error {
err := r.setBlobConfigMode(mode)
if err != nil {
return clues.Stack(err)
}
r.setBlobConfigDuration(duration)
return nil
}
func (r *Opts) setBlobConfigDuration(duration *time.Duration) {
if duration != nil && r.blobCfg.RetentionPeriod != *duration {
r.blobCfg.RetentionPeriod = *duration
r.blobChanged = true
}
}
func (r *Opts) setBlobConfigMode(
mode *repository.RetentionMode,
) error {
if mode == nil {
return nil
}
startMode := r.blobCfg.RetentionMode
switch *mode {
case repository.NoRetention:
if !r.blobCfg.IsRetentionEnabled() {
return nil
}
r.blobCfg.RetentionMode = ""
r.blobCfg.RetentionPeriod = 0
case repository.GovernanceRetention:
r.blobCfg.RetentionMode = blob.Governance
case repository.ComplianceRetention:
r.blobCfg.RetentionMode = blob.Compliance
default:
return clues.New("unknown retention mode").
With("provided_retention_mode", mode.String())
}
// Only check if the retention mode is not empty. IsValid errors out if it's
// empty.
if len(r.blobCfg.RetentionMode) > 0 && !r.blobCfg.RetentionMode.IsValid() {
return clues.New("invalid retention mode").
With("retention_mode", r.blobCfg.RetentionMode)
}
// Take into account previous operations on r that could have already updated
// blobChanged.
r.blobChanged = r.blobChanged || startMode != r.blobCfg.RetentionMode
return nil
}

View File

@ -0,0 +1,204 @@
package retention_test
import (
"testing"
"time"
"github.com/alcionai/clues"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenance"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/kopia/retention"
"github.com/alcionai/corso/src/internal/tester"
"github.com/alcionai/corso/src/pkg/control/repository"
)
type OptsUnitSuite struct {
tester.Suite
}
func TestOptsUnitSuite(t *testing.T) {
suite.Run(t, &OptsUnitSuite{Suite: tester.NewUnitSuite(t)})
}
func (suite *OptsUnitSuite) TestOptsFromConfigs() {
var (
t = suite.T()
mode = blob.Governance
duration = time.Hour * 48
extend = true
blobCfgInput = format.BlobStorageConfiguration{
RetentionMode: mode,
RetentionPeriod: duration,
}
paramsInput = maintenance.Params{ExtendObjectLocks: extend}
)
ctx, flush := tester.NewContext(t)
defer flush()
opts := retention.OptsFromConfigs(blobCfgInput, paramsInput)
assert.False(t, opts.BlobChanged(), "BlobChanged")
assert.False(t, opts.ParamsChanged(), "ParamsChanged")
blobCfg, params, err := opts.AsConfigs(ctx)
require.NoError(t, err, "AsConfigs: %v", clues.ToCore(err))
assert.Equal(t, blobCfgInput, blobCfg)
assert.Equal(t, paramsInput, params)
}
func (suite *OptsUnitSuite) TestSet() {
var (
kopiaMode = blob.Governance
mode = repository.GovernanceRetention
duration = time.Hour * 48
)
table := []struct {
name string
inputBlob format.BlobStorageConfiguration
inputParams maintenance.Params
ctrlOpts repository.Retention
setErr require.ErrorAssertionFunc
expectMode blob.RetentionMode
expectDuration time.Duration
expectExtend bool
expectBlobChanged bool
expectParamsChanged bool
}{
{
name: "All Nils",
setErr: require.NoError,
},
{
name: "All Off",
ctrlOpts: repository.Retention{
Mode: ptr.To(repository.NoRetention),
Duration: ptr.To(time.Duration(0)),
Extend: ptr.To(false),
},
setErr: require.NoError,
},
{
name: "UnknownRetention",
ctrlOpts: repository.Retention{
Mode: ptr.To(repository.UnknownRetention),
Duration: ptr.To(duration),
},
setErr: require.Error,
},
{
name: "Invalid Retention Mode",
ctrlOpts: repository.Retention{
Mode: ptr.To(repository.RetentionMode(-1)),
Duration: ptr.To(duration),
},
setErr: require.Error,
},
{
name: "Valid Set All",
ctrlOpts: repository.Retention{
Mode: ptr.To(mode),
Duration: ptr.To(duration),
Extend: ptr.To(true),
},
setErr: require.NoError,
expectMode: kopiaMode,
expectDuration: duration,
expectExtend: true,
expectBlobChanged: true,
expectParamsChanged: true,
},
{
name: "Valid Set BlobConfig",
ctrlOpts: repository.Retention{
Mode: ptr.To(mode),
Duration: ptr.To(duration),
},
setErr: require.NoError,
expectMode: kopiaMode,
expectDuration: duration,
expectBlobChanged: true,
},
{
name: "Valid Set Params",
ctrlOpts: repository.Retention{
Extend: ptr.To(true),
},
setErr: require.NoError,
expectExtend: true,
expectParamsChanged: true,
},
{
name: "Partial BlobConfig Change",
inputBlob: format.BlobStorageConfiguration{
RetentionMode: kopiaMode,
RetentionPeriod: duration,
},
ctrlOpts: repository.Retention{
Duration: ptr.To(duration + time.Hour),
},
setErr: require.NoError,
expectMode: kopiaMode,
expectDuration: duration + time.Hour,
expectBlobChanged: true,
},
{
name: "No BlobConfig Change",
inputBlob: format.BlobStorageConfiguration{
RetentionMode: kopiaMode,
RetentionPeriod: duration,
},
ctrlOpts: repository.Retention{
Mode: ptr.To(mode),
Duration: ptr.To(duration),
},
setErr: require.NoError,
expectMode: kopiaMode,
expectDuration: duration,
},
{
name: "No Params Change",
inputParams: maintenance.Params{ExtendObjectLocks: true},
ctrlOpts: repository.Retention{
Extend: ptr.To(true),
},
setErr: require.NoError,
expectExtend: true,
},
}
for _, test := range table {
suite.Run(test.name, func() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
opts := retention.OptsFromConfigs(test.inputBlob, test.inputParams)
err := opts.Set(test.ctrlOpts)
test.setErr(t, err, "setting params: %v", clues.ToCore(err))
if err != nil {
return
}
blobCfg, params, err := opts.AsConfigs(ctx)
require.NoError(t, err, "getting configs: %v", clues.ToCore(err))
assert.Equal(t, test.expectMode, blobCfg.RetentionMode, "mode")
assert.Equal(t, test.expectDuration, blobCfg.RetentionPeriod, "duration")
assert.Equal(t, test.expectExtend, params.ExtendObjectLocks, "extend locks")
assert.Equal(t, test.expectBlobChanged, opts.BlobChanged(), "blob changed")
assert.Equal(t, test.expectParamsChanged, opts.ParamsChanged(), "params changed")
})
}
}

View File

@ -4,13 +4,10 @@ import (
"context"
"errors"
"strings"
"time"
"github.com/alcionai/clues"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
@ -20,7 +17,6 @@ import (
"golang.org/x/exp/maps"
"github.com/alcionai/corso/src/internal/common/prefixmatcher"
"github.com/alcionai/corso/src/internal/common/ptr"
"github.com/alcionai/corso/src/internal/data"
"github.com/alcionai/corso/src/internal/diagnostics"
"github.com/alcionai/corso/src/internal/observe"
@ -737,202 +733,5 @@ func (w *Wrapper) SetRetentionParameters(
ctx context.Context,
retention repository.Retention,
) error {
if retention.Mode == nil && retention.Duration == nil && retention.Extend == nil {
return nil
}
// Somewhat confusing case, when we have no retention but a non-zero duration
// it acts like we passed in only the duration and returns an error about
// having to set both. Return a clearer error here instead. Check if mode is
// set so we still allow changing duration if mode is already set.
if m, ok := ptr.ValOK(retention.Mode); ok && m == repository.NoRetention && ptr.Val(retention.Duration) != 0 {
return clues.New("duration must be 0 if retention is disabled").WithClues(ctx)
}
dr, ok := w.c.Repository.(repo.DirectRepository)
if !ok {
return clues.New("getting handle to repo").WithClues(ctx)
}
blobCfg, params, err := getRetentionConfigs(ctx, dr)
if err != nil {
return clues.Stack(err)
}
// Update blob config information.
blobChanged, err := w.setBlobConfigParams(retention.Mode, retention.Duration, blobCfg)
if err != nil {
return clues.Wrap(err, "setting retention mode or duration").WithClues(ctx)
}
// Update maintenance config information.
var maintenanceChanged bool
if retention.Extend != nil && params.ExtendObjectLocks != *retention.Extend {
params.ExtendObjectLocks = *retention.Extend
maintenanceChanged = true
}
// Check the new config is valid.
if blobCfg.IsRetentionEnabled() {
if err := maintenance.CheckExtendRetention(ctx, *blobCfg, params); err != nil {
return clues.Wrap(err, "invalid retention config").WithClues(ctx)
}
}
return clues.Stack(persistRetentionConfigs(
ctx,
dr,
blobCfg,
blobChanged,
params,
maintenanceChanged,
)).OrNil()
}
func getRetentionConfigs(
ctx context.Context,
dr repo.DirectRepository,
) (*format.BlobStorageConfiguration, *maintenance.Params, error) {
blobCfg, err := dr.FormatManager().BlobCfgBlob()
if err != nil {
return nil, nil, clues.Wrap(err, "getting storage config").WithClues(ctx)
}
params, err := maintenance.GetParams(ctx, dr)
if err != nil {
return nil, nil, clues.Wrap(err, "getting maintenance config").WithClues(ctx)
}
return &blobCfg, params, nil
}
func persistRetentionConfigs(
ctx context.Context,
dr repo.DirectRepository,
blobCfg *format.BlobStorageConfiguration,
blobChanged bool,
params *maintenance.Params,
maintenanceChanged bool,
) error {
// Persist changes.
if !blobChanged && !maintenanceChanged {
return nil
}
mp, err := dr.FormatManager().GetMutableParameters()
if err != nil {
return clues.Wrap(err, "getting mutable parameters")
}
requiredFeatures, err := dr.FormatManager().RequiredFeatures()
if err != nil {
return clues.Wrap(err, "getting required features").WithClues(ctx)
}
// Must be the case that only blob changed.
if !maintenanceChanged {
return clues.Wrap(
dr.FormatManager().SetParameters(ctx, mp, *blobCfg, requiredFeatures),
"persisting storage config",
).WithClues(ctx).OrNil()
}
// Both blob and maintenance changed. A DirectWriteSession is required to
// update the maintenance config but not the blob config.
err = repo.DirectWriteSession(
ctx,
dr,
repo.WriteSessionOptions{
Purpose: "Corso immutable backups config",
},
func(ctx context.Context, dw repo.DirectRepositoryWriter) error {
// Set the maintenance config first as we can bail out of the write
// session later.
if err := maintenance.SetParams(ctx, dw, params); err != nil {
return clues.Wrap(err, "maintenance config").
WithClues(ctx)
}
if !blobChanged {
return nil
}
return clues.Wrap(
dr.FormatManager().SetParameters(ctx, mp, *blobCfg, requiredFeatures),
"storage config",
).WithClues(ctx).OrNil()
})
return clues.Wrap(err, "persisting config changes").WithClues(ctx).OrNil()
}
func (w Wrapper) setBlobConfigParams(
mode *repository.RetentionMode,
duration *time.Duration,
blobCfg *format.BlobStorageConfiguration,
) (bool, error) {
changed, err := setBlobConfigMode(mode, blobCfg)
if err != nil {
return false, clues.Stack(err)
}
tmp := setBlobConfigDuration(duration, blobCfg)
changed = changed || tmp
return changed, nil
}
func setBlobConfigDuration(
duration *time.Duration,
blobCfg *format.BlobStorageConfiguration,
) bool {
var changed bool
if duration != nil && blobCfg.RetentionPeriod != *duration {
blobCfg.RetentionPeriod = *duration
changed = true
}
return changed
}
func setBlobConfigMode(
mode *repository.RetentionMode,
blobCfg *format.BlobStorageConfiguration,
) (bool, error) {
if mode == nil {
return false, nil
}
startMode := blobCfg.RetentionMode
switch *mode {
case repository.NoRetention:
if !blobCfg.IsRetentionEnabled() {
return false, nil
}
blobCfg.RetentionMode = ""
blobCfg.RetentionPeriod = 0
case repository.GovernanceRetention:
blobCfg.RetentionMode = blob.Governance
case repository.ComplianceRetention:
blobCfg.RetentionMode = blob.Compliance
default:
return false, clues.New("unknown retention mode").
With("provided_retention_mode", mode.String())
}
// Only check if the retention mode is not empty. IsValid errors out if it's
// empty.
if len(blobCfg.RetentionMode) > 0 && !blobCfg.RetentionMode.IsValid() {
return false, clues.New("invalid retention mode").
With("retention_mode", blobCfg.RetentionMode)
}
return startMode != blobCfg.RetentionMode, nil
return clues.Stack(w.c.setRetentionParameters(ctx, retention)).OrNil()
}