generate a unique repository id on init (#1716)
## Description generates a unique repository ID on repo init, and retrieves it again on repo connect. This allows the event bus (and any other tracker) to id a repo independent of other context. I've been unable to find any good way to retrieve a unique repository ID from kopia. This seems like the next quickest solution. ## Type of change - [x] 🐛 Bugfix ## Issue(s) * #1602 ## Test Plan - [x] 💪 Manual
This commit is contained in:
parent
893bc978ba
commit
565c33af4e
@ -99,7 +99,6 @@ func NewBus(ctx context.Context, s storage.Storage, tenID string, opts control.O
|
||||
|
||||
return Bus{
|
||||
client: client,
|
||||
repoID: repoHash(s),
|
||||
tenant: tenantHash(tenID),
|
||||
version: version.Version,
|
||||
}, nil
|
||||
@ -152,34 +151,11 @@ func (b Bus) Event(ctx context.Context, key string, data map[string]any) {
|
||||
}
|
||||
}
|
||||
|
||||
func storageID(s storage.Storage) string {
|
||||
id := s.Provider.String()
|
||||
|
||||
switch s.Provider {
|
||||
case storage.ProviderS3:
|
||||
s3, err := s.S3Config()
|
||||
if err != nil {
|
||||
return id
|
||||
}
|
||||
|
||||
id += s3.Bucket + s3.Prefix
|
||||
}
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func repoHash(s storage.Storage) string {
|
||||
return md5HashOf(storageID(s))
|
||||
func (b *Bus) SetRepoID(hash string) {
|
||||
b.repoID = hash
|
||||
}
|
||||
|
||||
func tenantHash(tenID string) string {
|
||||
return md5HashOf(tenID)
|
||||
}
|
||||
|
||||
func md5HashOf(s string) string {
|
||||
sum := md5.Sum(
|
||||
[]byte(s),
|
||||
)
|
||||
|
||||
sum := md5.Sum([]byte(tenID))
|
||||
return fmt.Sprintf("%x", sum)
|
||||
}
|
||||
|
||||
@ -22,6 +22,8 @@ const (
|
||||
BackupOpSchema
|
||||
RestoreOpSchema
|
||||
BackupSchema
|
||||
BackupDetailsSchema
|
||||
RepositorySchema
|
||||
)
|
||||
|
||||
// common tags for filtering
|
||||
@ -31,7 +33,7 @@ const (
|
||||
|
||||
// Valid returns true if the ModelType value fits within the iota range.
|
||||
func (mt Schema) Valid() bool {
|
||||
return mt > 0 && mt < BackupSchema+1
|
||||
return mt > 0 && mt < RepositorySchema+1
|
||||
}
|
||||
|
||||
type Model interface {
|
||||
|
||||
@ -27,6 +27,9 @@ func (suite *ModelUnitSuite) TestValid() {
|
||||
{model.BackupOpSchema, assert.True},
|
||||
{model.RestoreOpSchema, assert.True},
|
||||
{model.BackupSchema, assert.True},
|
||||
{model.BackupDetailsSchema, assert.True},
|
||||
{model.RepositorySchema, assert.True},
|
||||
{model.RepositorySchema + 1, assert.False},
|
||||
{model.Schema(-1), assert.False},
|
||||
{model.Schema(100), assert.False},
|
||||
}
|
||||
|
||||
@ -39,6 +39,7 @@ type BackupGetter interface {
|
||||
}
|
||||
|
||||
type Repository interface {
|
||||
GetID() string
|
||||
Close(context.Context) error
|
||||
NewBackup(
|
||||
ctx context.Context,
|
||||
@ -56,7 +57,7 @@ type Repository interface {
|
||||
|
||||
// Repository contains storage provider information.
|
||||
type repository struct {
|
||||
ID uuid.UUID
|
||||
ID string
|
||||
CreatedAt time.Time
|
||||
Version string // in case of future breaking changes
|
||||
|
||||
@ -69,6 +70,10 @@ type repository struct {
|
||||
modelStore *kopia.ModelStore
|
||||
}
|
||||
|
||||
func (r repository) GetID() string {
|
||||
return r.ID
|
||||
}
|
||||
|
||||
// Initialize will:
|
||||
// - validate the m365 account & secrets
|
||||
// - connect to the m365 account to ensure communication capability
|
||||
@ -111,8 +116,11 @@ func Initialize(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repoID := newRepoID(s)
|
||||
bus.SetRepoID(repoID)
|
||||
|
||||
r := &repository{
|
||||
ID: uuid.New(),
|
||||
ID: repoID,
|
||||
Version: "v1",
|
||||
Account: acct,
|
||||
Storage: s,
|
||||
@ -121,6 +129,10 @@ func Initialize(
|
||||
modelStore: ms,
|
||||
}
|
||||
|
||||
if err := newRepoModel(ctx, ms, r.ID); err != nil {
|
||||
return nil, errors.New("setting up repository")
|
||||
}
|
||||
|
||||
r.Bus.Event(ctx, events.RepoInit, nil)
|
||||
|
||||
return r, nil
|
||||
@ -164,10 +176,18 @@ func Connect(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rm, err := getRepoModel(ctx, ms)
|
||||
if err != nil {
|
||||
return nil, errors.New("retrieving repo info")
|
||||
}
|
||||
|
||||
bus.SetRepoID(string(rm.ID))
|
||||
|
||||
complete <- struct{}{}
|
||||
|
||||
// todo: ID and CreatedAt should get retrieved from a stored kopia config.
|
||||
return &repository{
|
||||
ID: string(rm.ID),
|
||||
Version: "v1",
|
||||
Account: acct,
|
||||
Storage: s,
|
||||
@ -303,3 +323,47 @@ func (r repository) DeleteBackup(ctx context.Context, id model.StableID) error {
|
||||
|
||||
return sw.DeleteBackup(ctx, id)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Repository ID Model
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// repositoryModel identifies the current repository
|
||||
type repositoryModel struct {
|
||||
model.BaseModel
|
||||
}
|
||||
|
||||
// should only be called on init.
|
||||
func newRepoModel(ctx context.Context, ms *kopia.ModelStore, repoID string) error {
|
||||
rm := repositoryModel{
|
||||
BaseModel: model.BaseModel{
|
||||
ID: model.StableID(repoID),
|
||||
},
|
||||
}
|
||||
|
||||
return ms.Put(ctx, model.RepositorySchema, &rm)
|
||||
}
|
||||
|
||||
// retrieves the repository info
|
||||
func getRepoModel(ctx context.Context, ms *kopia.ModelStore) (*repositoryModel, error) {
|
||||
bms, err := ms.GetIDsForType(ctx, model.RepositorySchema, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rm := &repositoryModel{}
|
||||
if len(bms) == 0 {
|
||||
return rm, nil
|
||||
}
|
||||
|
||||
rm.BaseModel = *bms[0]
|
||||
|
||||
return rm, nil
|
||||
}
|
||||
|
||||
// newRepoID generates a new unique repository id hash.
|
||||
// Repo IDs should only be generated once per repository,
|
||||
// and must be stored after that.
|
||||
func newRepoID(s storage.Storage) string {
|
||||
return uuid.NewString()
|
||||
}
|
||||
|
||||
@ -162,6 +162,28 @@ func (suite *RepositoryIntegrationSuite) TestConnect() {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func (suite *RepositoryIntegrationSuite) TestConnect_sameID() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
t := suite.T()
|
||||
|
||||
// need to initialize the repository before we can test connecting to it.
|
||||
st := tester.NewPrefixedS3Storage(t)
|
||||
|
||||
r, err := repository.Initialize(ctx, account.Account{}, st, control.Options{})
|
||||
require.NoError(t, err)
|
||||
|
||||
oldID := r.GetID()
|
||||
|
||||
require.NoError(t, r.Close(ctx))
|
||||
|
||||
// now re-connect
|
||||
r, err = repository.Connect(ctx, account.Account{}, st, control.Options{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, oldID, r.GetID())
|
||||
}
|
||||
|
||||
func (suite *RepositoryIntegrationSuite) TestNewBackup() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
63
src/pkg/repository/repository_unexported_test.go
Normal file
63
src/pkg/repository/repository_unexported_test.go
Normal file
@ -0,0 +1,63 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
)
|
||||
|
||||
type RepositoryModelSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestRepositoryModelSuite(t *testing.T) {
|
||||
if err := tester.RunOnAny(
|
||||
tester.CorsoCITests,
|
||||
tester.CorsoRepositoryTests,
|
||||
"flomp",
|
||||
); err != nil {
|
||||
t.Skip(err)
|
||||
}
|
||||
|
||||
suite.Run(t, new(RepositoryModelSuite))
|
||||
}
|
||||
|
||||
// ensure all required env values are populated
|
||||
func (suite *RepositoryModelSuite) SetupSuite() {
|
||||
_, err := tester.GetRequiredEnvSls(
|
||||
tester.AWSStorageCredEnvs,
|
||||
tester.M365AcctCredEnvs)
|
||||
require.NoError(suite.T(), err)
|
||||
}
|
||||
|
||||
func (suite *RepositoryModelSuite) TestWriteGetModel() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
var (
|
||||
t = suite.T()
|
||||
s = tester.NewPrefixedS3Storage(t)
|
||||
kopiaRef = kopia.NewConn(s)
|
||||
)
|
||||
|
||||
require.NoError(t, kopiaRef.Initialize(ctx))
|
||||
require.NoError(t, kopiaRef.Connect(ctx))
|
||||
|
||||
defer kopiaRef.Close(ctx)
|
||||
|
||||
ms, err := kopia.NewModelStore(kopiaRef)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer ms.Close(ctx)
|
||||
|
||||
require.NoError(t, newRepoModel(ctx, ms, "fnords"))
|
||||
|
||||
got, err := getRepoModel(ctx, ms)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "fnords", string(got.ID))
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user