Keepers 5a78f478a1
add operations test for adv rest (#3783)
ads operations level tests for advanced restore
configuration on all three services.
Code is largely boilerplate between each service, but with just enough quirks that full consolidation would require excess jumping through hoops.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🤖 Supportability/Tests

#### Issue(s)

* #3562

#### Test Plan

- [x] 💚 E2E
2023-07-17 03:56:08 +00:00

428 lines
9.2 KiB
Go

package kopia
import (
"context"
"path/filepath"
"sync"
"time"
"github.com/alcionai/clues"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
"github.com/pkg/errors"
"github.com/alcionai/corso/src/pkg/control/repository"
"github.com/alcionai/corso/src/pkg/storage"
)
const (
defaultKopiaConfigDir = "/tmp/"
defaultKopiaConfigFile = "repository.config"
defaultCompressor = "zstd-better-compression"
// Interval of 0 disables scheduling.
defaultSchedulingInterval = time.Second * 0
)
var (
ErrSettingDefaultConfig = clues.New("setting default repo config values")
ErrorRepoAlreadyExists = clues.New("repo already exists")
)
// Having all fields set to 0 causes it to keep max-int versions of snapshots.
var (
zeroOpt = policy.OptionalInt(0)
defaultRetention = policy.RetentionPolicy{
KeepLatest: &zeroOpt,
KeepHourly: &zeroOpt,
KeepWeekly: &zeroOpt,
KeepDaily: &zeroOpt,
KeepMonthly: &zeroOpt,
KeepAnnual: &zeroOpt,
}
)
type snapshotLoader interface {
SnapshotRoot(man *snapshot.Manifest) (fs.Entry, error)
}
var (
_ snapshotManager = &conn{}
_ snapshotLoader = &conn{}
)
type conn struct {
storage storage.Storage
repo.Repository
mu sync.Mutex
refCount int
}
func NewConn(s storage.Storage) *conn {
return &conn{
storage: s,
}
}
func (w *conn) Initialize(ctx context.Context, opts repository.Options) error {
bst, err := blobStoreByProvider(ctx, opts, w.storage)
if err != nil {
return clues.Wrap(err, "initializing storage")
}
defer bst.Close(ctx)
cfg, err := w.storage.CommonConfig()
if err != nil {
return clues.Stack(err).WithClues(ctx)
}
// todo - issue #75: nil here should be a storage.NewRepoOptions()
if err = repo.Initialize(ctx, bst, nil, cfg.CorsoPassphrase); err != nil {
if errors.Is(err, repo.ErrAlreadyInitialized) {
return clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx)
}
return clues.Wrap(err, "initializing repo").WithClues(ctx)
}
err = w.commonConnect(
ctx,
opts,
cfg.KopiaCfgDir,
bst,
cfg.CorsoPassphrase,
defaultCompressor,
)
if err != nil {
return err
}
if err := w.setDefaultConfigValues(ctx); err != nil {
return clues.Stack(err).WithClues(ctx)
}
return nil
}
func (w *conn) Connect(ctx context.Context, opts repository.Options) error {
bst, err := blobStoreByProvider(ctx, opts, w.storage)
if err != nil {
return clues.Wrap(err, "initializing storage")
}
defer bst.Close(ctx)
cfg, err := w.storage.CommonConfig()
if err != nil {
return clues.Stack(err).WithClues(ctx)
}
return w.commonConnect(
ctx,
opts,
cfg.KopiaCfgDir,
bst,
cfg.CorsoPassphrase,
defaultCompressor,
)
}
func (w *conn) commonConnect(
ctx context.Context,
opts repository.Options,
configDir string,
bst blob.Storage,
password, compressor string,
) error {
kopiaOpts := &repo.ConnectOptions{
ClientOptions: repo.ClientOptions{
Username: opts.User,
Hostname: opts.Host,
},
}
if len(configDir) > 0 {
kopiaOpts.CachingOptions = content.CachingOptions{
CacheDirectory: configDir,
}
} else {
configDir = defaultKopiaConfigDir
}
cfgFile := filepath.Join(configDir, defaultKopiaConfigFile)
// todo - issue #75: nil here should be storage.ConnectOptions()
if err := repo.Connect(
ctx,
cfgFile,
bst,
password,
kopiaOpts,
); err != nil {
return clues.Wrap(err, "connecting to repo").WithClues(ctx)
}
if err := w.open(ctx, cfgFile, password); err != nil {
return clues.Stack(err).WithClues(ctx)
}
return nil
}
func blobStoreByProvider(
ctx context.Context,
opts repository.Options,
s storage.Storage,
) (blob.Storage, error) {
switch s.Provider {
case storage.ProviderS3:
return s3BlobStorage(ctx, opts, s)
default:
return nil, clues.New("storage provider details are required").WithClues(ctx)
}
}
func (w *conn) Close(ctx context.Context) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.refCount == 0 {
return nil
}
w.refCount--
if w.refCount > 0 {
return nil
}
return w.close(ctx)
}
// close closes the kopia handle. Safe to run without the mutex because other
// functions check only the refCount variable.
func (w *conn) close(ctx context.Context) error {
err := w.Repository.Close(ctx)
w.Repository = nil
if err != nil {
return clues.Wrap(err, "closing repository connection").WithClues(ctx)
}
return nil
}
func (w *conn) open(ctx context.Context, configPath, password string) error {
w.mu.Lock()
defer w.mu.Unlock()
w.refCount++
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
rep, err := repo.Open(ctx, configPath, password, nil)
if err != nil {
return clues.Wrap(err, "opening repository connection").WithClues(ctx)
}
w.Repository = rep
return nil
}
func (w *conn) wrap() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.refCount == 0 {
return clues.New("conn not established or already closed")
}
w.refCount++
return nil
}
func (w *conn) setDefaultConfigValues(ctx context.Context) error {
p, err := w.getGlobalPolicyOrEmpty(ctx)
if err != nil {
return clues.Stack(ErrSettingDefaultConfig, err)
}
changed, err := updateCompressionOnPolicy(defaultCompressor, p)
if err != nil {
return clues.Stack(ErrSettingDefaultConfig, err)
}
if updateRetentionOnPolicy(defaultRetention, p) {
changed = true
}
if updateSchedulingOnPolicy(defaultSchedulingInterval, p) {
changed = true
}
if !changed {
return nil
}
if err := w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p); err != nil {
return clues.Wrap(err, "updating global policy with defaults")
}
return nil
}
// Compression attempts to set the global compression policy for the kopia repo
// to the given compressor.
func (w *conn) Compression(ctx context.Context, compressor string) error {
// Redo this check so we can exit without looking up a policy if a bad
// compressor was given.
comp := compression.Name(compressor)
if err := checkCompressor(comp); err != nil {
return clues.Stack(err).WithClues(ctx)
}
p, err := w.getGlobalPolicyOrEmpty(ctx)
if err != nil {
return err
}
changed, err := updateCompressionOnPolicy(compressor, p)
if err != nil {
return clues.Stack(err).WithClues(ctx)
}
if !changed {
return nil
}
if err := w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p); err != nil {
return clues.Wrap(err, "updating global compression policy")
}
return nil
}
func updateCompressionOnPolicy(compressor string, p *policy.Policy) (bool, error) {
comp := compression.Name(compressor)
if err := checkCompressor(comp); err != nil {
return false, err
}
if comp == p.CompressionPolicy.CompressorName {
return false, nil
}
p.CompressionPolicy = policy.CompressionPolicy{
CompressorName: comp,
}
return true, nil
}
func updateRetentionOnPolicy(retention policy.RetentionPolicy, p *policy.Policy) bool {
if retention == p.RetentionPolicy {
return false
}
p.RetentionPolicy = retention
return true
}
func updateSchedulingOnPolicy(
interval time.Duration,
p *policy.Policy,
) bool {
if p.SchedulingPolicy.Interval() == interval {
return false
}
p.SchedulingPolicy.SetInterval(interval)
return true
}
func (w *conn) getGlobalPolicyOrEmpty(ctx context.Context) (*policy.Policy, error) {
si := policy.GlobalPolicySourceInfo
return w.getPolicyOrEmpty(ctx, si)
}
func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*policy.Policy, error) {
p, err := policy.GetDefinedPolicy(ctx, w.Repository, si)
if err != nil {
if errors.Is(err, policy.ErrPolicyNotFound) {
return &policy.Policy{}, nil
}
return nil, clues.Wrap(err, "getting backup policy").With("source_info", si).WithClues(ctx)
}
return p, nil
}
func (w *conn) writeGlobalPolicy(
ctx context.Context,
purpose string,
p *policy.Policy,
) error {
si := policy.GlobalPolicySourceInfo
return w.writePolicy(ctx, purpose, si, p)
}
func (w *conn) writePolicy(
ctx context.Context,
purpose string,
si snapshot.SourceInfo,
p *policy.Policy,
) error {
ctx = clues.Add(ctx, "source_info", si)
writeOpts := repo.WriteSessionOptions{Purpose: purpose}
ctr := func(innerCtx context.Context, rw repo.RepositoryWriter) error {
if err := policy.SetPolicy(ctx, rw, si, p); err != nil {
return clues.Stack(err).WithClues(innerCtx)
}
return nil
}
if err := repo.WriteSession(ctx, w.Repository, writeOpts, ctr); err != nil {
return clues.Wrap(err, "updating policy").WithClues(ctx)
}
return nil
}
func checkCompressor(compressor compression.Name) error {
for c := range compression.ByName {
if c == compressor {
return nil
}
}
return clues.Stack(clues.New("unknown compressor type"), clues.New(string(compressor)))
}
func (w *conn) LoadSnapshot(
ctx context.Context,
id manifest.ID,
) (*snapshot.Manifest, error) {
man, err := snapshot.LoadSnapshot(ctx, w.Repository, id)
if err != nil {
return nil, clues.Stack(err).WithClues(ctx)
}
return man, nil
}
func (w *conn) SnapshotRoot(man *snapshot.Manifest) (fs.Entry, error) {
return snapshotfs.SnapshotRoot(w.Repository, man)
}