diff --git a/src/internal/kopia/conn.go b/src/internal/kopia/conn.go index 887cd01ed..a2e91f096 100644 --- a/src/internal/kopia/conn.go +++ b/src/internal/kopia/conn.go @@ -71,6 +71,7 @@ func (w *conn) Initialize(ctx context.Context) error { if errors.Is(err, repo.ErrAlreadyInitialized) { return RepoAlreadyExistsError(err) } + return errors.Wrap(err, errInit.Error()) } @@ -189,6 +190,7 @@ func (w *conn) open(ctx context.Context, configPath, password string) error { } w.Repository = rep + return nil } @@ -201,6 +203,7 @@ func (w *conn) wrap() error { } w.refCount++ + return nil } diff --git a/src/internal/kopia/conn_test.go b/src/internal/kopia/conn_test.go index 6ec13789a..e041dd736 100644 --- a/src/internal/kopia/conn_test.go +++ b/src/internal/kopia/conn_test.go @@ -38,8 +38,8 @@ func TestWrapperUnitSuite(t *testing.T) { func (suite *WrapperUnitSuite) TestCloseWithoutOpenDoesNotCrash() { ctx := context.Background() - k := conn{} + assert.NotPanics(suite.T(), func() { k.Close(ctx) }) diff --git a/src/internal/kopia/data_collection.go b/src/internal/kopia/data_collection.go index 12900e54a..8b0236832 100644 --- a/src/internal/kopia/data_collection.go +++ b/src/internal/kopia/data_collection.go @@ -18,6 +18,7 @@ type kopiaDataCollection struct { func (kdc *kopiaDataCollection) Items() <-chan data.Stream { res := make(chan data.Stream) + go func() { defer close(res) diff --git a/src/internal/kopia/model_store.go b/src/internal/kopia/model_store.go index 36727ab42..666baefe0 100644 --- a/src/internal/kopia/model_store.go +++ b/src/internal/kopia/model_store.go @@ -25,6 +25,7 @@ func NewModelStore(c *conn) (*ModelStore, error) { if err := c.wrap(); err != nil { return nil, errors.Wrap(err, "creating ModelStore") } + return &ModelStore{c: c}, nil } @@ -54,6 +55,7 @@ func tagsForModel(s model.Schema, tags map[string]string) (map[string]string, er res := make(map[string]string, len(tags)+1) res[manifest.TypeLabelKey] = s.String() + for k, v := range tags { res[k] = v } @@ -122,6 +124,7 @@ func putInner( } base.ModelStoreID = id + return nil } @@ -135,6 +138,7 @@ func (ms *ModelStore) Put( if !s.Valid() { return errors.WithStack(errUnrecognizedSchema) } + err := repo.WriteSession( ctx, ms.c, @@ -170,6 +174,7 @@ func baseModelFromMetadata(m *manifest.EntryMetadata) (*model.BaseModel, error) } stripHiddenTags(res.Tags) + return res, nil } @@ -200,6 +205,7 @@ func (ms *ModelStore) GetIDsForType( } res := make([]*model.BaseModel, 0, len(metadata)) + for _, m := range metadata { bm, err := baseModelFromMetadata(m) if err != nil { @@ -230,6 +236,7 @@ func (ms *ModelStore) getModelStoreID( } tags := map[string]string{stableIDKey: string(id)} + metadata, err := ms.c.FindManifests(ctx, tags) if err != nil { return "", errors.Wrap(err, "getting ModelStoreID") @@ -238,9 +245,11 @@ func (ms *ModelStore) getModelStoreID( if len(metadata) == 0 { return "", errors.Wrap(manifest.ErrNotFound, "getting ModelStoreID") } + if len(metadata) != 1 { return "", errors.New("multiple models with same StableID") } + if metadata[0].Labels[manifest.TypeLabelKey] != s.String() { return "", errors.WithStack(errModelTypeMismatch) } @@ -303,6 +312,7 @@ func (ms *ModelStore) GetWithModelStoreID( base.Tags = metadata.Labels stripHiddenTags(base.Tags) base.ModelStoreID = id + return nil } @@ -334,6 +344,7 @@ func (ms *ModelStore) checkPrevModelVersion( if meta.ID != b.ModelStoreID { return errors.New("updated model has different ModelStoreID") } + if meta.Labels[manifest.TypeLabelKey] != s.String() { return errors.New("updated model has different model type") } diff --git a/src/internal/kopia/s3.go b/src/internal/kopia/s3.go index 556bee52c..31190895e 100644 --- a/src/internal/kopia/s3.go +++ b/src/internal/kopia/s3.go @@ -18,10 +18,12 @@ func s3BlobStorage(ctx context.Context, s storage.Storage) (blob.Storage, error) if err != nil { return nil, err } + endpoint := defaultS3Endpoint if len(cfg.Endpoint) > 0 { endpoint = cfg.Endpoint } + opts := s3.Options{ AccessKeyID: cfg.AccessKey, BucketName: cfg.Bucket, @@ -30,5 +32,6 @@ func s3BlobStorage(ctx context.Context, s storage.Storage) (blob.Storage, error) SecretAccessKey: cfg.SecretKey, SessionToken: cfg.SessionToken, } + return s3.New(ctx, &opts) } diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 947ae9a34..306592e62 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -107,6 +107,7 @@ func NewWrapper(c *conn) (*Wrapper, error) { if err := c.wrap(); err != nil { return nil, errors.Wrap(err, "creating Wrapper") } + return &Wrapper{c}, nil } @@ -151,6 +152,7 @@ func getStreamItemFunc( } items := streamedEnts.Items() + for { select { case <-ctx.Done(): @@ -293,6 +295,7 @@ func inflateDirTree( } var res fs.Directory + for dirName, dir := range roots { tmp, err := buildKopiaDirs(dirName, dir, progress) if err != nil { @@ -397,6 +400,7 @@ func (w Wrapper) makeSnapshotWithRoot( } res := manifestToStats(man) + return &res, nil } @@ -528,9 +532,10 @@ func walkDirectory( ctx context.Context, dir fs.Directory, ) ([]fs.File, []fs.Directory, *multierror.Error) { + var errs *multierror.Error + files := []fs.File{} dirs := []fs.Directory{} - var errs *multierror.Error err := dir.IterateEntries(ctx, func(innerCtx context.Context, e fs.Entry) error { // Early exit on context cancel. @@ -569,10 +574,11 @@ func restoreSubtree( dir fs.Directory, relativePath []string, ) ([]data.Collection, *multierror.Error) { + var errs *multierror.Error + collections := []data.Collection{} // Want a local copy of relativePath with our new element. fullPath := append(append([]string{}, relativePath...), dir.Name()) - var errs *multierror.Error files, dirs, err := walkDirectory(ctx, dir) if err != nil { @@ -594,8 +600,10 @@ func restoreSubtree( fileFullPath := path.Join(append(append([]string{}, fullPath...), f.Name())...) errs = multierror.Append( errs, errors.Wrapf(err, "getting reader for file %q", fileFullPath)) + logger.Ctx(ctx).Errorw( "unable to get file reader; skipping", "path", fileFullPath) + continue } @@ -656,6 +664,7 @@ func (w Wrapper) RestoreMultipleItems( dcs = []data.Collection{} errs *multierror.Error ) + for _, itemPath := range paths { dc, err := w.RestoreSingleItem(ctx, snapshotID, itemPath) if err != nil { @@ -664,6 +673,7 @@ func (w Wrapper) RestoreMultipleItems( dcs = append(dcs, dc) } } + return dcs, errs.ErrorOrNil() } diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index 70b388657..69de2e45e 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -67,6 +67,7 @@ func testForFiles( collections []data.Collection, ) { count := 0 + for _, c := range collections { for s := range c.Items() { count++ @@ -228,6 +229,7 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree() { entries, err := fs.GetAllEntries(ctx, dirTree) require.NoError(suite.T(), err) + names := entriesToNames(entries) assert.Len(suite.T(), names, 2) assert.Contains(suite.T(), names, user1) @@ -261,7 +263,6 @@ func (suite *KopiaUnitSuite) TestBuildDirectoryTree_NoAncestorDirs() { ctx := context.Background() emails := "emails" - expectedFileCount := 42 progress := &corsoProgress{pending: map[string]*itemDetails{}} @@ -537,8 +538,10 @@ func (suite *KopiaIntegrationSuite) SetupSuite() { func (suite *KopiaIntegrationSuite) SetupTest() { t := suite.T() suite.ctx = context.Background() + c, err := openKopiaRepo(t, suite.ctx) require.NoError(t, err) + suite.w = &Wrapper{c} } @@ -760,6 +763,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { for k, v := range suite.inboxExpectedFiles { suite.allExpectedFiles[k] = v } + for k, v := range suite.archiveExpectedFiles { suite.allExpectedFiles[k] = v } @@ -782,6 +786,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupAndRestoreSingleItem() { assert.Equal(t, c.FullPath(), testPath) count := 0 + for resultStream := range c.Items() { buf, err := ioutil.ReadAll(resultStream.ToReader()) require.NoError(t, err) @@ -925,6 +930,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestRestoreMultipleItems() { p2 := []string{tid, "uid2", "emails", "fid"} dc1 := mockconnector.NewMockExchangeCollection(p1, 1) dc2 := mockconnector.NewMockExchangeCollection(p2, 1) + fp1 := append(p1, dc1.Names[0]) fp2 := append(p2, dc2.Names[0])