add fault/clues to the rest of internal/kopia (#2375)
## Does this PR need a docs update or release note? - [x] ⛔ No ## Type of change - [x] 🧹 Tech Debt/Cleanup ## Issue(s) * #1970 ## Test Plan - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
129d6b0b0c
commit
924d345ace
@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
@ -17,7 +18,6 @@ import (
|
||||
"github.com/kopia/kopia/snapshot/snapshotfs"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/common"
|
||||
"github.com/alcionai/corso/src/pkg/storage"
|
||||
)
|
||||
|
||||
@ -29,11 +29,9 @@ const (
|
||||
defaultSchedulingInterval = time.Second * 0
|
||||
)
|
||||
|
||||
const defaultConfigErrTmpl = "setting default repo config values"
|
||||
|
||||
var (
|
||||
errInit = errors.New("initializing repo")
|
||||
errConnect = errors.New("connecting repo")
|
||||
ErrSettingDefaultConfig = errors.New("setting default repo config values")
|
||||
ErrorRepoAlreadyExists = errors.New("repo already exists")
|
||||
)
|
||||
|
||||
// Having all fields set to 0 causes it to keep max-int versions of snapshots.
|
||||
@ -53,19 +51,6 @@ type snapshotLoader interface {
|
||||
SnapshotRoot(man *snapshot.Manifest) (fs.Entry, error)
|
||||
}
|
||||
|
||||
type ErrorRepoAlreadyExists struct {
|
||||
common.Err
|
||||
}
|
||||
|
||||
func RepoAlreadyExistsError(e error) error {
|
||||
return ErrorRepoAlreadyExists{*common.EncapsulateError(e)}
|
||||
}
|
||||
|
||||
func IsRepoAlreadyExistsError(e error) bool {
|
||||
var erae ErrorRepoAlreadyExists
|
||||
return errors.As(e, &erae)
|
||||
}
|
||||
|
||||
var (
|
||||
_ snapshotManager = &conn{}
|
||||
_ snapshotLoader = &conn{}
|
||||
@ -87,22 +72,22 @@ func NewConn(s storage.Storage) *conn {
|
||||
func (w *conn) Initialize(ctx context.Context) error {
|
||||
bst, err := blobStoreByProvider(ctx, w.storage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
return errors.Wrap(err, "initializing storage")
|
||||
}
|
||||
defer bst.Close(ctx)
|
||||
|
||||
cfg, err := w.storage.CommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
// todo - issue #75: nil here should be a storage.NewRepoOptions()
|
||||
if err = repo.Initialize(ctx, bst, nil, cfg.CorsoPassphrase); err != nil {
|
||||
if errors.Is(err, repo.ErrAlreadyInitialized) {
|
||||
return RepoAlreadyExistsError(err)
|
||||
return clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
return clues.Wrap(err, "initialzing repo").WithClues(ctx)
|
||||
}
|
||||
|
||||
return w.commonConnect(
|
||||
@ -117,13 +102,13 @@ func (w *conn) Initialize(ctx context.Context) error {
|
||||
func (w *conn) Connect(ctx context.Context) error {
|
||||
bst, err := blobStoreByProvider(ctx, w.storage)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, errInit.Error())
|
||||
return errors.Wrap(err, "initializing storage")
|
||||
}
|
||||
defer bst.Close(ctx)
|
||||
|
||||
cfg, err := w.storage.CommonConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return w.commonConnect(
|
||||
@ -162,14 +147,18 @@ func (w *conn) commonConnect(
|
||||
password,
|
||||
opts,
|
||||
); err != nil {
|
||||
return errors.Wrap(err, errConnect.Error())
|
||||
return clues.Wrap(err, "connecting to repo").WithClues(ctx)
|
||||
}
|
||||
|
||||
if err := w.open(ctx, cfgFile, password); err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return w.setDefaultConfigValues(ctx)
|
||||
if err := w.setDefaultConfigValues(ctx); err != nil {
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
||||
@ -177,7 +166,7 @@ func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage,
|
||||
case storage.ProviderS3:
|
||||
return s3BlobStorage(ctx, s)
|
||||
default:
|
||||
return nil, errors.New("storage provider details are required")
|
||||
return nil, clues.New("storage provider details are required").WithClues(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@ -204,7 +193,11 @@ func (w *conn) close(ctx context.Context) error {
|
||||
err := w.Repository.Close(ctx)
|
||||
w.Repository = nil
|
||||
|
||||
return errors.Wrap(err, "closing repository connection")
|
||||
if err != nil {
|
||||
return clues.Wrap(err, "closing repository connection").WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *conn) open(ctx context.Context, configPath, password string) error {
|
||||
@ -216,7 +209,7 @@ func (w *conn) open(ctx context.Context, configPath, password string) error {
|
||||
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
|
||||
rep, err := repo.Open(ctx, configPath, password, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "opening repository connection")
|
||||
return clues.Wrap(err, "opening repository connection").WithClues(ctx)
|
||||
}
|
||||
|
||||
w.Repository = rep
|
||||
@ -229,7 +222,7 @@ func (w *conn) wrap() error {
|
||||
defer w.mu.Unlock()
|
||||
|
||||
if w.refCount == 0 {
|
||||
return errors.New("conn already closed")
|
||||
return clues.New("conn already closed")
|
||||
}
|
||||
|
||||
w.refCount++
|
||||
@ -240,12 +233,12 @@ func (w *conn) wrap() error {
|
||||
func (w *conn) setDefaultConfigValues(ctx context.Context) error {
|
||||
p, err := w.getGlobalPolicyOrEmpty(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, defaultConfigErrTmpl)
|
||||
return clues.Stack(ErrSettingDefaultConfig, err)
|
||||
}
|
||||
|
||||
changed, err := updateCompressionOnPolicy(defaultCompressor, p)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, defaultConfigErrTmpl)
|
||||
return clues.Stack(ErrSettingDefaultConfig, err)
|
||||
}
|
||||
|
||||
if updateRetentionOnPolicy(defaultRetention, p) {
|
||||
@ -260,10 +253,11 @@ func (w *conn) setDefaultConfigValues(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.Wrap(
|
||||
w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p),
|
||||
"updating global policy with defaults",
|
||||
)
|
||||
if err := w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p); err != nil {
|
||||
return clues.Wrap(err, "updating global policy with defaults")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compression attempts to set the global compression policy for the kopia repo
|
||||
@ -273,7 +267,7 @@ func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||
// compressor was given.
|
||||
comp := compression.Name(compressor)
|
||||
if err := checkCompressor(comp); err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
p, err := w.getGlobalPolicyOrEmpty(ctx)
|
||||
@ -283,17 +277,18 @@ func (w *conn) Compression(ctx context.Context, compressor string) error {
|
||||
|
||||
changed, err := updateCompressionOnPolicy(compressor, p)
|
||||
if err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
if !changed {
|
||||
return nil
|
||||
}
|
||||
|
||||
return errors.Wrap(
|
||||
w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p),
|
||||
"updating global compression policy",
|
||||
)
|
||||
if err := w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p); err != nil {
|
||||
return clues.Wrap(err, "updating global compression policy")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateCompressionOnPolicy(compressor string, p *policy.Policy) (bool, error) {
|
||||
@ -349,7 +344,7 @@ func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*p
|
||||
return &policy.Policy{}, nil
|
||||
}
|
||||
|
||||
return nil, errors.Wrapf(err, "getting backup policy for %+v", si)
|
||||
return nil, clues.Wrap(err, "getting backup policy").With("source_info", si).WithClues(ctx)
|
||||
}
|
||||
|
||||
return p, nil
|
||||
@ -370,16 +365,22 @@ func (w *conn) writePolicy(
|
||||
si snapshot.SourceInfo,
|
||||
p *policy.Policy,
|
||||
) error {
|
||||
err := repo.WriteSession(
|
||||
ctx,
|
||||
w.Repository,
|
||||
repo.WriteSessionOptions{Purpose: purpose},
|
||||
func(innerCtx context.Context, rw repo.RepositoryWriter) error {
|
||||
return policy.SetPolicy(ctx, rw, si, p)
|
||||
},
|
||||
)
|
||||
ctx = clues.Add(ctx, "source_info", si)
|
||||
|
||||
return errors.Wrapf(err, "updating policy for %+v", si)
|
||||
writeOpts := repo.WriteSessionOptions{Purpose: purpose}
|
||||
cb := func(innerCtx context.Context, rw repo.RepositoryWriter) error {
|
||||
if err := policy.SetPolicy(ctx, rw, si, p); err != nil {
|
||||
return clues.Stack(err).WithClues(innerCtx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := repo.WriteSession(ctx, w.Repository, writeOpts, cb); err != nil {
|
||||
return clues.Wrap(err, "updating policy").WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkCompressor(compressor compression.Name) error {
|
||||
@ -389,14 +390,19 @@ func checkCompressor(compressor compression.Name) error {
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Errorf("unknown compressor type %s", compressor)
|
||||
return clues.Stack(clues.New("unknown compressor type"), clues.New(string(compressor)))
|
||||
}
|
||||
|
||||
func (w *conn) LoadSnapshots(
|
||||
ctx context.Context,
|
||||
ids []manifest.ID,
|
||||
) ([]*snapshot.Manifest, error) {
|
||||
return snapshot.LoadSnapshots(ctx, w.Repository, ids)
|
||||
mans, err := snapshot.LoadSnapshots(ctx, w.Repository, ids)
|
||||
if err != nil {
|
||||
return nil, clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return mans, nil
|
||||
}
|
||||
|
||||
func (w *conn) SnapshotRoot(man *snapshot.Manifest) (fs.Entry, error) {
|
||||
|
||||
@ -85,7 +85,7 @@ func (suite *WrapperIntegrationSuite) TestRepoExistsError() {
|
||||
|
||||
err := k.Initialize(ctx)
|
||||
assert.Error(t, err)
|
||||
assert.True(t, IsRepoAlreadyExistsError(err))
|
||||
assert.ErrorIs(t, err, ErrorRepoAlreadyExists)
|
||||
}
|
||||
|
||||
func (suite *WrapperIntegrationSuite) TestBadProviderErrors() {
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/google/uuid"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
@ -59,7 +60,7 @@ func (ms *ModelStore) Close(ctx context.Context) error {
|
||||
// bad model type is given.
|
||||
func tagsForModel(s model.Schema, tags map[string]string) (map[string]string, error) {
|
||||
if _, ok := tags[manifest.TypeLabelKey]; ok {
|
||||
return nil, errors.WithStack(errBadTagKey)
|
||||
return nil, clues.Stack(errBadTagKey)
|
||||
}
|
||||
|
||||
res := make(map[string]string, len(tags)+1)
|
||||
@ -80,11 +81,11 @@ func tagsForModelWithID(
|
||||
tags map[string]string,
|
||||
) (map[string]string, error) {
|
||||
if !s.Valid() {
|
||||
return nil, errors.WithStack(errUnrecognizedSchema)
|
||||
return nil, clues.Stack(errUnrecognizedSchema)
|
||||
}
|
||||
|
||||
if len(id) == 0 {
|
||||
return nil, errors.WithStack(errNoStableID)
|
||||
return nil, clues.Stack(errNoStableID)
|
||||
}
|
||||
|
||||
res, err := tagsForModel(s, tags)
|
||||
@ -93,13 +94,13 @@ func tagsForModelWithID(
|
||||
}
|
||||
|
||||
if _, ok := res[stableIDKey]; ok {
|
||||
return nil, errors.WithStack(errBadTagKey)
|
||||
return nil, clues.Stack(errBadTagKey)
|
||||
}
|
||||
|
||||
res[stableIDKey] = string(id)
|
||||
|
||||
if _, ok := res[modelVersionKey]; ok {
|
||||
return nil, errors.WithStack(errBadTagKey)
|
||||
return nil, clues.Stack(errBadTagKey)
|
||||
}
|
||||
|
||||
res[modelVersionKey] = strconv.Itoa(version)
|
||||
@ -117,7 +118,7 @@ func putInner(
|
||||
create bool,
|
||||
) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
base := m.Base()
|
||||
@ -128,13 +129,13 @@ func putInner(
|
||||
tmpTags, err := tagsForModelWithID(s, base.ID, base.Version, base.Tags)
|
||||
if err != nil {
|
||||
// Will be wrapped at a higher layer.
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
id, err := w.PutManifest(ctx, tmpTags, m)
|
||||
if err != nil {
|
||||
// Will be wrapped at a higher layer.
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
base.ModelStoreID = id
|
||||
@ -150,7 +151,7 @@ func (ms *ModelStore) Put(
|
||||
m model.Model,
|
||||
) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema)
|
||||
}
|
||||
|
||||
m.Base().Version = ms.modelVersion
|
||||
@ -162,14 +163,16 @@ func (ms *ModelStore) Put(
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) error {
|
||||
err := putInner(innerCtx, w, s, m, true)
|
||||
if err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(innerCtx)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
})
|
||||
if err != nil {
|
||||
return clues.Wrap(err, "putting model").WithClues(ctx)
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "putting model")
|
||||
return nil
|
||||
}
|
||||
|
||||
func stripHiddenTags(tags map[string]string) {
|
||||
@ -184,7 +187,7 @@ func (ms ModelStore) populateBaseModelFromMetadata(
|
||||
) error {
|
||||
id, ok := m.Labels[stableIDKey]
|
||||
if !ok {
|
||||
return errors.WithStack(errNoStableID)
|
||||
return clues.Stack(errNoStableID)
|
||||
}
|
||||
|
||||
v, err := strconv.Atoi(m.Labels[modelVersionKey])
|
||||
@ -193,7 +196,7 @@ func (ms ModelStore) populateBaseModelFromMetadata(
|
||||
}
|
||||
|
||||
if v != ms.modelVersion {
|
||||
return errors.Errorf("bad model version %s", m.Labels[modelVersionKey])
|
||||
return clues.Wrap(clues.New(m.Labels[modelVersionKey]), "bad model version")
|
||||
}
|
||||
|
||||
base.ModelStoreID = m.ID
|
||||
@ -211,7 +214,7 @@ func (ms ModelStore) baseModelFromMetadata(
|
||||
) (*model.BaseModel, error) {
|
||||
res := &model.BaseModel{}
|
||||
if err := ms.populateBaseModelFromMetadata(res, m); err != nil {
|
||||
return nil, err
|
||||
return nil, clues.Stack(err).WithAll("metadata_id", m.ID, "metadata_modtime", m.ModTime)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
@ -226,21 +229,21 @@ func (ms *ModelStore) GetIDsForType(
|
||||
tags map[string]string,
|
||||
) ([]*model.BaseModel, error) {
|
||||
if !s.Valid() {
|
||||
return nil, errors.WithStack(errUnrecognizedSchema)
|
||||
return nil, clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
if _, ok := tags[stableIDKey]; ok {
|
||||
return nil, errors.WithStack(errBadTagKey)
|
||||
return nil, clues.Stack(errBadTagKey).WithClues(ctx)
|
||||
}
|
||||
|
||||
tmpTags, err := tagsForModel(s, tags)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting model metadata")
|
||||
return nil, clues.Wrap(err, "getting model metadata").WithClues(ctx)
|
||||
}
|
||||
|
||||
metadata, err := ms.c.FindManifests(ctx, tmpTags)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting model metadata")
|
||||
return nil, clues.Wrap(err, "getting model metadata").WithClues(ctx)
|
||||
}
|
||||
|
||||
res := make([]*model.BaseModel, 0, len(metadata))
|
||||
@ -248,7 +251,7 @@ func (ms *ModelStore) GetIDsForType(
|
||||
for _, m := range metadata {
|
||||
bm, err := ms.baseModelFromMetadata(m)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "parsing model metadata")
|
||||
return nil, clues.Wrap(err, "parsing model metadata").WithClues(ctx)
|
||||
}
|
||||
|
||||
res = append(res, bm)
|
||||
@ -266,30 +269,30 @@ func (ms *ModelStore) getModelStoreID(
|
||||
id model.StableID,
|
||||
) (manifest.ID, error) {
|
||||
if !s.Valid() {
|
||||
return "", errors.WithStack(errUnrecognizedSchema)
|
||||
return "", clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
if len(id) == 0 {
|
||||
return "", errors.WithStack(errNoStableID)
|
||||
return "", clues.Stack(errNoStableID).WithClues(ctx)
|
||||
}
|
||||
|
||||
tags := map[string]string{stableIDKey: string(id)}
|
||||
|
||||
metadata, err := ms.c.FindManifests(ctx, tags)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "getting ModelStoreID")
|
||||
return "", clues.Wrap(err, "getting ModelStoreID").WithClues(ctx)
|
||||
}
|
||||
|
||||
if len(metadata) == 0 {
|
||||
return "", errors.Wrap(data.ErrNotFound, "getting ModelStoreID")
|
||||
return "", clues.Wrap(data.ErrNotFound, "getting ModelStoreID").WithClues(ctx)
|
||||
}
|
||||
|
||||
if len(metadata) != 1 {
|
||||
return "", errors.New("multiple models with same StableID")
|
||||
return "", clues.New("multiple models with same StableID").WithClues(ctx)
|
||||
}
|
||||
|
||||
if metadata[0].Labels[manifest.TypeLabelKey] != s.String() {
|
||||
return "", errors.WithStack(errModelTypeMismatch)
|
||||
return "", clues.Stack(errModelTypeMismatch).WithClues(ctx)
|
||||
}
|
||||
|
||||
return metadata[0].ID, nil
|
||||
@ -305,7 +308,7 @@ func (ms *ModelStore) Get(
|
||||
m model.Model,
|
||||
) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
modelID, err := ms.getModelStoreID(ctx, s, id)
|
||||
@ -313,7 +316,7 @@ func (ms *ModelStore) Get(
|
||||
return err
|
||||
}
|
||||
|
||||
return transmuteErr(ms.GetWithModelStoreID(ctx, s, modelID, m))
|
||||
return ms.GetWithModelStoreID(ctx, s, modelID, m)
|
||||
}
|
||||
|
||||
// GetWithModelStoreID deserializes the model with the given ModelStoreID into
|
||||
@ -326,26 +329,34 @@ func (ms *ModelStore) GetWithModelStoreID(
|
||||
m model.Model,
|
||||
) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
if len(id) == 0 {
|
||||
return errors.WithStack(errNoModelStoreID)
|
||||
return clues.Stack(errNoModelStoreID).WithClues(ctx)
|
||||
}
|
||||
|
||||
metadata, err := ms.c.GetManifest(ctx, id, m)
|
||||
if err != nil {
|
||||
return errors.Wrap(transmuteErr(err), "getting model data")
|
||||
if errors.Is(err, manifest.ErrNotFound) {
|
||||
err = data.ErrNotFound
|
||||
}
|
||||
|
||||
return clues.Wrap(err, "getting model data").WithClues(ctx)
|
||||
}
|
||||
|
||||
if metadata.Labels[manifest.TypeLabelKey] != s.String() {
|
||||
return errors.WithStack(errModelTypeMismatch)
|
||||
mdlbl := metadata.Labels[manifest.TypeLabelKey]
|
||||
if mdlbl != s.String() {
|
||||
return clues.Stack(errModelTypeMismatch).
|
||||
WithClues(ctx).
|
||||
WithAll("expected_label", s, "got_label", mdlbl)
|
||||
}
|
||||
|
||||
return errors.Wrap(
|
||||
ms.populateBaseModelFromMetadata(m.Base(), metadata),
|
||||
"getting model by ID",
|
||||
)
|
||||
if err := ms.populateBaseModelFromMetadata(m.Base(), metadata); err != nil {
|
||||
return clues.Wrap(err, "getting model by ID").WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkPrevModelVersion compares the ModelType and ModelStoreID in this model
|
||||
@ -359,26 +370,31 @@ func (ms *ModelStore) checkPrevModelVersion(
|
||||
b *model.BaseModel,
|
||||
) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
id, err := ms.getModelStoreID(ctx, s, b.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
return clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
// We actually got something back during our lookup.
|
||||
meta, err := ms.c.GetManifest(ctx, id, nil)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting previous model version")
|
||||
return clues.Wrap(err, "getting previous model version").WithClues(ctx)
|
||||
}
|
||||
|
||||
if meta.ID != b.ModelStoreID {
|
||||
return errors.New("updated model has different ModelStoreID")
|
||||
return clues.New("updated model has different ModelStoreID").
|
||||
WithClues(ctx).
|
||||
WithAll("expected_id", meta.ID, "model_store_id", b.ModelStoreID)
|
||||
}
|
||||
|
||||
if meta.Labels[manifest.TypeLabelKey] != s.String() {
|
||||
return errors.New("updated model has different model type")
|
||||
mdlbl := meta.Labels[manifest.TypeLabelKey]
|
||||
if mdlbl != s.String() {
|
||||
return clues.New("updated model has different model type").
|
||||
WithClues(ctx).
|
||||
WithAll("expected_label", s, "got_label", mdlbl)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -396,12 +412,12 @@ func (ms *ModelStore) Update(
|
||||
m model.Model,
|
||||
) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
base := m.Base()
|
||||
if len(base.ModelStoreID) == 0 {
|
||||
return errors.WithStack(errNoModelStoreID)
|
||||
return clues.Stack(errNoModelStoreID).WithClues(ctx)
|
||||
}
|
||||
|
||||
base.Version = ms.modelVersion
|
||||
@ -415,8 +431,11 @@ func (ms *ModelStore) Update(
|
||||
ctx,
|
||||
ms.c,
|
||||
repo.WriteSessionOptions{Purpose: "ModelStoreUpdate"},
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) (innerErr error) {
|
||||
oldID := base.ModelStoreID
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) error {
|
||||
var (
|
||||
innerErr error
|
||||
oldID = base.ModelStoreID
|
||||
)
|
||||
|
||||
defer func() {
|
||||
if innerErr != nil {
|
||||
@ -429,19 +448,26 @@ func (ms *ModelStore) Update(
|
||||
return innerErr
|
||||
}
|
||||
|
||||
// if equal, everything worked out fine.
|
||||
// if not, we handle the cleanup below.
|
||||
if oldID == base.ModelStoreID {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If we fail at this point no changes will be made to the manifest store
|
||||
// in kopia, making it appear like nothing ever happened. At worst some
|
||||
// orphaned content blobs may be uploaded, but they should be garbage
|
||||
// collected the next time kopia maintenance is run.
|
||||
if oldID != base.ModelStoreID {
|
||||
innerErr = w.DeleteManifest(innerCtx, oldID)
|
||||
innerErr = w.DeleteManifest(innerCtx, oldID)
|
||||
if innerErr != nil {
|
||||
return clues.Stack(innerErr).WithClues(ctx)
|
||||
}
|
||||
|
||||
return innerErr
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "updating model")
|
||||
return clues.Wrap(err, "updating model").WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -452,7 +478,7 @@ func (ms *ModelStore) Update(
|
||||
// have the same StableID.
|
||||
func (ms *ModelStore) Delete(ctx context.Context, s model.Schema, id model.StableID) error {
|
||||
if !s.Valid() {
|
||||
return errors.WithStack(errUnrecognizedSchema)
|
||||
return clues.Stack(errUnrecognizedSchema).WithClues(ctx)
|
||||
}
|
||||
|
||||
latest, err := ms.getModelStoreID(ctx, s, id)
|
||||
@ -472,26 +498,17 @@ func (ms *ModelStore) Delete(ctx context.Context, s model.Schema, id model.Stabl
|
||||
// exist.
|
||||
func (ms *ModelStore) DeleteWithModelStoreID(ctx context.Context, id manifest.ID) error {
|
||||
if len(id) == 0 {
|
||||
return errors.WithStack(errNoModelStoreID)
|
||||
return clues.Stack(errNoModelStoreID).WithClues(ctx)
|
||||
}
|
||||
|
||||
err := repo.WriteSession(
|
||||
ctx,
|
||||
ms.c,
|
||||
repo.WriteSessionOptions{Purpose: "ModelStoreDelete"},
|
||||
func(innerCtx context.Context, w repo.RepositoryWriter) error {
|
||||
return w.DeleteManifest(innerCtx, id)
|
||||
},
|
||||
)
|
||||
|
||||
return errors.Wrap(err, "deleting model")
|
||||
}
|
||||
|
||||
func transmuteErr(err error) error {
|
||||
switch {
|
||||
case errors.Is(err, manifest.ErrNotFound):
|
||||
return data.ErrNotFound
|
||||
default:
|
||||
return err
|
||||
opts := repo.WriteSessionOptions{Purpose: "ModelStoreDelete"}
|
||||
cb := func(innerCtx context.Context, w repo.RepositoryWriter) error {
|
||||
return w.DeleteManifest(innerCtx, id)
|
||||
}
|
||||
|
||||
if err := repo.WriteSession(ctx, ms.c, opts, cb); err != nil {
|
||||
return clues.Wrap(err, "deleting model").WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package kopia
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/s3"
|
||||
|
||||
@ -16,7 +17,7 @@ const (
|
||||
func s3BlobStorage(ctx context.Context, s storage.Storage) (blob.Storage, error) {
|
||||
cfg, err := s.S3Config()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
endpoint := defaultS3Endpoint
|
||||
@ -32,5 +33,10 @@ func s3BlobStorage(ctx context.Context, s storage.Storage) (blob.Storage, error)
|
||||
DoNotVerifyTLS: cfg.DoNotVerifyTLS,
|
||||
}
|
||||
|
||||
return s3.New(ctx, &opts, false)
|
||||
store, err := s3.New(ctx, &opts, false)
|
||||
if err != nil {
|
||||
return nil, clues.Stack(err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"sort"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
"github.com/pkg/errors"
|
||||
@ -218,9 +219,7 @@ func fetchPrevManifests(
|
||||
found = append(found, man.Manifest)
|
||||
logger.Ctx(ctx).Infow(
|
||||
"reusing cached complete snapshot",
|
||||
"snapshot_id",
|
||||
man.ID,
|
||||
)
|
||||
"snapshot_id", man.ID)
|
||||
}
|
||||
|
||||
return found, nil
|
||||
@ -251,29 +250,19 @@ func fetchPrevSnapshotManifests(
|
||||
for _, reason := range reasons {
|
||||
logger.Ctx(ctx).Infow(
|
||||
"searching for previous manifests for reason",
|
||||
"service",
|
||||
reason.Service.String(),
|
||||
"category",
|
||||
reason.Category.String(),
|
||||
)
|
||||
"service", reason.Service.String(),
|
||||
"category", reason.Category.String())
|
||||
|
||||
found, err := fetchPrevManifests(
|
||||
ctx,
|
||||
sm,
|
||||
mans,
|
||||
reason,
|
||||
tags,
|
||||
)
|
||||
found, err := fetchPrevManifests(ctx, sm, mans, reason, tags)
|
||||
if err != nil {
|
||||
logger.Ctx(ctx).Warnw(
|
||||
"fetching previous snapshot manifests for service/category/resource owner",
|
||||
"error",
|
||||
err,
|
||||
"service",
|
||||
reason.Service.String(),
|
||||
"category",
|
||||
reason.Category.String(),
|
||||
)
|
||||
logger.Ctx(ctx).
|
||||
With(
|
||||
"err", err,
|
||||
"service", reason.Service.String(),
|
||||
"category", reason.Category.String()).
|
||||
Warnw(
|
||||
"fetching previous snapshot manifests for service/category/resource owner",
|
||||
clues.InErr(err).Slice()...)
|
||||
|
||||
// Snapshot can still complete fine, just not as efficient.
|
||||
continue
|
||||
|
||||
@ -16,7 +16,6 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/alcionai/clues"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/kopia/kopia/fs"
|
||||
"github.com/kopia/kopia/fs/virtualfs"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
@ -258,29 +257,26 @@ func collectionEntries(
|
||||
cb func(context.Context, fs.Entry) error,
|
||||
streamedEnts data.BackupCollection,
|
||||
progress *corsoProgress,
|
||||
) (map[string]struct{}, *multierror.Error) {
|
||||
) (map[string]struct{}, error) {
|
||||
if streamedEnts == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var (
|
||||
errs *multierror.Error
|
||||
// Track which items have already been seen so we can skip them if we see
|
||||
// them again in the data from the base snapshot.
|
||||
seen = map[string]struct{}{}
|
||||
items = streamedEnts.Items()
|
||||
log = logger.Ctx(ctx)
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
errs = multierror.Append(errs, ctx.Err())
|
||||
return seen, errs
|
||||
return seen, clues.Stack(ctx.Err()).WithClues(ctx)
|
||||
|
||||
case e, ok := <-items:
|
||||
if !ok {
|
||||
return seen, errs
|
||||
return seen, nil
|
||||
}
|
||||
|
||||
encodedName := encodeAsPath(e.UUID())
|
||||
@ -304,9 +300,9 @@ func collectionEntries(
|
||||
itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "getting full item path")
|
||||
errs = multierror.Append(errs, err)
|
||||
progress.errs.Add(err)
|
||||
|
||||
log.Error(err)
|
||||
logger.Ctx(ctx).With("err", err).Errorw("getting full item path", clues.InErr(err).Slice()...)
|
||||
|
||||
continue
|
||||
}
|
||||
@ -344,13 +340,12 @@ func collectionEntries(
|
||||
entry := virtualfs.StreamingFileWithModTimeFromReader(
|
||||
encodedName,
|
||||
modTime,
|
||||
newBackupStreamReader(serializationVersion, e.ToReader()),
|
||||
)
|
||||
newBackupStreamReader(serializationVersion, e.ToReader()))
|
||||
|
||||
if err := cb(ctx, entry); err != nil {
|
||||
// Kopia's uploader swallows errors in most cases, so if we see
|
||||
// something here it's probably a big issue and we should return.
|
||||
errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath))
|
||||
return seen, errs
|
||||
return seen, clues.Wrap(err, "executing callback").WithClues(ctx).With("item_path", itemPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -456,11 +451,14 @@ func getStreamItemFunc(
|
||||
// Return static entries in this directory first.
|
||||
for _, d := range staticEnts {
|
||||
if err := cb(ctx, d); err != nil {
|
||||
return errors.Wrap(err, "executing callback on static directory")
|
||||
return clues.Wrap(err, "executing callback on static directory").WithClues(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
seen, errs := collectionEntries(ctx, cb, streamedEnts, progress)
|
||||
seen, err := collectionEntries(ctx, cb, streamedEnts, progress)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "streaming collection entries")
|
||||
}
|
||||
|
||||
if err := streamBaseEntries(
|
||||
ctx,
|
||||
@ -472,13 +470,10 @@ func getStreamItemFunc(
|
||||
globalExcludeSet,
|
||||
progress,
|
||||
); err != nil {
|
||||
errs = multierror.Append(
|
||||
errs,
|
||||
errors.Wrap(err, "streaming base snapshot entries"),
|
||||
)
|
||||
return errors.Wrap(err, "streaming base snapshot entries")
|
||||
}
|
||||
|
||||
return errs.ErrorOrNil()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -935,9 +930,7 @@ func inflateDirTree(
|
||||
|
||||
logger.Ctx(ctx).Infow(
|
||||
"merging hierarchies from base snapshots",
|
||||
"snapshot_ids",
|
||||
baseIDs,
|
||||
)
|
||||
"snapshot_ids", baseIDs)
|
||||
|
||||
for _, snap := range baseSnaps {
|
||||
if err = inflateBaseTree(ctx, loader, snap, updatedPaths, roots); err != nil {
|
||||
|
||||
@ -98,8 +98,8 @@ func Initialize(
|
||||
kopiaRef := kopia.NewConn(s)
|
||||
if err := kopiaRef.Initialize(ctx); err != nil {
|
||||
// replace common internal errors so that sdk users can check results with errors.Is()
|
||||
if kopia.IsRepoAlreadyExistsError(err) {
|
||||
return nil, clues.Stack(ErrorRepoAlreadyExists).WithClues(ctx)
|
||||
if errors.Is(err, kopia.ErrorRepoAlreadyExists) {
|
||||
return nil, clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx)
|
||||
}
|
||||
|
||||
return nil, errors.Wrap(err, "initializing kopia")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user