diff --git a/src/internal/kopia/conn.go b/src/internal/kopia/conn.go index b788157f7..cffc33a0e 100644 --- a/src/internal/kopia/conn.go +++ b/src/internal/kopia/conn.go @@ -7,6 +7,7 @@ import ( "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/compression" + "github.com/kopia/kopia/snapshot" "github.com/kopia/kopia/snapshot/policy" "github.com/pkg/errors" @@ -16,6 +17,7 @@ import ( const ( defaultKopiaConfigFilePath = "/tmp/repository.config" + defaultCompressor = "s2-default" ) var ( @@ -69,18 +71,13 @@ func (w *conn) Initialize(ctx context.Context) error { return errors.Wrap(err, errInit.Error()) } - // todo - issue #75: nil here should be a storage.ConnectOptions() - if err := repo.Connect( + return w.commonConnect( ctx, defaultKopiaConfigFilePath, bst, cfg.CorsoPassword, - nil, - ); err != nil { - return errors.Wrap(err, errConnect.Error()) - } - - return w.open(ctx, cfg.CorsoPassword) + defaultCompressor, + ) } func (w *conn) Connect(ctx context.Context) error { @@ -95,18 +92,37 @@ func (w *conn) Connect(ctx context.Context) error { return err } - // todo - issue #75: nil here should be storage.ConnectOptions() - if err := repo.Connect( + return w.commonConnect( ctx, defaultKopiaConfigFilePath, bst, cfg.CorsoPassword, + defaultCompressor, + ) +} + +func (w *conn) commonConnect( + ctx context.Context, + configPath string, + bst blob.Storage, + password, compressor string, +) error { + // todo - issue #75: nil here should be storage.ConnectOptions() + if err := repo.Connect( + ctx, + configPath, + bst, + password, nil, ); err != nil { return errors.Wrap(err, errConnect.Error()) } - return w.open(ctx, cfg.CorsoPassword) + if err := w.open(ctx, configPath, password); err != nil { + return err + } + + return w.Compression(ctx, compressor) } func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) { @@ -144,14 +160,14 @@ func (w *conn) close(ctx context.Context) error { return errors.Wrap(err, "closing repository connection") } -func (w *conn) open(ctx context.Context, password string) error { +func (w *conn) open(ctx context.Context, configPath, password string) error { w.mu.Lock() defer w.mu.Unlock() w.refCount++ // TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions(). - rep, err := repo.Open(ctx, defaultKopiaConfigFilePath, password, nil) + rep, err := repo.Open(ctx, configPath, password, nil) if err != nil { return errors.Wrap(err, "opening repository connection") } @@ -183,13 +199,13 @@ func (w *conn) Compression(ctx context.Context, compressor string) error { si := policy.GlobalPolicySourceInfo - p, err := policy.GetDefinedPolicy(ctx, w.Repository, si) + p, err := w.getPolicyOrEmpty(ctx, si) if err != nil { - if !errors.Is(err, policy.ErrPolicyNotFound) { - return errors.Wrap(err, "getting global compression policy") - } + return err + } - p = &policy.Policy{} + if compressor == string(p.CompressionPolicy.CompressorName) { + return nil } p.CompressionPolicy = policy.CompressionPolicy{ @@ -199,7 +215,7 @@ func (w *conn) Compression(ctx context.Context, compressor string) error { err = repo.WriteSession( ctx, w.Repository, - repo.WriteSessionOptions{Purpose: "UpdateGlobalCompression"}, + repo.WriteSessionOptions{Purpose: "UpdateGlobalCompressionPolicy"}, func(innerCtx context.Context, rw repo.RepositoryWriter) error { return policy.SetPolicy(ctx, rw, si, p) }, @@ -208,6 +224,19 @@ func (w *conn) Compression(ctx context.Context, compressor string) error { return errors.Wrap(err, "updating global compression policy") } +func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) { + p, err := policy.GetDefinedPolicy(ctx, w.Repository, si) + if err != nil { + if errors.Is(err, policy.ErrPolicyNotFound) { + return &policy.Policy{}, nil + } + + return nil, errors.Wrap(err, "getting global backup policy") + } + + return p, nil +} + func checkCompressor(compressor compression.Name) error { for c := range compression.ByName { if c == compressor { diff --git a/src/internal/kopia/conn_test.go b/src/internal/kopia/conn_test.go index 1a83ff6d4..c2f8e5cb2 100644 --- a/src/internal/kopia/conn_test.go +++ b/src/internal/kopia/conn_test.go @@ -124,10 +124,33 @@ func (suite *WrapperIntegrationSuite) TestBadCompressorType() { assert.Error(t, k.Compression(ctx, "not-a-compressor")) } +func (suite *WrapperIntegrationSuite) TestGetPolicyOrDefault_GetsDefault() { + ctx := context.Background() + t := suite.T() + + k, err := openKopiaRepo(t, ctx) + require.NoError(t, err) + + defer func() { + assert.NoError(t, k.Close(ctx)) + }() + + si := snapshot.SourceInfo{ + Host: corsoHost, + UserName: corsoUser, + Path: "test-path-root", + } + + p, err := k.getPolicyOrEmpty(ctx, si) + require.NoError(t, err) + + assert.Equal(t, policy.Policy{}, *p) +} + func (suite *WrapperIntegrationSuite) TestSetCompressor() { ctx := context.Background() t := suite.T() - compressor := "s2-default" + compressor := "pgzip" k, err := openKopiaRepo(t, ctx) require.NoError(t, err) @@ -139,7 +162,7 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() { assert.NoError(t, k.Compression(ctx, compressor)) // Check the policy was actually created and has the right compressor. - p, err := policy.GetDefinedPolicy(ctx, k.Repository, policy.GlobalPolicySourceInfo) + p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) require.NoError(t, err) assert.Equal(t, compressor, string(p.CompressionPolicy.CompressorName)) @@ -161,3 +184,34 @@ func (suite *WrapperIntegrationSuite) TestSetCompressor() { string(policyTree.EffectivePolicy().CompressionPolicy.CompressorName), ) } + +func (suite *WrapperIntegrationSuite) TestCompressorSetOnInitAndConnect() { + ctx := context.Background() + t := suite.T() + tmpComp := "pgzip" + + k, err := openKopiaRepo(t, ctx) + require.NoError(t, err) + + // Check the policy was actually created and has the right compressor. + p, err := k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) + require.NoError(t, err) + + require.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName)) + + // Change the compressor to something else. + require.NoError(t, k.Compression(ctx, tmpComp)) + require.NoError(t, k.Close(ctx)) + + // Re-open with Connect to see if the compressor changed back. + require.NoError(t, k.Connect(ctx)) + + defer func() { + assert.NoError(t, k.Close(ctx)) + }() + + p, err = k.getPolicyOrEmpty(ctx, policy.GlobalPolicySourceInfo) + require.NoError(t, err) + + assert.Equal(t, defaultCompressor, string(p.CompressionPolicy.CompressorName)) +}