split repo create and init/cnn (#4343)

splits the functionality for creating a new repository struct, and initializing or connecting to it.  This is the first step in a multi-step improvement for init and connect processing.

---

#### Does this PR need a docs update or release note?

- [x]  No

#### Type of change

- [x] 🧹 Tech Debt/Cleanup

#### Issue(s)

* #2025

#### Test Plan

- [x]  Unit test
- [x] 💚 E2E
This commit is contained in:
Keepers 2023-09-25 17:48:53 -06:00 committed by GitHub
parent cf3744bcfb
commit 365078a861
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 249 additions and 232 deletions

View File

@ -165,7 +165,7 @@ var defaultSelectorConfig = selectors.Config{OnlyMatchItemNames: true}
func runBackups( func runBackups(
ctx context.Context, ctx context.Context,
r repository.Repository, r repository.Repositoryer,
serviceName string, serviceName string,
selectorSet []selectors.Selector, selectorSet []selectors.Selector,
ins idname.Cacher, ins idname.Cacher,
@ -342,7 +342,7 @@ func ifShow(flag string) bool {
return strings.ToLower(strings.TrimSpace(flag)) == "show" return strings.ToLower(strings.TrimSpace(flag)) == "show"
} }
func printBackupStats(ctx context.Context, r repository.Repository, bid string) { func printBackupStats(ctx context.Context, r repository.Repositoryer, bid string) {
b, err := r.Backup(ctx, bid) b, err := r.Backup(ctx, bid)
if err != nil { if err != nil {
logger.CtxErr(ctx, err).Error("finding backup immediately after backup operation completion") logger.CtxErr(ctx, err).Error("finding backup immediately after backup operation completion")

View File

@ -124,7 +124,7 @@ func newIntegrationTesterSetup(t *testing.T) intgTesterSetup {
type dependencies struct { type dependencies struct {
st storage.Storage st storage.Storage
repo repository.Repository repo repository.Repositoryer
vpr *viper.Viper vpr *viper.Viper
recorder strings.Builder recorder strings.Builder
configFilePath string configFilePath string
@ -154,12 +154,15 @@ func prepM365Test(
vpr, cfgFP := tconfig.MakeTempTestConfigClone(t, force) vpr, cfgFP := tconfig.MakeTempTestConfigClone(t, force)
ctx = config.SetViper(ctx, vpr) ctx = config.SetViper(ctx, vpr)
repo, err := repository.Initialize( repo, err := repository.New(
ctx, ctx,
acct, acct,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) repository.NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = repo.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return dependencies{ return dependencies{

View File

@ -108,13 +108,17 @@ func initFilesystemCmd(cmd *cobra.Command, args []string) error {
return Only(ctx, clues.Wrap(err, "Failed to parse m365 account config")) return Only(ctx, clues.Wrap(err, "Failed to parse m365 account config"))
} }
r, err := repository.Initialize( r, err := repository.New(
ctx, ctx,
cfg.Account, cfg.Account,
cfg.Storage, cfg.Storage,
opt, opt,
retention) repository.NewRepoID)
if err != nil { if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to construct the repository controller"))
}
if err = r.Initialize(ctx, retention); err != nil {
if flags.SucceedIfExistsFV && errors.Is(err, repository.ErrorRepoAlreadyExists) { if flags.SucceedIfExistsFV && errors.Is(err, repository.ErrorRepoAlreadyExists) {
return nil return nil
} }
@ -191,13 +195,17 @@ func connectFilesystemCmd(cmd *cobra.Command, args []string) error {
opts := utils.ControlWithConfig(cfg) opts := utils.ControlWithConfig(cfg)
r, err := repository.ConnectAndSendConnectEvent( r, err := repository.New(
ctx, ctx,
cfg.Account, cfg.Account,
cfg.Storage, cfg.Storage,
repoID, opts,
opts) repoID)
if err != nil { if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to create a repository controller"))
}
if err := r.Connect(ctx); err != nil {
return Only(ctx, clues.Wrap(err, "Failed to connect to the filesystem repository")) return Only(ctx, clues.Wrap(err, "Failed to connect to the filesystem repository"))
} }

View File

@ -132,12 +132,15 @@ func (suite *FilesystemE2ESuite) TestConnectFilesystemCmd() {
ctx = config.SetViper(ctx, vpr) ctx = config.SetViper(ctx, vpr)
// init the repo first // init the repo first
_, err = repository.Initialize( r, err := repository.New(
ctx, ctx,
account.Account{}, account.Account{},
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) repository.NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
// then test it // then test it

View File

@ -130,13 +130,17 @@ func initS3Cmd(cmd *cobra.Command, args []string) error {
return Only(ctx, clues.Wrap(err, "Failed to parse m365 account config")) return Only(ctx, clues.Wrap(err, "Failed to parse m365 account config"))
} }
r, err := repository.Initialize( r, err := repository.New(
ctx, ctx,
cfg.Account, cfg.Account,
cfg.Storage, cfg.Storage,
opt, opt,
retentionOpts) repository.NewRepoID)
if err != nil { if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to construct the repository controller"))
}
if err = r.Initialize(ctx, retentionOpts); err != nil {
if flags.SucceedIfExistsFV && errors.Is(err, repository.ErrorRepoAlreadyExists) { if flags.SucceedIfExistsFV && errors.Is(err, repository.ErrorRepoAlreadyExists) {
return nil return nil
} }
@ -211,12 +215,12 @@ func connectS3Cmd(cmd *cobra.Command, args []string) error {
opts := utils.ControlWithConfig(cfg) opts := utils.ControlWithConfig(cfg)
r, err := repository.ConnectAndSendConnectEvent( r, err := repository.New(
ctx, ctx,
cfg.Account, cfg.Account,
cfg.Storage, cfg.Storage,
repoID, opts,
opts) repoID)
if err != nil { if err != nil {
return Only(ctx, clues.Wrap(err, "Failed to connect to the S3 repository")) return Only(ctx, clues.Wrap(err, "Failed to connect to the S3 repository"))
} }

View File

@ -208,12 +208,15 @@ func (suite *S3E2ESuite) TestConnectS3Cmd() {
ctx = config.SetViper(ctx, vpr) ctx = config.SetViper(ctx, vpr)
// init the repo first // init the repo first
_, err = repository.Initialize( r, err := repository.New(
ctx, ctx,
account.Account{}, account.Account{},
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) repository.NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
// then test it // then test it

View File

@ -43,7 +43,7 @@ type RestoreExchangeE2ESuite struct {
st storage.Storage st storage.Storage
vpr *viper.Viper vpr *viper.Viper
cfgFP string cfgFP string
repo repository.Repository repo repository.Repositoryer
m365UserID string m365UserID string
backupOps map[path.CategoryType]operations.BackupOperation backupOps map[path.CategoryType]operations.BackupOperation
} }
@ -86,12 +86,15 @@ func (suite *RestoreExchangeE2ESuite) SetupSuite() {
) )
// init the repo first // init the repo first
suite.repo, err = repository.Initialize( r, err := repository.New(
ctx, ctx,
suite.acct, suite.acct,
suite.st, suite.st,
control.Options{}, control.DefaultOptions(),
ctrlRepo.Retention{}) repository.NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
suite.backupOps = make(map[path.CategoryType]operations.BackupOperation) suite.backupOps = make(map[path.CategoryType]operations.BackupOperation)

View File

@ -31,7 +31,7 @@ func GetAccountAndConnectWithOverrides(
ctx context.Context, ctx context.Context,
cmd *cobra.Command, cmd *cobra.Command,
pst path.ServiceType, pst path.ServiceType,
) (repository.Repository, *storage.Storage, *account.Account, *control.Options, error) { ) (repository.Repositoryer, *storage.Storage, *account.Account, *control.Options, error) {
provider, overrides, err := GetStorageProviderAndOverrides(ctx, cmd) provider, overrides, err := GetStorageProviderAndOverrides(ctx, cmd)
if err != nil { if err != nil {
return nil, nil, nil, nil, clues.Stack(err) return nil, nil, nil, nil, clues.Stack(err)
@ -45,7 +45,7 @@ func GetAccountAndConnect(
pst path.ServiceType, pst path.ServiceType,
provider storage.ProviderType, provider storage.ProviderType,
overrides map[string]string, overrides map[string]string,
) (repository.Repository, *storage.Storage, *account.Account, *control.Options, error) { ) (repository.Repositoryer, *storage.Storage, *account.Account, *control.Options, error) {
cfg, err := config.GetConfigRepoDetails( cfg, err := config.GetConfigRepoDetails(
ctx, ctx,
provider, provider,
@ -63,8 +63,17 @@ func GetAccountAndConnect(
opts := ControlWithConfig(cfg) opts := ControlWithConfig(cfg)
r, err := repository.Connect(ctx, cfg.Account, cfg.Storage, repoID, opts) r, err := repository.New(
ctx,
cfg.Account,
cfg.Storage,
opts,
repoID)
if err != nil { if err != nil {
return nil, nil, nil, nil, clues.Wrap(err, "creating a repository controller")
}
if err := r.Connect(ctx); err != nil {
return nil, nil, nil, nil, clues.Wrap(err, "connecting to the "+cfg.Storage.Provider.String()+" repository") return nil, nil, nil, nil, clues.Wrap(err, "connecting to the "+cfg.Storage.Provider.String()+" repository")
} }
@ -81,7 +90,7 @@ func AccountConnectAndWriteRepoConfig(
ctx context.Context, ctx context.Context,
cmd *cobra.Command, cmd *cobra.Command,
pst path.ServiceType, pst path.ServiceType,
) (repository.Repository, *account.Account, error) { ) (repository.Repositoryer, *account.Account, error) {
r, stg, acc, opts, err := GetAccountAndConnectWithOverrides( r, stg, acc, opts, err := GetAccountAndConnectWithOverrides(
ctx, ctx,
cmd, cmd,
@ -115,7 +124,7 @@ func AccountConnectAndWriteRepoConfig(
} }
// CloseRepo handles closing a repo. // CloseRepo handles closing a repo.
func CloseRepo(ctx context.Context, r repository.Repository) { func CloseRepo(ctx context.Context, r repository.Repositoryer) {
if err := r.Close(ctx); err != nil { if err := r.Close(ctx); err != nil {
fmt.Print("Error closing repository:", err) fmt.Print("Error closing repository:", err)
} }

View File

@ -107,9 +107,19 @@ func pitrListBackups(
opts := utils.ControlWithConfig(cfg) opts := utils.ControlWithConfig(cfg)
opts.Repo.ViewTimestamp = &pitr opts.Repo.ViewTimestamp = &pitr
r, err := repository.Connect(ctx, cfg.Account, cfg.Storage, cfg.RepoID, opts) r, err := repository.New(
ctx,
cfg.Account,
cfg.Storage,
opts,
cfg.RepoID)
if err != nil { if err != nil {
return clues.Wrap(err, "connecting to repo").WithClues(ctx) return clues.Wrap(err, "creating a repo")
}
err = r.Connect(ctx)
if err != nil {
return clues.Wrap(err, "connecting to the repository")
} }
defer r.Close(ctx) defer r.Close(ctx)

View File

@ -87,7 +87,7 @@ func TestMain(m *testing.M) {
func initM365Repo(t *testing.T) ( func initM365Repo(t *testing.T) (
context.Context, context.Context,
repository.Repository, repository.Repositoryer,
account.Account, account.Account,
storage.Storage, storage.Storage,
) { ) {
@ -103,10 +103,18 @@ func initM365Repo(t *testing.T) (
FailureHandling: control.FailFast, FailureHandling: control.FailFast,
} }
repo, err := repository.Initialize(ctx, ac, st, opts, ctrlRepo.Retention{}) r, err := repository.New(
ctx,
ac,
st,
opts,
repository.NewRepoID)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
return ctx, repo, ac, st err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err))
return ctx, r, ac, st
} }
// ------------------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------------------
@ -117,7 +125,7 @@ func initM365Repo(t *testing.T) (
func runLoadTest( func runLoadTest(
t *testing.T, t *testing.T,
ctx context.Context, ctx context.Context,
r repository.Repository, r repository.Repositoryer,
prefix, service string, prefix, service string,
usersUnderTest []string, usersUnderTest []string,
bupSel, restSel selectors.Selector, bupSel, restSel selectors.Selector,
@ -142,7 +150,7 @@ func runLoadTest(
func runRestoreLoadTest( func runRestoreLoadTest(
t *testing.T, t *testing.T,
ctx context.Context, ctx context.Context,
r repository.Repository, r repository.Repositoryer,
prefix, service, backupID string, prefix, service, backupID string,
usersUnderTest []string, usersUnderTest []string,
restSel selectors.Selector, restSel selectors.Selector,
@ -200,7 +208,7 @@ func runBackupLoadTest(
func runBackupListLoadTest( func runBackupListLoadTest(
t *testing.T, t *testing.T,
ctx context.Context, ctx context.Context,
r repository.Repository, r repository.Repositoryer,
name, expectID string, name, expectID string,
) { ) {
//revive:enable:context-as-argument //revive:enable:context-as-argument
@ -237,7 +245,7 @@ func runBackupListLoadTest(
func runBackupDetailsLoadTest( func runBackupDetailsLoadTest(
t *testing.T, t *testing.T,
ctx context.Context, ctx context.Context,
r repository.Repository, r repository.Repositoryer,
name, backupID string, name, backupID string,
users []string, users []string,
) { ) {
@ -409,7 +417,7 @@ func normalizeCategorySet(t *testing.T, cats map[string]struct{}) []string {
type LoadExchangeSuite struct { type LoadExchangeSuite struct {
tester.Suite tester.Suite
ctx context.Context ctx context.Context
repo repository.Repository repo repository.Repositoryer
acct account.Account //lint:ignore U1000 future test use acct account.Account //lint:ignore U1000 future test use
st storage.Storage //lint:ignore U1000 future test use st storage.Storage //lint:ignore U1000 future test use
usersUnderTest []string usersUnderTest []string
@ -459,7 +467,7 @@ func (suite *LoadExchangeSuite) TestExchange() {
type IndividualLoadExchangeSuite struct { type IndividualLoadExchangeSuite struct {
tester.Suite tester.Suite
ctx context.Context ctx context.Context
repo repository.Repository repo repository.Repositoryer
acct account.Account //lint:ignore U1000 future test use acct account.Account //lint:ignore U1000 future test use
st storage.Storage //lint:ignore U1000 future test use st storage.Storage //lint:ignore U1000 future test use
usersUnderTest []string usersUnderTest []string
@ -512,7 +520,7 @@ func (suite *IndividualLoadExchangeSuite) TestExchange() {
type LoadOneDriveSuite struct { type LoadOneDriveSuite struct {
tester.Suite tester.Suite
ctx context.Context ctx context.Context
repo repository.Repository repo repository.Repositoryer
acct account.Account //lint:ignore U1000 future test use acct account.Account //lint:ignore U1000 future test use
st storage.Storage //lint:ignore U1000 future test use st storage.Storage //lint:ignore U1000 future test use
usersUnderTest []string usersUnderTest []string
@ -559,7 +567,7 @@ func (suite *LoadOneDriveSuite) TestOneDrive() {
type IndividualLoadOneDriveSuite struct { type IndividualLoadOneDriveSuite struct {
tester.Suite tester.Suite
ctx context.Context ctx context.Context
repo repository.Repository repo repository.Repositoryer
acct account.Account //lint:ignore U1000 future test use acct account.Account //lint:ignore U1000 future test use
st storage.Storage //lint:ignore U1000 future test use st storage.Storage //lint:ignore U1000 future test use
usersUnderTest []string usersUnderTest []string
@ -609,7 +617,7 @@ func (suite *IndividualLoadOneDriveSuite) TestOneDrive() {
type LoadSharePointSuite struct { type LoadSharePointSuite struct {
tester.Suite tester.Suite
ctx context.Context ctx context.Context
repo repository.Repository repo repository.Repositoryer
acct account.Account //lint:ignore U1000 future test use acct account.Account //lint:ignore U1000 future test use
st storage.Storage //lint:ignore U1000 future test use st storage.Storage //lint:ignore U1000 future test use
sitesUnderTest []string sitesUnderTest []string
@ -656,7 +664,7 @@ func (suite *LoadSharePointSuite) TestSharePoint() {
type IndividualLoadSharePointSuite struct { type IndividualLoadSharePointSuite struct {
tester.Suite tester.Suite
ctx context.Context ctx context.Context
repo repository.Repository repo repository.Repositoryer
acct account.Account //lint:ignore U1000 future test use acct account.Account //lint:ignore U1000 future test use
st storage.Storage //lint:ignore U1000 future test use st storage.Storage //lint:ignore U1000 future test use
sitesUnderTest []string sitesUnderTest []string

View File

@ -35,6 +35,8 @@ import (
"github.com/alcionai/corso/src/pkg/store" "github.com/alcionai/corso/src/pkg/store"
) )
const NewRepoID = ""
var ( var (
ErrorRepoAlreadyExists = clues.New("a repository was already initialized with that configuration") ErrorRepoAlreadyExists = clues.New("a repository was already initialized with that configuration")
ErrorBackupNotFound = clues.New("no backup exists with that id") ErrorBackupNotFound = clues.New("no backup exists with that id")
@ -56,7 +58,9 @@ type BackupGetter interface {
) (*fault.Errors, *backup.Backup, *fault.Bus) ) (*fault.Errors, *backup.Backup, *fault.Bus)
} }
type Repository interface { type Repositoryer interface {
Initialize(ctx context.Context, retentionOpts ctrlRepo.Retention) error
Connect(ctx context.Context) error
GetID() string GetID() string
Close(context.Context) error Close(context.Context) error
NewBackup( NewBackup(
@ -117,6 +121,48 @@ func (r repository) GetID() string {
return r.ID return r.ID
} }
// New constructs a repository that can be used to Initialize or Connect a repo instance.
func New(
ctx context.Context,
acct account.Account,
s storage.Storage,
opts control.Options,
configFileRepoID string,
) (repo *repository, err error) {
ctx = clues.Add(
ctx,
"acct_provider", acct.Provider.String(),
"acct_id", clues.Hide(acct.ID()),
"storage_provider", s.Provider.String())
bus, err := events.NewBus(ctx, s, acct.ID(), opts)
if err != nil {
return nil, clues.Wrap(err, "constructing event bus").WithClues(ctx)
}
repoID := configFileRepoID
if len(configFileRepoID) == 0 {
repoID = newRepoID(s)
}
bus.SetRepoID(repoID)
r := repository{
ID: repoID,
Version: "v1",
Account: acct,
Storage: s,
Bus: bus,
Opts: opts,
}
if !r.Opts.DisableMetrics {
bus.SetRepoID(r.ID)
}
return &r, nil
}
// Initialize will: // Initialize will:
// - validate the m365 account & secrets // - validate the m365 account & secrets
// - connect to the m365 account to ensure communication capability // - connect to the m365 account to ensure communication capability
@ -125,19 +171,15 @@ func (r repository) GetID() string {
// - update maintenance retention parameters as needed // - update maintenance retention parameters as needed
// - store the configuration details // - store the configuration details
// - connect to the provider // - connect to the provider
// - return the connected repository func (r *repository) Initialize(
func Initialize(
ctx context.Context, ctx context.Context,
acct account.Account,
s storage.Storage,
opts control.Options,
retentionOpts ctrlRepo.Retention, retentionOpts ctrlRepo.Retention,
) (repo Repository, err error) { ) (err error) {
ctx = clues.Add( ctx = clues.Add(
ctx, ctx,
"acct_provider", acct.Provider.String(), "acct_provider", r.Account.Provider.String(),
"acct_id", clues.Hide(acct.ID()), "acct_id", clues.Hide(r.Account.ID()),
"storage_provider", s.Provider.String()) "storage_provider", r.Storage.Provider.String())
defer func() { defer func() {
if crErr := crash.Recovery(ctx, recover(), "repo init"); crErr != nil { if crErr := crash.Recovery(ctx, recover(), "repo init"); crErr != nil {
@ -145,55 +187,36 @@ func Initialize(
} }
}() }()
kopiaRef := kopia.NewConn(s) kopiaRef := kopia.NewConn(r.Storage)
if err := kopiaRef.Initialize(ctx, opts.Repo, retentionOpts); err != nil { if err := kopiaRef.Initialize(ctx, r.Opts.Repo, retentionOpts); err != nil {
// replace common internal errors so that sdk users can check results with errors.Is() // replace common internal errors so that sdk users can check results with errors.Is()
if errors.Is(err, kopia.ErrorRepoAlreadyExists) { if errors.Is(err, kopia.ErrorRepoAlreadyExists) {
return nil, clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx) return clues.Stack(ErrorRepoAlreadyExists, err).WithClues(ctx)
} }
return nil, clues.Wrap(err, "initializing kopia") return clues.Wrap(err, "initializing kopia")
} }
// kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe // kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe
// to close here. // to close here.
defer kopiaRef.Close(ctx) defer kopiaRef.Close(ctx)
w, err := kopia.NewWrapper(kopiaRef) r.dataLayer, err = kopia.NewWrapper(kopiaRef)
if err != nil { if err != nil {
return nil, clues.Stack(err).WithClues(ctx) return clues.Stack(err).WithClues(ctx)
} }
ms, err := kopia.NewModelStore(kopiaRef) r.modelStore, err = kopia.NewModelStore(kopiaRef)
if err != nil { if err != nil {
return nil, clues.Stack(err).WithClues(ctx) return clues.Stack(err).WithClues(ctx)
} }
bus, err := events.NewBus(ctx, s, acct.ID(), opts) if err := newRepoModel(ctx, r.modelStore, r.ID); err != nil {
if err != nil { return clues.Wrap(err, "setting up repository").WithClues(ctx)
return nil, clues.Wrap(err, "constructing event bus")
}
repoID := newRepoID(s)
bus.SetRepoID(repoID)
r := &repository{
ID: repoID,
Version: "v1",
Account: acct,
Storage: s,
Bus: bus,
Opts: opts,
dataLayer: w,
modelStore: ms,
}
if err := newRepoModel(ctx, ms, r.ID); err != nil {
return nil, clues.New("setting up repository").WithClues(ctx)
} }
r.Bus.Event(ctx, events.RepoInit, nil) r.Bus.Event(ctx, events.RepoInit, nil)
return r, nil return nil
} }
// Connect will: // Connect will:
@ -201,18 +224,12 @@ func Initialize(
// - connect to the m365 account to ensure communication capability // - connect to the m365 account to ensure communication capability
// - connect to the provider storage // - connect to the provider storage
// - return the connected repository // - return the connected repository
func Connect( func (r *repository) Connect(ctx context.Context) (err error) {
ctx context.Context,
acct account.Account,
s storage.Storage,
repoid string,
opts control.Options,
) (r Repository, err error) {
ctx = clues.Add( ctx = clues.Add(
ctx, ctx,
"acct_provider", acct.Provider.String(), "acct_provider", r.Account.Provider.String(),
"acct_id", clues.Hide(acct.ID()), "acct_id", clues.Hide(r.Account.ID()),
"storage_provider", s.Provider.String()) "storage_provider", r.Storage.Provider.String())
defer func() { defer func() {
if crErr := crash.Recovery(ctx, recover(), "repo connect"); crErr != nil { if crErr := crash.Recovery(ctx, recover(), "repo connect"); crErr != nil {
@ -223,71 +240,36 @@ func Connect(
progressBar := observe.MessageWithCompletion(ctx, "Connecting to repository") progressBar := observe.MessageWithCompletion(ctx, "Connecting to repository")
defer close(progressBar) defer close(progressBar)
kopiaRef := kopia.NewConn(s) kopiaRef := kopia.NewConn(r.Storage)
if err := kopiaRef.Connect(ctx, opts.Repo); err != nil { if err := kopiaRef.Connect(ctx, r.Opts.Repo); err != nil {
return nil, clues.Wrap(err, "connecting kopia client") return clues.Wrap(err, "connecting kopia client")
} }
// kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe // kopiaRef comes with a count of 1 and NewWrapper/NewModelStore bumps it again so safe
// to close here. // to close here.
defer kopiaRef.Close(ctx) defer kopiaRef.Close(ctx)
w, err := kopia.NewWrapper(kopiaRef) r.dataLayer, err = kopia.NewWrapper(kopiaRef)
if err != nil { if err != nil {
return nil, clues.Stack(err).WithClues(ctx) return clues.Stack(err).WithClues(ctx)
} }
ms, err := kopia.NewModelStore(kopiaRef) r.modelStore, err = kopia.NewModelStore(kopiaRef)
if err != nil { if err != nil {
return nil, clues.Stack(err).WithClues(ctx) return clues.Stack(err).WithClues(ctx)
} }
bus, err := events.NewBus(ctx, s, acct.ID(), opts) if r.ID == events.RepoIDNotFound {
if err != nil { rm, err := getRepoModel(ctx, r.modelStore)
return nil, clues.Wrap(err, "constructing event bus")
}
if repoid == events.RepoIDNotFound {
rm, err := getRepoModel(ctx, ms)
if err != nil { if err != nil {
return nil, clues.New("retrieving repo info") return clues.Wrap(err, "retrieving repo model info")
} }
repoid = string(rm.ID) r.ID = string(rm.ID)
} }
// Do not query repo ID if metrics are disabled
if !opts.DisableMetrics {
bus.SetRepoID(repoid)
}
// todo: ID and CreatedAt should get retrieved from a stored kopia config.
return &repository{
ID: repoid,
Version: "v1",
Account: acct,
Storage: s,
Bus: bus,
Opts: opts,
dataLayer: w,
modelStore: ms,
}, nil
}
func ConnectAndSendConnectEvent(ctx context.Context,
acct account.Account,
s storage.Storage,
repoid string,
opts control.Options,
) (Repository, error) {
repo, err := Connect(ctx, acct, s, repoid, opts)
if err != nil {
return nil, err
}
r := repo.(*repository)
r.Bus.Event(ctx, events.RepoConnect, nil) r.Bus.Event(ctx, events.RepoConnect, nil)
return r, nil return nil
} }
func (r *repository) Close(ctx context.Context) error { func (r *repository) Close(ctx context.Context) error {

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/alcionai/clues" "github.com/alcionai/clues"
"github.com/kopia/kopia/repo/blob/readonly"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
@ -62,12 +61,15 @@ func (suite *RepositoryUnitSuite) TestInitialize() {
st, err := test.storage() st, err := test.storage()
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
_, err = Initialize( r, err := New(
ctx, ctx,
test.account, test.account,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
test.errCheck(t, err, clues.ToCore(err)) test.errCheck(t, err, clues.ToCore(err))
}) })
} }
@ -101,7 +103,15 @@ func (suite *RepositoryUnitSuite) TestConnect() {
st, err := test.storage() st, err := test.storage()
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
_, err = Connect(ctx, test.account, st, "not_found", control.DefaultOptions()) r, err := New(
ctx,
test.account,
st,
control.DefaultOptions(),
NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Connect(ctx)
test.errCheck(t, err, clues.ToCore(err)) test.errCheck(t, err, clues.ToCore(err))
}) })
} }
@ -144,12 +154,15 @@ func (suite *RepositoryIntegrationSuite) TestInitialize() {
defer flush() defer flush()
st := test.storage(t) st := test.storage(t)
r, err := Initialize( r, err := New(
ctx, ctx,
test.account, test.account,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
if err == nil { if err == nil {
defer func() { defer func() {
err := r.Close(ctx) err := r.Close(ctx)
@ -168,21 +181,31 @@ const (
) )
func (suite *RepositoryIntegrationSuite) TestInitializeWithRole() { func (suite *RepositoryIntegrationSuite) TestInitializeWithRole() {
t := suite.T()
if _, ok := os.LookupEnv(roleARNEnvKey); !ok { if _, ok := os.LookupEnv(roleARNEnvKey); !ok {
suite.T().Skip(roleARNEnvKey + " not set") t.Skip(roleARNEnvKey + " not set")
} }
ctx, flush := tester.NewContext(suite.T()) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
st := storeTD.NewPrefixedS3Storage(suite.T()) st := storeTD.NewPrefixedS3Storage(t)
st.Role = os.Getenv(roleARNEnvKey) st.Role = os.Getenv(roleARNEnvKey)
st.SessionName = "corso-repository-test" st.SessionName = "corso-repository-test"
st.SessionDuration = roleDuration.String() st.SessionDuration = roleDuration.String()
r, err := Initialize(ctx, account.Account{}, st, control.Options{}, ctrlRepo.Retention{}) r, err := New(
require.NoError(suite.T(), err) ctx,
account.Account{},
st,
control.DefaultOptions(),
NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err)
defer func() { defer func() {
r.Close(ctx) r.Close(ctx)
@ -197,17 +220,19 @@ func (suite *RepositoryIntegrationSuite) TestConnect() {
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewPrefixedS3Storage(t)
r, err := New(
repo, err := Initialize(
ctx, ctx,
account.Account{}, account.Account{},
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
// now re-connect // now re-connect
_, err = Connect(ctx, account.Account{}, st, repo.GetID(), control.DefaultOptions()) err = r.Connect(ctx)
assert.NoError(t, err, clues.ToCore(err)) assert.NoError(t, err, clues.ToCore(err))
} }
@ -219,13 +244,15 @@ func (suite *RepositoryIntegrationSuite) TestConnect_sameID() {
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewPrefixedS3Storage(t)
r, err := New(
r, err := Initialize(
ctx, ctx,
account.Account{}, account.Account{},
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
oldID := r.GetID() oldID := r.GetID()
@ -234,7 +261,7 @@ func (suite *RepositoryIntegrationSuite) TestConnect_sameID() {
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
// now re-connect // now re-connect
r, err = Connect(ctx, account.Account{}, st, oldID, control.DefaultOptions()) err = r.Connect(ctx)
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
assert.Equal(t, oldID, r.GetID()) assert.Equal(t, oldID, r.GetID())
} }
@ -249,13 +276,15 @@ func (suite *RepositoryIntegrationSuite) TestNewBackup() {
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewPrefixedS3Storage(t)
r, err := New(
r, err := Initialize(
ctx, ctx,
acct, acct,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
userID := tconfig.M365UserID(t) userID := tconfig.M365UserID(t)
@ -276,13 +305,15 @@ func (suite *RepositoryIntegrationSuite) TestNewRestore() {
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewPrefixedS3Storage(t)
r, err := New(
r, err := Initialize(
ctx, ctx,
acct, acct,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) "")
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
ro, err := r.NewRestore( ro, err := r.NewRestore(
@ -304,13 +335,15 @@ func (suite *RepositoryIntegrationSuite) TestNewBackupAndDelete() {
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewPrefixedS3Storage(t)
r, err := New(
r, err := Initialize(
ctx, ctx,
acct, acct,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
userID := tconfig.M365UserID(t) userID := tconfig.M365UserID(t)
@ -355,13 +388,15 @@ func (suite *RepositoryIntegrationSuite) TestNewMaintenance() {
// need to initialize the repository before we can test connecting to it. // need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t) st := storeTD.NewPrefixedS3Storage(t)
r, err := New(
r, err := Initialize(
ctx, ctx,
acct, acct,
st, st,
control.DefaultOptions(), control.DefaultOptions(),
ctrlRepo.Retention{}) NewRepoID)
require.NoError(t, err, clues.ToCore(err))
err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err, clues.ToCore(err)) require.NoError(t, err, clues.ToCore(err))
mo, err := r.NewMaintenance(ctx, ctrlRepo.Maintenance{}) mo, err := r.NewMaintenance(ctx, ctrlRepo.Maintenance{})
@ -369,61 +404,6 @@ func (suite *RepositoryIntegrationSuite) TestNewMaintenance() {
require.NotNil(t, mo) require.NotNil(t, mo)
} }
func (suite *RepositoryIntegrationSuite) TestConnect_DisableMetrics() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
// need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t)
repo, err := Initialize(
ctx,
account.Account{},
st,
control.DefaultOptions(),
ctrlRepo.Retention{})
require.NoError(t, err)
// now re-connect
r, err := Connect(ctx, account.Account{}, st, repo.GetID(), control.Options{DisableMetrics: true})
assert.NoError(t, err)
// now we have repoID beforehand
assert.Equal(t, r.GetID(), r.GetID())
}
func (suite *RepositoryIntegrationSuite) TestConnect_ReadOnly() {
t := suite.T()
ctx, flush := tester.NewContext(t)
defer flush()
// need to initialize the repository before we can test connecting to it.
st := storeTD.NewPrefixedS3Storage(t)
repo, err := Initialize(
ctx,
account.Account{},
st,
control.DefaultOptions(),
ctrlRepo.Retention{})
require.NoError(t, err)
// now re-connect
r, err := Connect(ctx, account.Account{}, st, repo.GetID(), control.Options{Repo: ctrlRepo.Options{ReadOnly: true}})
assert.NoError(t, err)
// Maintenance attempts to write some blobs just to say it was running. Since
// we're in readonly mode it should fail with a sentinel error.
op, err := r.NewMaintenance(ctx, ctrlRepo.Maintenance{})
require.NoError(t, err, clues.ToCore(err))
err = op.Run(ctx)
assert.ErrorIs(t, err, readonly.ErrReadonly)
}
// Test_Options tests that the options are passed through to the repository // Test_Options tests that the options are passed through to the repository
// correctly // correctly
func (suite *RepositoryIntegrationSuite) Test_Options() { func (suite *RepositoryIntegrationSuite) Test_Options() {
@ -477,16 +457,20 @@ func (suite *RepositoryIntegrationSuite) Test_Options() {
ctx, flush := tester.NewContext(t) ctx, flush := tester.NewContext(t)
defer flush() defer flush()
repo, err := Initialize(ctx, acct, st, test.opts(), ctrlRepo.Retention{}) r, err := New(
require.NoError(t, err) ctx,
acct,
st,
test.opts(),
NewRepoID)
require.NoError(t, err, clues.ToCore(err))
r := repo.(*repository) err = r.Initialize(ctx, ctrlRepo.Retention{})
require.NoError(t, err)
assert.Equal(t, test.expectedLen, len(r.Opts.ItemExtensionFactory)) assert.Equal(t, test.expectedLen, len(r.Opts.ItemExtensionFactory))
repo, err = Connect(ctx, acct, st, repo.GetID(), test.opts()) err = r.Connect(ctx)
assert.NoError(t, err) assert.NoError(t, err)
r = repo.(*repository)
assert.Equal(t, test.expectedLen, len(r.Opts.ItemExtensionFactory)) assert.Equal(t, test.expectedLen, len(r.Opts.ItemExtensionFactory))
}) })
} }