Add function to open kopia (#111)
* Add function to open kopia and keep handle * Add kopia handle in Repository * Have Repository manage lifetime of KopiaWrapper * Have CLI manage lifetime of Repository
This commit is contained in:
parent
01c275707a
commit
f1550c967b
@ -1,6 +1,7 @@
|
|||||||
package repo
|
package repo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
@ -75,10 +76,12 @@ func initS3Cmd(cmd *cobra.Command, args []string) {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := repository.Initialize(cmd.Context(), a, s); err != nil {
|
r, err := repository.Connect(cmd.Context(), a, s)
|
||||||
|
if err != nil {
|
||||||
fmt.Printf("Failed to initialize a new S3 repository: %v", err)
|
fmt.Printf("Failed to initialize a new S3 repository: %v", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
defer closeRepo(cmd.Context(), r)
|
||||||
|
|
||||||
fmt.Printf("Initialized a S3 repository within bucket %s.\n", s3Cfg.Bucket)
|
fmt.Printf("Initialized a S3 repository within bucket %s.\n", s3Cfg.Bucket)
|
||||||
}
|
}
|
||||||
@ -121,10 +124,12 @@ func connectS3Cmd(cmd *cobra.Command, args []string) {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := repository.Connect(cmd.Context(), a, s); err != nil {
|
r, err := repository.Connect(cmd.Context(), a, s)
|
||||||
|
if err != nil {
|
||||||
fmt.Printf("Failed to connect to the S3 repository: %v", err)
|
fmt.Printf("Failed to connect to the S3 repository: %v", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
defer closeRepo(cmd.Context(), r)
|
||||||
|
|
||||||
fmt.Printf("Connected to S3 bucket %s.\n", s3Cfg.Bucket)
|
fmt.Printf("Connected to S3 bucket %s.\n", s3Cfg.Bucket)
|
||||||
}
|
}
|
||||||
@ -158,3 +163,9 @@ func makeS3Config() (storage.S3Config, storage.CommonConfig, error) {
|
|||||||
storage.CORSO_PASSWORD: corsoPasswd,
|
storage.CORSO_PASSWORD: corsoPasswd,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func closeRepo(ctx context.Context, r repository.Repository) {
|
||||||
|
if err := r.Close(ctx); err != nil {
|
||||||
|
fmt.Printf("Error closing repository: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -19,15 +19,16 @@ var (
|
|||||||
errConnect = errors.New("connecting repo")
|
errConnect = errors.New("connecting repo")
|
||||||
)
|
)
|
||||||
|
|
||||||
type kopiaWrapper struct {
|
type KopiaWrapper struct {
|
||||||
storage storage.Storage
|
storage storage.Storage
|
||||||
|
rep repo.Repository
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(s storage.Storage) kopiaWrapper {
|
func New(s storage.Storage) KopiaWrapper {
|
||||||
return kopiaWrapper{s}
|
return KopiaWrapper{storage: s}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kw kopiaWrapper) Initialize(ctx context.Context) error {
|
func (kw KopiaWrapper) Initialize(ctx context.Context) error {
|
||||||
bst, err := blobStoreByProvider(ctx, kw.storage)
|
bst, err := blobStoreByProvider(ctx, kw.storage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, errInit.Error())
|
return errors.Wrap(err, errInit.Error())
|
||||||
@ -55,10 +56,14 @@ func (kw kopiaWrapper) Initialize(ctx context.Context) error {
|
|||||||
return errors.Wrap(err, errConnect.Error())
|
return errors.Wrap(err, errConnect.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := kw.open(ctx, cfg.CorsoPassword); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kw kopiaWrapper) Connect(ctx context.Context) error {
|
func (kw KopiaWrapper) Connect(ctx context.Context) error {
|
||||||
bst, err := blobStoreByProvider(ctx, kw.storage)
|
bst, err := blobStoreByProvider(ctx, kw.storage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, errInit.Error())
|
return errors.Wrap(err, errInit.Error())
|
||||||
@ -80,6 +85,11 @@ func (kw kopiaWrapper) Connect(ctx context.Context) error {
|
|||||||
); err != nil {
|
); err != nil {
|
||||||
return errors.Wrap(err, errConnect.Error())
|
return errors.Wrap(err, errConnect.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := kw.open(ctx, cfg.CorsoPassword); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,3 +101,29 @@ func blobStoreByProvider(ctx context.Context, s storage.Storage) (blob.Storage,
|
|||||||
return nil, errors.New("storage provider details are required")
|
return nil, errors.New("storage provider details are required")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kw KopiaWrapper) Close(ctx context.Context) error {
|
||||||
|
if kw.rep == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := kw.rep.Close(ctx)
|
||||||
|
kw.rep = nil
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "closing repository connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (kw KopiaWrapper) open(ctx context.Context, password string) error {
|
||||||
|
// TODO(ashmrtnz): issue #75: nil here should be storage.ConnectionOptions().
|
||||||
|
rep, err := repo.Open(ctx, defaultKopiaConfigFilePath, password, nil)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "opening repository connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
kw.rep = rep
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@ -4,9 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
"github.com/alcionai/corso/internal/kopia"
|
"github.com/alcionai/corso/internal/kopia"
|
||||||
"github.com/alcionai/corso/pkg/storage"
|
"github.com/alcionai/corso/pkg/storage"
|
||||||
"github.com/google/uuid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type repoProvider int
|
type repoProvider int
|
||||||
@ -25,6 +27,7 @@ type Repository struct {
|
|||||||
|
|
||||||
Account Account // the user's m365 account connection details
|
Account Account // the user's m365 account connection details
|
||||||
Storage storage.Storage // the storage provider details and configuration
|
Storage storage.Storage // the storage provider details and configuration
|
||||||
|
dataLayer *kopia.KopiaWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
// Account holds the user's m365 account details.
|
// Account holds the user's m365 account details.
|
||||||
@ -56,6 +59,7 @@ func Initialize(
|
|||||||
Version: "v1",
|
Version: "v1",
|
||||||
Account: acct,
|
Account: acct,
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
|
dataLayer: &k,
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
@ -79,6 +83,22 @@ func Connect(
|
|||||||
Version: "v1",
|
Version: "v1",
|
||||||
Account: acct,
|
Account: acct,
|
||||||
Storage: storage,
|
Storage: storage,
|
||||||
|
dataLayer: &k,
|
||||||
}
|
}
|
||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r Repository) Close(ctx context.Context) error {
|
||||||
|
if r.dataLayer == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.dataLayer.Close(ctx)
|
||||||
|
r.dataLayer = nil
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "closing corso Repository")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@ -125,7 +125,13 @@ func (suite *RepositoryIntegrationSuite) TestInitialize() {
|
|||||||
suite.T().Run(test.prefix, func(t *testing.T) {
|
suite.T().Run(test.prefix, func(t *testing.T) {
|
||||||
st, err := test.storage()
|
st, err := test.storage()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
_, err = repository.Initialize(ctx, test.account, st)
|
r, err := repository.Initialize(ctx, test.account, st)
|
||||||
|
if err == nil {
|
||||||
|
defer func() {
|
||||||
|
assert.NoError(t, r.Close(ctx))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
test.errCheck(t, err)
|
test.errCheck(t, err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user