diff --git a/src/internal/kopia/conn.go b/src/internal/kopia/conn.go index 1756de6ea..5a775cb4d 100644 --- a/src/internal/kopia/conn.go +++ b/src/internal/kopia/conn.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/alcionai/clues" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" @@ -17,7 +18,6 @@ import ( "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/pkg/errors" - "github.com/alcionai/corso/src/internal/common" "github.com/alcionai/corso/src/pkg/storage" ) @@ -29,11 +29,9 @@ const ( defaultSchedulingInterval = time.Second * 0 ) -const defaultConfigErrTmpl = "setting default repo config values" - var ( - errInit = errors.New("initializing repo") - errConnect = errors.New("connecting repo") + ErrSettingDefaultConfig = errors.New("setting default repo config values") + ErrorRepoAlreadyExists = errors.New("repo already exists") ) // Having all fields set to 0 causes it to keep max-int versions of snapshots. @@ -53,19 +51,6 @@ type snapshotLoader interface { SnapshotRoot(man *snapshot.Manifest) (fs.Entry, error) } -type ErrorRepoAlreadyExists struct { - common.Err -} - -func RepoAlreadyExistsError(e error) error { - return ErrorRepoAlreadyExists{*common.EncapsulateError(e)} -} - -func IsRepoAlreadyExistsError(e error) bool { - var erae ErrorRepoAlreadyExists - return errors.As(e, &erae) -} - var ( _ snapshotManager = &conn{} _ snapshotLoader = &conn{} @@ -87,22 +72,22 @@ func NewConn(s storage.Storage) *conn { func (w *conn) Initialize(ctx context.Context) error { bst, err := blobStoreByProvider(ctx, w.storage) if err != nil { - return errors.Wrap(err, errInit.Error()) + return errors.Wrap(err, "initializing storage") } defer bst.Close(ctx) cfg, err := w.storage.CommonConfig() if err != nil { - return err + return clues.Stack(err).WithClues(ctx) } // todo - issue #75: nil here should be a storage.NewRepoOptions() if err = repo.Initialize(ctx, bst, nil, cfg.CorsoPassphrase); err != nil { if errors.Is(err, repo.ErrAlreadyInitialized) { - return RepoAlreadyExistsError(err) + return clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx) } - return errors.Wrap(err, errInit.Error()) + return clues.Wrap(err, "initialzing repo").WithClues(ctx) } return w.commonConnect( @@ -117,13 +102,13 @@ func (w *conn) Initialize(ctx context.Context) error { func (w *conn) Connect(ctx context.Context) error { bst, err := blobStoreByProvider(ctx, w.storage) if err != nil { - return errors.Wrap(err, errInit.Error()) + return errors.Wrap(err, "initializing storage") } defer bst.Close(ctx) cfg, err := w.storage.CommonConfig() if err != nil { - return err + return clues.Stack(err).WithClues(ctx) } return w.commonConnect( @@ -162,14 +147,18 @@ func (w *conn) commonConnect( password, opts, ); err != nil { - return errors.Wrap(err, errConnect.Error()) + return clues.Wrap(err, "connecting to repo").WithClues(ctx) } if err := w.open(ctx, cfgFile, password); err != nil { - return err + return clues.Stack(err).WithClues(ctx) } - return w.setDefaultConfigValues(ctx) + if err := w.setDefaultConfigValues(ctx); err != nil { + return clues.Stack(err).WithClues(ctx) + } + + return nil } func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, error) { @@ -177,7 +166,7 @@ func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage, case storage.ProviderS3: return s3BlobStorage(ctx, s) default: - return nil, errors.New("storage provider details are required") + return nil, clues.New("storage provider details are required").WithClues(ctx) } } @@ -204,7 +193,11 @@ func (w *conn) close(ctx context.Context) error { err := w.Repository.Close(ctx) w.Repository = nil - return errors.Wrap(err, "closing repository connection") + if err != nil { + return clues.Wrap(err, "closing repository connection").WithClues(ctx) + } + + return nil } func (w *conn) open(ctx context.Context, configPath, password string) error { @@ -216,7 +209,7 @@ func (w *conn) open(ctx context.Context, configPath, password string) error { // TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions(). rep, err := repo.Open(ctx, configPath, password, nil) if err != nil { - return errors.Wrap(err, "opening repository connection") + return clues.Wrap(err, "opening repository connection").WithClues(ctx) } w.Repository = rep @@ -229,7 +222,7 @@ func (w *conn) wrap() error { defer w.mu.Unlock() if w.refCount == 0 { - return errors.New("conn already closed") + return clues.New("conn already closed") } w.refCount++ @@ -240,12 +233,12 @@ func (w *conn) wrap() error { func (w *conn) setDefaultConfigValues(ctx context.Context) error { p, err := w.getGlobalPolicyOrEmpty(ctx) if err != nil { - return errors.Wrap(err, defaultConfigErrTmpl) + return clues.Stack(ErrSettingDefaultConfig, err) } changed, err := updateCompressionOnPolicy(defaultCompressor, p) if err != nil { - return errors.Wrap(err, defaultConfigErrTmpl) + return clues.Stack(ErrSettingDefaultConfig, err) } if updateRetentionOnPolicy(defaultRetention, p) { @@ -260,10 +253,11 @@ func (w *conn) setDefaultConfigValues(ctx context.Context) error { return nil } - return errors.Wrap( - w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p), - "updating global policy with defaults", - ) + if err := w.writeGlobalPolicy(ctx, "UpdateGlobalPolicyWithDefaults", p); err != nil { + return clues.Wrap(err, "updating global policy with defaults") + } + + return nil } // Compression attempts to set the global compression policy for the kopia repo @@ -273,7 +267,7 @@ func (w *conn) Compression(ctx context.Context, compressor string) error { // compressor was given. comp := compression.Name(compressor) if err := checkCompressor(comp); err != nil { - return err + return clues.Stack(err).WithClues(ctx) } p, err := w.getGlobalPolicyOrEmpty(ctx) @@ -283,17 +277,18 @@ func (w *conn) Compression(ctx context.Context, compressor string) error { changed, err := updateCompressionOnPolicy(compressor, p) if err != nil { - return err + return clues.Stack(err).WithClues(ctx) } if !changed { return nil } - return errors.Wrap( - w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p), - "updating global compression policy", - ) + if err := w.writeGlobalPolicy(ctx, "UpdateGlobalCompressionPolicy", p); err != nil { + return clues.Wrap(err, "updating global compression policy") + } + + return nil } func updateCompressionOnPolicy(compressor string, p *policy.Policy) (bool, error) { @@ -349,7 +344,7 @@ func (w *conn) getPolicyOrEmpty(ctx context.Context, si snapshot.SourceInfo) (*p return &policy.Policy{}, nil } - return nil, errors.Wrapf(err, "getting backup policy for %+v", si) + return nil, clues.Wrap(err, "getting backup policy").With("source_info", si).WithClues(ctx) } return p, nil @@ -370,16 +365,22 @@ func (w *conn) writePolicy( si snapshot.SourceInfo, p *policy.Policy, ) error { - err := repo.WriteSession( - ctx, - w.Repository, - repo.WriteSessionOptions{Purpose: purpose}, - func(innerCtx context.Context, rw repo.RepositoryWriter) error { - return policy.SetPolicy(ctx, rw, si, p) - }, - ) + ctx = clues.Add(ctx, "source_info", si) - return errors.Wrapf(err, "updating policy for %+v", si) + writeOpts := repo.WriteSessionOptions{Purpose: purpose} + cb := func(innerCtx context.Context, rw repo.RepositoryWriter) error { + if err := policy.SetPolicy(ctx, rw, si, p); err != nil { + return clues.Stack(err).WithClues(innerCtx) + } + + return nil + } + + if err := repo.WriteSession(ctx, w.Repository, writeOpts, cb); err != nil { + return clues.Wrap(err, "updating policy").WithClues(ctx) + } + + return nil } func checkCompressor(compressor compression.Name) error { @@ -389,14 +390,19 @@ func checkCompressor(compressor compression.Name) error { } } - return errors.Errorf("unknown compressor type %s", compressor) + return clues.Stack(clues.New("unknown compressor type"), clues.New(string(compressor))) } func (w *conn) LoadSnapshots( ctx context.Context, ids []manifest.ID, ) ([]*snapshot.Manifest, error) { - return snapshot.LoadSnapshots(ctx, w.Repository, ids) + mans, err := snapshot.LoadSnapshots(ctx, w.Repository, ids) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + + return mans, nil } func (w *conn) SnapshotRoot(man *snapshot.Manifest) (fs.Entry, error) { diff --git a/src/internal/kopia/conn_test.go b/src/internal/kopia/conn_test.go index 05ff5e110..2b2aaee5c 100644 --- a/src/internal/kopia/conn_test.go +++ b/src/internal/kopia/conn_test.go @@ -85,7 +85,7 @@ func (suite *WrapperIntegrationSuite) TestRepoExistsError() { err := k.Initialize(ctx) assert.Error(t, err) - assert.True(t, IsRepoAlreadyExistsError(err)) + assert.ErrorIs(t, err, ErrorRepoAlreadyExists) } func (suite *WrapperIntegrationSuite) TestBadProviderErrors() { diff --git a/src/internal/kopia/model_store.go b/src/internal/kopia/model_store.go index 2c6661d22..58111dfa9 100644 --- a/src/internal/kopia/model_store.go +++ b/src/internal/kopia/model_store.go @@ -4,6 +4,7 @@ import ( "context" "strconv" + "github.com/alcionai/clues" "github.com/google/uuid" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/manifest" @@ -59,7 +60,7 @@ func (ms *ModelStore) Close(ctx context.Context) error { // bad model type is given. func tagsForModel(s model.Schema, tags map[string]string) (map[string]string, error) { if _, ok := tags[manifest.TypeLabelKey]; ok { - return nil, errors.WithStack(errBadTagKey) + return nil, clues.Stack(errBadTagKey) } res := make(map[string]string, len(tags)+1) @@ -80,11 +81,11 @@ func tagsForModelWithID( tags map[string]string, ) (map[string]string, error) { if !s.Valid() { - return nil, errors.WithStack(errUnrecognizedSchema) + return nil, clues.Stack(errUnrecognizedSchema) } if len(id) == 0 { - return nil, errors.WithStack(errNoStableID) + return nil, clues.Stack(errNoStableID) } res, err := tagsForModel(s, tags) @@ -93,13 +94,13 @@ func tagsForModelWithID( } if _, ok := res[stableIDKey]; ok { - return nil, errors.WithStack(errBadTagKey) + return nil, clues.Stack(errBadTagKey) } res[stableIDKey] = string(id) if _, ok := res[modelVersionKey]; ok { - return nil, errors.WithStack(errBadTagKey) + return nil, clues.Stack(errBadTagKey) } res[modelVersionKey] = strconv.Itoa(version) @@ -117,7 +118,7 @@ func putInner( create bool, ) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema).WithClues(ctx) } base := m.Base() @@ -128,13 +129,13 @@ func putInner( tmpTags, err := tagsForModelWithID(s, base.ID, base.Version, base.Tags) if err != nil { // Will be wrapped at a higher layer. - return err + return clues.Stack(err).WithClues(ctx) } id, err := w.PutManifest(ctx, tmpTags, m) if err != nil { // Will be wrapped at a higher layer. - return err + return clues.Stack(err).WithClues(ctx) } base.ModelStoreID = id @@ -150,7 +151,7 @@ func (ms *ModelStore) Put( m model.Model, ) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema) } m.Base().Version = ms.modelVersion @@ -162,14 +163,16 @@ func (ms *ModelStore) Put( func(innerCtx context.Context, w repo.RepositoryWriter) error { err := putInner(innerCtx, w, s, m, true) if err != nil { - return err + return clues.Stack(err).WithClues(innerCtx) } return nil - }, - ) + }) + if err != nil { + return clues.Wrap(err, "putting model").WithClues(ctx) + } - return errors.Wrap(err, "putting model") + return nil } func stripHiddenTags(tags map[string]string) { @@ -184,7 +187,7 @@ func (ms ModelStore) populateBaseModelFromMetadata( ) error { id, ok := m.Labels[stableIDKey] if !ok { - return errors.WithStack(errNoStableID) + return clues.Stack(errNoStableID) } v, err := strconv.Atoi(m.Labels[modelVersionKey]) @@ -193,7 +196,7 @@ func (ms ModelStore) populateBaseModelFromMetadata( } if v != ms.modelVersion { - return errors.Errorf("bad model version %s", m.Labels[modelVersionKey]) + return clues.Wrap(clues.New(m.Labels[modelVersionKey]), "bad model version") } base.ModelStoreID = m.ID @@ -211,7 +214,7 @@ func (ms ModelStore) baseModelFromMetadata( ) (*model.BaseModel, error) { res := &model.BaseModel{} if err := ms.populateBaseModelFromMetadata(res, m); err != nil { - return nil, err + return nil, clues.Stack(err).WithAll("metadata_id", m.ID, "metadata_modtime", m.ModTime) } return res, nil @@ -226,21 +229,21 @@ func (ms *ModelStore) GetIDsForType( tags map[string]string, ) ([]*model.BaseModel, error) { if !s.Valid() { - return nil, errors.WithStack(errUnrecognizedSchema) + return nil, clues.Stack(errUnrecognizedSchema).WithClues(ctx) } if _, ok := tags[stableIDKey]; ok { - return nil, errors.WithStack(errBadTagKey) + return nil, clues.Stack(errBadTagKey).WithClues(ctx) } tmpTags, err := tagsForModel(s, tags) if err != nil { - return nil, errors.Wrap(err, "getting model metadata") + return nil, clues.Wrap(err, "getting model metadata").WithClues(ctx) } metadata, err := ms.c.FindManifests(ctx, tmpTags) if err != nil { - return nil, errors.Wrap(err, "getting model metadata") + return nil, clues.Wrap(err, "getting model metadata").WithClues(ctx) } res := make([]*model.BaseModel, 0, len(metadata)) @@ -248,7 +251,7 @@ func (ms *ModelStore) GetIDsForType( for _, m := range metadata { bm, err := ms.baseModelFromMetadata(m) if err != nil { - return nil, errors.Wrap(err, "parsing model metadata") + return nil, clues.Wrap(err, "parsing model metadata").WithClues(ctx) } res = append(res, bm) @@ -266,30 +269,30 @@ func (ms *ModelStore) getModelStoreID( id model.StableID, ) (manifest.ID, error) { if !s.Valid() { - return "", errors.WithStack(errUnrecognizedSchema) + return "", clues.Stack(errUnrecognizedSchema).WithClues(ctx) } if len(id) == 0 { - return "", errors.WithStack(errNoStableID) + return "", clues.Stack(errNoStableID).WithClues(ctx) } tags := map[string]string{stableIDKey: string(id)} metadata, err := ms.c.FindManifests(ctx, tags) if err != nil { - return "", errors.Wrap(err, "getting ModelStoreID") + return "", clues.Wrap(err, "getting ModelStoreID").WithClues(ctx) } if len(metadata) == 0 { - return "", errors.Wrap(data.ErrNotFound, "getting ModelStoreID") + return "", clues.Wrap(data.ErrNotFound, "getting ModelStoreID").WithClues(ctx) } if len(metadata) != 1 { - return "", errors.New("multiple models with same StableID") + return "", clues.New("multiple models with same StableID").WithClues(ctx) } if metadata[0].Labels[manifest.TypeLabelKey] != s.String() { - return "", errors.WithStack(errModelTypeMismatch) + return "", clues.Stack(errModelTypeMismatch).WithClues(ctx) } return metadata[0].ID, nil @@ -305,7 +308,7 @@ func (ms *ModelStore) Get( m model.Model, ) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema).WithClues(ctx) } modelID, err := ms.getModelStoreID(ctx, s, id) @@ -313,7 +316,7 @@ func (ms *ModelStore) Get( return err } - return transmuteErr(ms.GetWithModelStoreID(ctx, s, modelID, m)) + return ms.GetWithModelStoreID(ctx, s, modelID, m) } // GetWithModelStoreID deserializes the model with the given ModelStoreID into @@ -326,26 +329,34 @@ func (ms *ModelStore) GetWithModelStoreID( m model.Model, ) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema).WithClues(ctx) } if len(id) == 0 { - return errors.WithStack(errNoModelStoreID) + return clues.Stack(errNoModelStoreID).WithClues(ctx) } metadata, err := ms.c.GetManifest(ctx, id, m) if err != nil { - return errors.Wrap(transmuteErr(err), "getting model data") + if errors.Is(err, manifest.ErrNotFound) { + err = data.ErrNotFound + } + + return clues.Wrap(err, "getting model data").WithClues(ctx) } - if metadata.Labels[manifest.TypeLabelKey] != s.String() { - return errors.WithStack(errModelTypeMismatch) + mdlbl := metadata.Labels[manifest.TypeLabelKey] + if mdlbl != s.String() { + return clues.Stack(errModelTypeMismatch). + WithClues(ctx). + WithAll("expected_label", s, "got_label", mdlbl) } - return errors.Wrap( - ms.populateBaseModelFromMetadata(m.Base(), metadata), - "getting model by ID", - ) + if err := ms.populateBaseModelFromMetadata(m.Base(), metadata); err != nil { + return clues.Wrap(err, "getting model by ID").WithClues(ctx) + } + + return nil } // checkPrevModelVersion compares the ModelType and ModelStoreID in this model @@ -359,26 +370,31 @@ func (ms *ModelStore) checkPrevModelVersion( b *model.BaseModel, ) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema).WithClues(ctx) } id, err := ms.getModelStoreID(ctx, s, b.ID) if err != nil { - return err + return clues.Stack(err).WithClues(ctx) } // We actually got something back during our lookup. meta, err := ms.c.GetManifest(ctx, id, nil) if err != nil { - return errors.Wrap(err, "getting previous model version") + return clues.Wrap(err, "getting previous model version").WithClues(ctx) } if meta.ID != b.ModelStoreID { - return errors.New("updated model has different ModelStoreID") + return clues.New("updated model has different ModelStoreID"). + WithClues(ctx). + WithAll("expected_id", meta.ID, "model_store_id", b.ModelStoreID) } - if meta.Labels[manifest.TypeLabelKey] != s.String() { - return errors.New("updated model has different model type") + mdlbl := meta.Labels[manifest.TypeLabelKey] + if mdlbl != s.String() { + return clues.New("updated model has different model type"). + WithClues(ctx). + WithAll("expected_label", s, "got_label", mdlbl) } return nil @@ -396,12 +412,12 @@ func (ms *ModelStore) Update( m model.Model, ) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema).WithClues(ctx) } base := m.Base() if len(base.ModelStoreID) == 0 { - return errors.WithStack(errNoModelStoreID) + return clues.Stack(errNoModelStoreID).WithClues(ctx) } base.Version = ms.modelVersion @@ -415,8 +431,11 @@ func (ms *ModelStore) Update( ctx, ms.c, repo.WriteSessionOptions{Purpose: "ModelStoreUpdate"}, - func(innerCtx context.Context, w repo.RepositoryWriter) (innerErr error) { - oldID := base.ModelStoreID + func(innerCtx context.Context, w repo.RepositoryWriter) error { + var ( + innerErr error + oldID = base.ModelStoreID + ) defer func() { if innerErr != nil { @@ -429,19 +448,26 @@ func (ms *ModelStore) Update( return innerErr } + // if equal, everything worked out fine. + // if not, we handle the cleanup below. + if oldID == base.ModelStoreID { + return nil + } + // If we fail at this point no changes will be made to the manifest store // in kopia, making it appear like nothing ever happened. At worst some // orphaned content blobs may be uploaded, but they should be garbage // collected the next time kopia maintenance is run. - if oldID != base.ModelStoreID { - innerErr = w.DeleteManifest(innerCtx, oldID) + innerErr = w.DeleteManifest(innerCtx, oldID) + if innerErr != nil { + return clues.Stack(innerErr).WithClues(ctx) } - return innerErr + return nil }, ) if err != nil { - return errors.Wrap(err, "updating model") + return clues.Wrap(err, "updating model").WithClues(ctx) } return nil @@ -452,7 +478,7 @@ func (ms *ModelStore) Update( // have the same StableID. func (ms *ModelStore) Delete(ctx context.Context, s model.Schema, id model.StableID) error { if !s.Valid() { - return errors.WithStack(errUnrecognizedSchema) + return clues.Stack(errUnrecognizedSchema).WithClues(ctx) } latest, err := ms.getModelStoreID(ctx, s, id) @@ -472,26 +498,17 @@ func (ms *ModelStore) Delete(ctx context.Context, s model.Schema, id model.Stabl // exist. func (ms *ModelStore) DeleteWithModelStoreID(ctx context.Context, id manifest.ID) error { if len(id) == 0 { - return errors.WithStack(errNoModelStoreID) + return clues.Stack(errNoModelStoreID).WithClues(ctx) } - err := repo.WriteSession( - ctx, - ms.c, - repo.WriteSessionOptions{Purpose: "ModelStoreDelete"}, - func(innerCtx context.Context, w repo.RepositoryWriter) error { - return w.DeleteManifest(innerCtx, id) - }, - ) - - return errors.Wrap(err, "deleting model") -} - -func transmuteErr(err error) error { - switch { - case errors.Is(err, manifest.ErrNotFound): - return data.ErrNotFound - default: - return err + opts := repo.WriteSessionOptions{Purpose: "ModelStoreDelete"} + cb := func(innerCtx context.Context, w repo.RepositoryWriter) error { + return w.DeleteManifest(innerCtx, id) } + + if err := repo.WriteSession(ctx, ms.c, opts, cb); err != nil { + return clues.Wrap(err, "deleting model").WithClues(ctx) + } + + return nil } diff --git a/src/internal/kopia/s3.go b/src/internal/kopia/s3.go index 3d0f3144e..5810487dc 100644 --- a/src/internal/kopia/s3.go +++ b/src/internal/kopia/s3.go @@ -3,6 +3,7 @@ package kopia import ( "context" + "github.com/alcionai/clues" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/s3" @@ -16,7 +17,7 @@ const ( func s3BlobStorage(ctx context.Context, s storage.Storage) (blob.Storage, error) { cfg, err := s.S3Config() if err != nil { - return nil, err + return nil, clues.Stack(err).WithClues(ctx) } endpoint := defaultS3Endpoint @@ -32,5 +33,10 @@ func s3BlobStorage(ctx context.Context, s storage.Storage) (blob.Storage, error) DoNotVerifyTLS: cfg.DoNotVerifyTLS, } - return s3.New(ctx, &opts, false) + store, err := s3.New(ctx, &opts, false) + if err != nil { + return nil, clues.Stack(err).WithClues(ctx) + } + + return store, nil } diff --git a/src/internal/kopia/snapshot_manager.go b/src/internal/kopia/snapshot_manager.go index f15c8ae99..45b1ffabe 100644 --- a/src/internal/kopia/snapshot_manager.go +++ b/src/internal/kopia/snapshot_manager.go @@ -4,6 +4,7 @@ import ( "context" "sort" + "github.com/alcionai/clues" "github.com/kopia/kopia/repo/manifest" "github.com/kopia/kopia/snapshot" "github.com/pkg/errors" @@ -218,9 +219,7 @@ func fetchPrevManifests( found = append(found, man.Manifest) logger.Ctx(ctx).Infow( "reusing cached complete snapshot", - "snapshot_id", - man.ID, - ) + "snapshot_id", man.ID) } return found, nil @@ -251,29 +250,19 @@ func fetchPrevSnapshotManifests( for _, reason := range reasons { logger.Ctx(ctx).Infow( "searching for previous manifests for reason", - "service", - reason.Service.String(), - "category", - reason.Category.String(), - ) + "service", reason.Service.String(), + "category", reason.Category.String()) - found, err := fetchPrevManifests( - ctx, - sm, - mans, - reason, - tags, - ) + found, err := fetchPrevManifests(ctx, sm, mans, reason, tags) if err != nil { - logger.Ctx(ctx).Warnw( - "fetching previous snapshot manifests for service/category/resource owner", - "error", - err, - "service", - reason.Service.String(), - "category", - reason.Category.String(), - ) + logger.Ctx(ctx). + With( + "err", err, + "service", reason.Service.String(), + "category", reason.Category.String()). + Warnw( + "fetching previous snapshot manifests for service/category/resource owner", + clues.InErr(err).Slice()...) // Snapshot can still complete fine, just not as efficient. continue diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index a4ae1fbcc..e25e4ed0f 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -16,7 +16,6 @@ import ( "unsafe" "github.com/alcionai/clues" - "github.com/hashicorp/go-multierror" "github.com/kopia/kopia/fs" "github.com/kopia/kopia/fs/virtualfs" "github.com/kopia/kopia/repo/manifest" @@ -258,29 +257,26 @@ func collectionEntries( cb func(context.Context, fs.Entry) error, streamedEnts data.BackupCollection, progress *corsoProgress, -) (map[string]struct{}, *multierror.Error) { +) (map[string]struct{}, error) { if streamedEnts == nil { return nil, nil } var ( - errs *multierror.Error // Track which items have already been seen so we can skip them if we see // them again in the data from the base snapshot. seen = map[string]struct{}{} items = streamedEnts.Items() - log = logger.Ctx(ctx) ) for { select { case <-ctx.Done(): - errs = multierror.Append(errs, ctx.Err()) - return seen, errs + return seen, clues.Stack(ctx.Err()).WithClues(ctx) case e, ok := <-items: if !ok { - return seen, errs + return seen, nil } encodedName := encodeAsPath(e.UUID()) @@ -304,9 +300,9 @@ func collectionEntries( itemPath, err := streamedEnts.FullPath().Append(e.UUID(), true) if err != nil { err = errors.Wrap(err, "getting full item path") - errs = multierror.Append(errs, err) + progress.errs.Add(err) - log.Error(err) + logger.Ctx(ctx).With("err", err).Errorw("getting full item path", clues.InErr(err).Slice()...) continue } @@ -344,13 +340,12 @@ func collectionEntries( entry := virtualfs.StreamingFileWithModTimeFromReader( encodedName, modTime, - newBackupStreamReader(serializationVersion, e.ToReader()), - ) + newBackupStreamReader(serializationVersion, e.ToReader())) + if err := cb(ctx, entry); err != nil { // Kopia's uploader swallows errors in most cases, so if we see // something here it's probably a big issue and we should return. - errs = multierror.Append(errs, errors.Wrapf(err, "executing callback on %q", itemPath)) - return seen, errs + return seen, clues.Wrap(err, "executing callback").WithClues(ctx).With("item_path", itemPath) } } } @@ -456,11 +451,14 @@ func getStreamItemFunc( // Return static entries in this directory first. for _, d := range staticEnts { if err := cb(ctx, d); err != nil { - return errors.Wrap(err, "executing callback on static directory") + return clues.Wrap(err, "executing callback on static directory").WithClues(ctx) } } - seen, errs := collectionEntries(ctx, cb, streamedEnts, progress) + seen, err := collectionEntries(ctx, cb, streamedEnts, progress) + if err != nil { + return errors.Wrap(err, "streaming collection entries") + } if err := streamBaseEntries( ctx, @@ -472,13 +470,10 @@ func getStreamItemFunc( globalExcludeSet, progress, ); err != nil { - errs = multierror.Append( - errs, - errors.Wrap(err, "streaming base snapshot entries"), - ) + return errors.Wrap(err, "streaming base snapshot entries") } - return errs.ErrorOrNil() + return nil } } @@ -935,9 +930,7 @@ func inflateDirTree( logger.Ctx(ctx).Infow( "merging hierarchies from base snapshots", - "snapshot_ids", - baseIDs, - ) + "snapshot_ids", baseIDs) for _, snap := range baseSnaps { if err = inflateBaseTree(ctx, loader, snap, updatedPaths, roots); err != nil { diff --git a/src/pkg/repository/repository.go b/src/pkg/repository/repository.go index 518685134..da715f61e 100644 --- a/src/pkg/repository/repository.go +++ b/src/pkg/repository/repository.go @@ -98,8 +98,8 @@ func Initialize( kopiaRef := kopia.NewConn(s) if err := kopiaRef.Initialize(ctx); err != nil { // replace common internal errors so that sdk users can check results with errors.Is() - if kopia.IsRepoAlreadyExistsError(err) { - return nil, clues.Stack(ErrorRepoAlreadyExists).WithClues(ctx) + if errors.Is(err, kopia.ErrorRepoAlreadyExists) { + return nil, clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx) } return nil, errors.Wrap(err, "initializing kopia")