Store backup details in a separate snapshot (#1755)
## Description This PR uses a separate snapshot to store backup details instead of a Corso model (i.e. a kopia manifest). Introduces a `StreamStore` that can be leveraged to store larger metadata objects. We can also leverage this for incrementals or restartable backups going forward. ## Type of change <!--- Please check the type of change your PR introduces: ---> - [x] 🌻 Feature - [ ] 🐛 Bugfix - [ ] 🗺️ Documentation - [ ] 🤖 Test - [ ] 💻 CI/Deployment - [ ] 🐹 Trivial/Minor ## Issue(s) <!-- Can reference multiple issues. Use one of the following "magic words" - "closes, fixes" to auto-close the Github issue. --> * #1735 ## Test Plan <!-- How will this be tested prior to merging.--> - [ ] 💪 Manual - [x] ⚡ Unit test - [x] 💚 E2E
This commit is contained in:
parent
be9b214c0c
commit
93a7ff27e3
@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Changed
|
||||
|
||||
- *Breaking Change*:
|
||||
Changed how backup details are stored in the repository to
|
||||
improve memory usage (#1735) from [vkamra](https://github.com/vkamra)
|
||||
|
||||
## [v0.0.3] (alpha) - 2022-12-05
|
||||
|
||||
### Added
|
||||
|
||||
@ -82,7 +82,7 @@ require (
|
||||
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
|
||||
github.com/microsoft/kiota-serialization-text-go v0.6.0 // indirect
|
||||
github.com/minio/md5-simd v1.1.2 // indirect
|
||||
github.com/minio/minio-go/v7 v7.0.39 // indirect
|
||||
github.com/minio/minio-go/v7 v7.0.39
|
||||
github.com/minio/sha256-simd v1.0.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
|
||||
@ -2,7 +2,6 @@ package kopia
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
@ -15,7 +14,7 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/backup"
|
||||
)
|
||||
|
||||
type fooModel struct {
|
||||
@ -837,29 +836,13 @@ func (suite *ModelStoreRegressionSuite) TestMultipleConfigs() {
|
||||
defer flush()
|
||||
|
||||
t := suite.T()
|
||||
numEntries := 10
|
||||
deets := details.DetailsModel{
|
||||
Entries: make([]details.DetailsEntry, 0, numEntries),
|
||||
}
|
||||
|
||||
for i := 0; i < numEntries; i++ {
|
||||
deets.Entries = append(
|
||||
deets.Entries,
|
||||
details.DetailsEntry{
|
||||
RepoRef: fmt.Sprintf("exchange/user1/email/inbox/mail%v", i),
|
||||
ItemInfo: details.ItemInfo{
|
||||
Exchange: &details.ExchangeInfo{
|
||||
Sender: "John Doe",
|
||||
Subject: fmt.Sprintf("Hola mundo %v", i),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
backupModel := backup.Backup{
|
||||
SnapshotID: "snapshotID",
|
||||
}
|
||||
|
||||
conn1, ms1 := openConnAndModelStore(t, ctx)
|
||||
|
||||
require.NoError(t, ms1.Put(ctx, model.BackupDetailsSchema, &deets))
|
||||
require.NoError(t, ms1.Put(ctx, model.BackupSchema, &backupModel))
|
||||
require.NoError(t, ms1.Close(ctx))
|
||||
|
||||
start := make(chan struct{})
|
||||
@ -898,22 +881,22 @@ func (suite *ModelStoreRegressionSuite) TestMultipleConfigs() {
|
||||
}()
|
||||
|
||||
// New instance should not have model we added.
|
||||
gotDeets := details.Details{}
|
||||
gotBackup := backup.Backup{}
|
||||
err := ms2.GetWithModelStoreID(
|
||||
ctx,
|
||||
model.BackupDetailsSchema,
|
||||
deets.ModelStoreID,
|
||||
&gotDeets,
|
||||
model.BackupSchema,
|
||||
backupModel.ModelStoreID,
|
||||
&gotBackup,
|
||||
)
|
||||
assert.Error(t, err)
|
||||
|
||||
// Old instance should still be able to access added model.
|
||||
gotDeets = details.Details{}
|
||||
gotBackup = backup.Backup{}
|
||||
err = ms1.GetWithModelStoreID(
|
||||
ctx,
|
||||
model.BackupDetailsSchema,
|
||||
deets.ModelStoreID,
|
||||
&gotDeets,
|
||||
model.BackupSchema,
|
||||
backupModel.ModelStoreID,
|
||||
&gotBackup,
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -14,7 +14,6 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
D "github.com/alcionai/corso/src/internal/diagnostics"
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
@ -128,10 +127,6 @@ func (w Wrapper) BackupCollections(
|
||||
deets: &details.Details{},
|
||||
}
|
||||
|
||||
progress.deets.Tags = map[string]string{
|
||||
model.ServiceTag: service.String(),
|
||||
}
|
||||
|
||||
dirTree, oc, err := inflateDirTree(ctx, collections, progress)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "building kopia directories")
|
||||
|
||||
@ -18,7 +18,6 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/connector/mockconnector"
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/logger"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
@ -258,7 +257,6 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() {
|
||||
assert.Equal(t, 0, stats.IgnoredErrorCount)
|
||||
assert.Equal(t, 0, stats.ErrorCount)
|
||||
assert.False(t, stats.Incomplete)
|
||||
assert.Equal(t, path.ExchangeService.String(), deets.Tags[model.ServiceTag])
|
||||
// 47 file and 6 folder entries.
|
||||
assert.Len(
|
||||
t,
|
||||
@ -299,14 +297,13 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() {
|
||||
fp2, err := suite.testPath2.Append(dc2.Names[0], true)
|
||||
require.NoError(t, err)
|
||||
|
||||
stats, deets, err := w.BackupCollections(
|
||||
stats, _, err := w.BackupCollections(
|
||||
ctx,
|
||||
nil,
|
||||
[]data.Collection{dc1, dc2},
|
||||
path.ExchangeService,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, path.ExchangeService.String(), deets.Tags[model.ServiceTag])
|
||||
|
||||
require.NoError(t, k.Compression(ctx, "gzip"))
|
||||
|
||||
@ -383,7 +380,6 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() {
|
||||
assert.Equal(t, 6, stats.TotalDirectoryCount)
|
||||
assert.Equal(t, 1, stats.IgnoredErrorCount)
|
||||
assert.False(t, stats.Incomplete)
|
||||
assert.Equal(t, path.ExchangeService.String(), deets.Tags[model.ServiceTag])
|
||||
// 5 file and 6 folder entries.
|
||||
assert.Len(t, deets.Entries, 5+6)
|
||||
}
|
||||
@ -424,8 +420,6 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections()
|
||||
|
||||
assert.Equal(t, BackupStats{}, *s)
|
||||
assert.Empty(t, d.Entries)
|
||||
// unknownService resolves to an empty string here.
|
||||
assert.Equal(t, "", d.Tags[model.ServiceTag])
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -577,7 +571,6 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() {
|
||||
require.Equal(t, stats.TotalDirectoryCount, expectedDirs)
|
||||
require.Equal(t, stats.IgnoredErrorCount, 0)
|
||||
require.False(t, stats.Incomplete)
|
||||
assert.Equal(t, path.ExchangeService.String(), deets.Tags[model.ServiceTag])
|
||||
// 6 file and 6 folder entries.
|
||||
assert.Len(t, deets.Entries, expectedFiles+expectedDirs)
|
||||
|
||||
|
||||
@ -22,7 +22,6 @@ const (
|
||||
BackupOpSchema
|
||||
RestoreOpSchema
|
||||
BackupSchema
|
||||
BackupDetailsSchema
|
||||
)
|
||||
|
||||
// common tags for filtering
|
||||
@ -32,7 +31,7 @@ const (
|
||||
|
||||
// Valid returns true if the ModelType value fits within the iota range.
|
||||
func (mt Schema) Valid() bool {
|
||||
return mt > 0 && mt < BackupDetailsSchema+1
|
||||
return mt > 0 && mt < BackupSchema+1
|
||||
}
|
||||
|
||||
type Model interface {
|
||||
|
||||
@ -27,7 +27,6 @@ func (suite *ModelUnitSuite) TestValid() {
|
||||
{model.BackupOpSchema, assert.True},
|
||||
{model.RestoreOpSchema, assert.True},
|
||||
{model.BackupSchema, assert.True},
|
||||
{model.BackupDetailsSchema, assert.True},
|
||||
{model.Schema(-1), assert.False},
|
||||
{model.Schema(100), assert.False},
|
||||
}
|
||||
|
||||
@ -12,12 +12,11 @@ func _() {
|
||||
_ = x[BackupOpSchema-1]
|
||||
_ = x[RestoreOpSchema-2]
|
||||
_ = x[BackupSchema-3]
|
||||
_ = x[BackupDetailsSchema-4]
|
||||
}
|
||||
|
||||
const _Schema_name = "UnknownSchemaBackupOpSchemaRestoreOpSchemaBackupSchemaBackupDetailsSchema"
|
||||
const _Schema_name = "UnknownSchemaBackupOpSchemaRestoreOpSchemaBackupSchema"
|
||||
|
||||
var _Schema_index = [...]uint8{0, 13, 27, 42, 54, 73}
|
||||
var _Schema_index = [...]uint8{0, 13, 27, 42, 54}
|
||||
|
||||
func (i Schema) String() string {
|
||||
if i < 0 || i >= Schema(len(_Schema_index)-1) {
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/internal/streamstore"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/backup"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
@ -242,13 +243,17 @@ func (op *BackupOperation) createBackupModels(
|
||||
return errors.New("no backup details to record")
|
||||
}
|
||||
|
||||
err := op.store.Put(ctx, model.BackupDetailsSchema, &backupDetails.DetailsModel)
|
||||
detailsID, err := streamstore.New(
|
||||
op.kopia,
|
||||
op.account.ID(),
|
||||
op.Selectors.PathService(),
|
||||
).WriteBackupDetails(ctx, backupDetails)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "creating backupdetails model")
|
||||
}
|
||||
|
||||
b := backup.New(
|
||||
snapID, string(backupDetails.ModelStoreID), op.Status.String(),
|
||||
snapID, detailsID, op.Status.String(),
|
||||
op.Results.BackupID,
|
||||
op.Selectors,
|
||||
op.Results.ReadWrites,
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/internal/streamstore"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/control"
|
||||
@ -116,9 +117,21 @@ func (op *RestoreOperation) Run(ctx context.Context) (restoreDetails *details.De
|
||||
}
|
||||
}()
|
||||
|
||||
deets, bup, err := op.store.GetDetailsFromBackupID(ctx, op.BackupID)
|
||||
dID, bup, err := op.store.GetDetailsIDFromBackupID(ctx, op.BackupID)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "getting backup details for restore")
|
||||
err = errors.Wrap(err, "getting backup details ID for restore")
|
||||
opStats.readErr = err
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
deets, err := streamstore.New(
|
||||
op.kopia,
|
||||
op.account.ID(),
|
||||
op.Selectors.PathService(),
|
||||
).ReadBackupDetails(ctx, dID)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "getting backup details data for restore")
|
||||
opStats.readErr = err
|
||||
|
||||
return nil, err
|
||||
|
||||
205
src/internal/streamstore/streamstore.go
Normal file
205
src/internal/streamstore/streamstore.go
Normal file
@ -0,0 +1,205 @@
|
||||
// streamstore implements helpers to store large
|
||||
// data streams in a repository
|
||||
package streamstore
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/data"
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/stats"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
type streamStore struct {
|
||||
kw *kopia.Wrapper
|
||||
tenant string
|
||||
service path.ServiceType
|
||||
}
|
||||
|
||||
func New(
|
||||
kw *kopia.Wrapper,
|
||||
tenant string,
|
||||
service path.ServiceType,
|
||||
) *streamStore {
|
||||
return &streamStore{kw: kw, tenant: tenant, service: service}
|
||||
}
|
||||
|
||||
const (
|
||||
// detailsItemName is the name of the stream used to store
|
||||
// backup details
|
||||
detailsItemName = "details"
|
||||
// collectionPurposeDetails is used to indicate
|
||||
// what the collection is being used for
|
||||
collectionPurposeDetails = "details"
|
||||
)
|
||||
|
||||
// WriteBackupDetails persists a `details.Details`
|
||||
// object in the stream store
|
||||
func (ss *streamStore) WriteBackupDetails(
|
||||
ctx context.Context,
|
||||
backupDetails *details.Details,
|
||||
) (string, error) {
|
||||
// construct the path of the container for the `details` item
|
||||
p, err := path.Builder{}.
|
||||
ToStreamStorePath(
|
||||
ss.tenant,
|
||||
collectionPurposeDetails,
|
||||
ss.service,
|
||||
false,
|
||||
)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// TODO: We could use an io.Pipe here to avoid a double copy but that
|
||||
// makes error handling a bit complicated
|
||||
dbytes, err := json.Marshal(backupDetails)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "marshalling backup details")
|
||||
}
|
||||
|
||||
dc := &streamCollection{
|
||||
folderPath: p,
|
||||
item: &streamItem{
|
||||
name: detailsItemName,
|
||||
data: dbytes,
|
||||
},
|
||||
}
|
||||
|
||||
backupStats, _, err := ss.kw.BackupCollections(ctx, nil, []data.Collection{dc}, ss.service)
|
||||
if err != nil {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return backupStats.SnapshotID, nil
|
||||
}
|
||||
|
||||
// ReadBackupDetails reads the specified details object
|
||||
// from the kopia repository
|
||||
func (ss *streamStore) ReadBackupDetails(
|
||||
ctx context.Context,
|
||||
detailsID string,
|
||||
) (*details.Details, error) {
|
||||
// construct the path for the `details` item
|
||||
detailsPath, err := path.Builder{}.
|
||||
Append(detailsItemName).
|
||||
ToStreamStorePath(
|
||||
ss.tenant,
|
||||
collectionPurposeDetails,
|
||||
ss.service,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var bc stats.ByteCounter
|
||||
|
||||
dcs, err := ss.kw.RestoreMultipleItems(ctx, detailsID, []path.Path{detailsPath}, &bc)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "retrieving backup details data")
|
||||
}
|
||||
|
||||
// Expect only 1 data collection
|
||||
if len(dcs) != 1 {
|
||||
return nil, errors.Errorf("expected 1 details data collection: %d", len(dcs))
|
||||
}
|
||||
|
||||
dc := dcs[0]
|
||||
|
||||
var d details.Details
|
||||
|
||||
found := false
|
||||
items := dc.Items()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("context cancelled waiting for backup details data")
|
||||
|
||||
case itemData, ok := <-items:
|
||||
if !ok {
|
||||
if !found {
|
||||
return nil, errors.New("no backup details found")
|
||||
}
|
||||
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
err := json.NewDecoder(itemData.ToReader()).Decode(&d)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to decode details data from repository")
|
||||
}
|
||||
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteBackupDetails deletes the specified details object from the kopia repository
|
||||
func (ss *streamStore) DeleteBackupDetails(
|
||||
ctx context.Context,
|
||||
detailsID string,
|
||||
) error {
|
||||
err := ss.kw.DeleteSnapshot(ctx, detailsID)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "deleting backup details failed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// streamCollection is a data.Collection used to persist
|
||||
// a single data stream
|
||||
type streamCollection struct {
|
||||
// folderPath indicates what level in the hierarchy this collection
|
||||
// represents
|
||||
folderPath path.Path
|
||||
item *streamItem
|
||||
}
|
||||
|
||||
func (dc *streamCollection) FullPath() path.Path {
|
||||
return dc.folderPath
|
||||
}
|
||||
|
||||
func (dc *streamCollection) PreviousPath() path.Path {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dc *streamCollection) State() data.CollectionState {
|
||||
return data.NewState
|
||||
}
|
||||
|
||||
// Items() always returns a channel with a single data.Stream
|
||||
// representing the object to be persisted
|
||||
func (dc *streamCollection) Items() <-chan data.Stream {
|
||||
items := make(chan data.Stream, 1)
|
||||
defer close(items)
|
||||
items <- dc.item
|
||||
|
||||
return items
|
||||
}
|
||||
|
||||
type streamItem struct {
|
||||
name string
|
||||
data []byte
|
||||
}
|
||||
|
||||
func (di *streamItem) UUID() string {
|
||||
return di.name
|
||||
}
|
||||
|
||||
func (di *streamItem) ToReader() io.ReadCloser {
|
||||
return io.NopCloser(bytes.NewReader(di.data))
|
||||
}
|
||||
|
||||
func (di *streamItem) Deleted() bool {
|
||||
return false
|
||||
}
|
||||
74
src/internal/streamstore/streamstore_test.go
Normal file
74
src/internal/streamstore/streamstore_test.go
Normal file
@ -0,0 +1,74 @@
|
||||
package streamstore
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/kopia"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
type StreamStoreIntegrationSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func TestStreamStoreIntegrationSuite(t *testing.T) {
|
||||
if err := tester.RunOnAny(
|
||||
tester.CorsoCITests,
|
||||
); err != nil {
|
||||
t.Skip(err)
|
||||
}
|
||||
|
||||
suite.Run(t, new(StreamStoreIntegrationSuite))
|
||||
}
|
||||
|
||||
func (suite *StreamStoreIntegrationSuite) TestDetails() {
|
||||
t := suite.T()
|
||||
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
// need to initialize the repository before we can test connecting to it.
|
||||
st := tester.NewPrefixedS3Storage(t)
|
||||
|
||||
k := kopia.NewConn(st)
|
||||
require.NoError(t, k.Initialize(ctx))
|
||||
|
||||
defer k.Close(ctx)
|
||||
|
||||
kw, err := kopia.NewWrapper(k)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer kw.Close(ctx)
|
||||
|
||||
deets := &details.Details{}
|
||||
|
||||
deets.Add("ref", "shortref", "parentref",
|
||||
details.ItemInfo{
|
||||
Exchange: &details.ExchangeInfo{
|
||||
Subject: "hello world",
|
||||
},
|
||||
})
|
||||
|
||||
ss := New(kw, "tenant", path.ExchangeService)
|
||||
|
||||
id, err := ss.WriteBackupDetails(ctx, deets)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, id)
|
||||
|
||||
readDeets, err := ss.ReadBackupDetails(ctx, id)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, readDeets)
|
||||
|
||||
assert.Equal(t, len(deets.Entries), len(readDeets.Entries))
|
||||
assert.Equal(t, deets.Entries[0].ParentRef, readDeets.Entries[0].ParentRef)
|
||||
assert.Equal(t, deets.Entries[0].ShortRef, readDeets.Entries[0].ShortRef)
|
||||
assert.Equal(t, deets.Entries[0].RepoRef, readDeets.Entries[0].RepoRef)
|
||||
assert.NotNil(t, readDeets.Entries[0].Exchange)
|
||||
assert.Equal(t, *deets.Entries[0].Exchange, *readDeets.Entries[0].Exchange)
|
||||
}
|
||||
@ -10,7 +10,6 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/cli/print"
|
||||
"github.com/alcionai/corso/src/internal/common"
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
)
|
||||
|
||||
type FolderEntry struct {
|
||||
@ -26,7 +25,6 @@ type FolderEntry struct {
|
||||
|
||||
// DetailsModel describes what was stored in a Backup
|
||||
type DetailsModel struct {
|
||||
model.BaseModel
|
||||
Entries []DetailsEntry `json:"entries"`
|
||||
}
|
||||
|
||||
|
||||
@ -15,11 +15,12 @@ func _() {
|
||||
_ = x[FilesCategory-4]
|
||||
_ = x[ListsCategory-5]
|
||||
_ = x[LibrariesCategory-6]
|
||||
_ = x[DetailsCategory-7]
|
||||
}
|
||||
|
||||
const _CategoryType_name = "UnknownCategoryemailcontactseventsfileslistslibraries"
|
||||
const _CategoryType_name = "UnknownCategoryemailcontactseventsfileslistslibrariesdetails"
|
||||
|
||||
var _CategoryType_index = [...]uint8{0, 15, 20, 28, 34, 39, 44, 53}
|
||||
var _CategoryType_index = [...]uint8{0, 15, 20, 28, 34, 39, 44, 53, 60}
|
||||
|
||||
func (i CategoryType) String() string {
|
||||
if i < 0 || i >= CategoryType(len(_CategoryType_index)-1) {
|
||||
|
||||
@ -281,6 +281,43 @@ func (pb Builder) withPrefix(elements ...string) *Builder {
|
||||
return res
|
||||
}
|
||||
|
||||
func (pb Builder) ToStreamStorePath(
|
||||
tenant, purpose string,
|
||||
service ServiceType,
|
||||
isItem bool,
|
||||
) (Path, error) {
|
||||
if err := verifyInputValues(tenant, purpose); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isItem && len(pb.elements) == 0 {
|
||||
return nil, errors.New("missing path beyond prefix")
|
||||
}
|
||||
|
||||
metadataService := UnknownService
|
||||
|
||||
switch service {
|
||||
case ExchangeService:
|
||||
metadataService = ExchangeMetadataService
|
||||
case OneDriveService:
|
||||
metadataService = OneDriveMetadataService
|
||||
case SharePointService:
|
||||
metadataService = SharePointMetadataService
|
||||
}
|
||||
|
||||
return &dataLayerResourcePath{
|
||||
Builder: *pb.withPrefix(
|
||||
tenant,
|
||||
metadataService.String(),
|
||||
purpose,
|
||||
DetailsCategory.String(),
|
||||
),
|
||||
service: metadataService,
|
||||
category: DetailsCategory,
|
||||
hasItem: isItem,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (pb Builder) ToServiceCategoryMetadataPath(
|
||||
tenant, user string,
|
||||
service ServiceType,
|
||||
|
||||
@ -65,6 +65,7 @@ const (
|
||||
FilesCategory // files
|
||||
ListsCategory // lists
|
||||
LibrariesCategory // libraries
|
||||
DetailsCategory // details
|
||||
)
|
||||
|
||||
func ToCategoryType(category string) CategoryType {
|
||||
@ -81,6 +82,8 @@ func ToCategoryType(category string) CategoryType {
|
||||
return LibrariesCategory
|
||||
case ListsCategory.String():
|
||||
return ListsCategory
|
||||
case DetailsCategory.String():
|
||||
return DetailsCategory
|
||||
default:
|
||||
return UnknownCategory
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/observe"
|
||||
"github.com/alcionai/corso/src/internal/operations"
|
||||
"github.com/alcionai/corso/src/internal/streamstore"
|
||||
"github.com/alcionai/corso/src/pkg/account"
|
||||
"github.com/alcionai/corso/src/pkg/backup"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
@ -270,7 +271,21 @@ func (r repository) BackupsByTag(ctx context.Context, fs ...store.FilterOption)
|
||||
// BackupDetails returns the specified backup details object
|
||||
func (r repository) BackupDetails(ctx context.Context, backupID string) (*details.Details, *backup.Backup, error) {
|
||||
sw := store.NewKopiaStore(r.modelStore)
|
||||
return sw.GetDetailsFromBackupID(ctx, model.StableID(backupID))
|
||||
|
||||
dID, b, err := sw.GetDetailsIDFromBackupID(ctx, model.StableID(backupID))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
deets, err := streamstore.New(
|
||||
r.dataLayer,
|
||||
r.Account.ID(),
|
||||
b.Selectors.PathService()).ReadBackupDetails(ctx, dID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return deets, b, nil
|
||||
}
|
||||
|
||||
// DeleteBackup removes the backup from both the model store and the backup storage.
|
||||
|
||||
@ -3,12 +3,10 @@ package store
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/pkg/backup"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/path"
|
||||
)
|
||||
|
||||
@ -81,44 +79,18 @@ func (w Wrapper) GetBackups(
|
||||
|
||||
// DeleteBackup deletes the backup and its details entry from the model store.
|
||||
func (w Wrapper) DeleteBackup(ctx context.Context, backupID model.StableID) error {
|
||||
deets, _, err := w.GetDetailsFromBackupID(ctx, backupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := w.Delete(ctx, model.BackupDetailsSchema, deets.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.Delete(ctx, model.BackupSchema, backupID)
|
||||
}
|
||||
|
||||
// GetDetails gets the backup details by ID.
|
||||
func (w Wrapper) GetDetails(ctx context.Context, detailsID manifest.ID) (*details.Details, error) {
|
||||
d := details.Details{}
|
||||
|
||||
err := w.GetWithModelStoreID(ctx, model.BackupDetailsSchema, detailsID, &d)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting details")
|
||||
}
|
||||
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
// GetDetailsFromBackupID retrieves the backup.Details within the specified backup.
|
||||
func (w Wrapper) GetDetailsFromBackupID(
|
||||
func (w Wrapper) GetDetailsIDFromBackupID(
|
||||
ctx context.Context,
|
||||
backupID model.StableID,
|
||||
) (*details.Details, *backup.Backup, error) {
|
||||
) (string, *backup.Backup, error) {
|
||||
b, err := w.GetBackup(ctx, backupID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return "", nil, err
|
||||
}
|
||||
|
||||
d, err := w.GetDetails(ctx, manifest.ID(b.DetailsID))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return d, b, nil
|
||||
return b.DetailsID, b, nil
|
||||
}
|
||||
|
||||
@ -12,7 +12,6 @@ import (
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/internal/tester"
|
||||
"github.com/alcionai/corso/src/pkg/backup"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
"github.com/alcionai/corso/src/pkg/store"
|
||||
storeMock "github.com/alcionai/corso/src/pkg/store/mock"
|
||||
)
|
||||
@ -32,14 +31,6 @@ var (
|
||||
SnapshotID: uuid.NewString(),
|
||||
DetailsID: detailsID,
|
||||
}
|
||||
deets = details.Details{
|
||||
DetailsModel: details.DetailsModel{
|
||||
BaseModel: model.BaseModel{
|
||||
ID: model.StableID(detailsID),
|
||||
ModelStoreID: manifest.ID(uuid.NewString()),
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
type StoreBackupUnitSuite struct {
|
||||
@ -61,12 +52,12 @@ func (suite *StoreBackupUnitSuite) TestGetBackup() {
|
||||
}{
|
||||
{
|
||||
name: "gets backup",
|
||||
mock: storeMock.NewMock(&bu, nil, nil),
|
||||
mock: storeMock.NewMock(&bu, nil),
|
||||
expect: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "errors",
|
||||
mock: storeMock.NewMock(&bu, nil, assert.AnError),
|
||||
mock: storeMock.NewMock(&bu, assert.AnError),
|
||||
expect: assert.Error,
|
||||
},
|
||||
}
|
||||
@ -94,12 +85,12 @@ func (suite *StoreBackupUnitSuite) TestGetBackups() {
|
||||
}{
|
||||
{
|
||||
name: "gets backups",
|
||||
mock: storeMock.NewMock(&bu, nil, nil),
|
||||
mock: storeMock.NewMock(&bu, nil),
|
||||
expect: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "errors",
|
||||
mock: storeMock.NewMock(&bu, nil, assert.AnError),
|
||||
mock: storeMock.NewMock(&bu, assert.AnError),
|
||||
expect: assert.Error,
|
||||
},
|
||||
}
|
||||
@ -128,12 +119,12 @@ func (suite *StoreBackupUnitSuite) TestDeleteBackup() {
|
||||
}{
|
||||
{
|
||||
name: "deletes backup",
|
||||
mock: storeMock.NewMock(&bu, &deets, nil),
|
||||
mock: storeMock.NewMock(&bu, nil),
|
||||
expect: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "errors",
|
||||
mock: storeMock.NewMock(&bu, &deets, assert.AnError),
|
||||
mock: storeMock.NewMock(&bu, assert.AnError),
|
||||
expect: assert.Error,
|
||||
},
|
||||
}
|
||||
@ -146,40 +137,7 @@ func (suite *StoreBackupUnitSuite) TestDeleteBackup() {
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *StoreBackupUnitSuite) TestGetDetails() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
table := []struct {
|
||||
name string
|
||||
mock *storeMock.MockModelStore
|
||||
expect assert.ErrorAssertionFunc
|
||||
}{
|
||||
{
|
||||
name: "gets details",
|
||||
mock: storeMock.NewMock(nil, &deets, nil),
|
||||
expect: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "errors",
|
||||
mock: storeMock.NewMock(nil, &deets, assert.AnError),
|
||||
expect: assert.Error,
|
||||
},
|
||||
}
|
||||
for _, test := range table {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
sm := &store.Wrapper{Storer: test.mock}
|
||||
result, err := sm.GetDetails(ctx, manifest.ID(uuid.NewString()))
|
||||
test.expect(t, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, deets.ID, result.ID)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *StoreBackupUnitSuite) TestGetDetailsFromBackupID() {
|
||||
func (suite *StoreBackupUnitSuite) TestGetDetailsIDFromBackupID() {
|
||||
ctx, flush := tester.NewContext()
|
||||
defer flush()
|
||||
|
||||
@ -190,24 +148,24 @@ func (suite *StoreBackupUnitSuite) TestGetDetailsFromBackupID() {
|
||||
}{
|
||||
{
|
||||
name: "gets details from backup id",
|
||||
mock: storeMock.NewMock(&bu, &deets, nil),
|
||||
mock: storeMock.NewMock(&bu, nil),
|
||||
expect: assert.NoError,
|
||||
},
|
||||
{
|
||||
name: "errors",
|
||||
mock: storeMock.NewMock(&bu, &deets, assert.AnError),
|
||||
mock: storeMock.NewMock(&bu, assert.AnError),
|
||||
expect: assert.Error,
|
||||
},
|
||||
}
|
||||
for _, test := range table {
|
||||
suite.T().Run(test.name, func(t *testing.T) {
|
||||
store := &store.Wrapper{Storer: test.mock}
|
||||
dResult, bResult, err := store.GetDetailsFromBackupID(ctx, model.StableID(uuid.NewString()))
|
||||
dResult, bResult, err := store.GetDetailsIDFromBackupID(ctx, model.StableID(uuid.NewString()))
|
||||
test.expect(t, err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, deets.ID, dResult.ID)
|
||||
assert.Equal(t, bu.DetailsID, dResult)
|
||||
assert.Equal(t, bu.ID, bResult.ID)
|
||||
})
|
||||
}
|
||||
|
||||
@ -8,7 +8,6 @@ import (
|
||||
|
||||
"github.com/alcionai/corso/src/internal/model"
|
||||
"github.com/alcionai/corso/src/pkg/backup"
|
||||
"github.com/alcionai/corso/src/pkg/backup/details"
|
||||
)
|
||||
|
||||
// ------------------------------------------------------------
|
||||
@ -16,16 +15,14 @@ import (
|
||||
// ------------------------------------------------------------
|
||||
|
||||
type MockModelStore struct {
|
||||
backup *backup.Backup
|
||||
details *details.Details
|
||||
err error
|
||||
backup *backup.Backup
|
||||
err error
|
||||
}
|
||||
|
||||
func NewMock(b *backup.Backup, d *details.Details, err error) *MockModelStore {
|
||||
func NewMock(b *backup.Backup, err error) *MockModelStore {
|
||||
return &MockModelStore{
|
||||
backup: b,
|
||||
details: d,
|
||||
err: err,
|
||||
backup: b,
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,10 +57,6 @@ func (mms *MockModelStore) Get(
|
||||
bm := data.(*backup.Backup)
|
||||
*bm = *mms.backup
|
||||
|
||||
case model.BackupDetailsSchema:
|
||||
dm := data.(*details.Details)
|
||||
dm.DetailsModel = mms.details.DetailsModel
|
||||
|
||||
default:
|
||||
return errors.Errorf("schema %s not supported by mock Get", s)
|
||||
}
|
||||
@ -84,12 +77,6 @@ func (mms *MockModelStore) GetIDsForType(
|
||||
case model.BackupSchema:
|
||||
b := *mms.backup
|
||||
return []*model.BaseModel{&b.BaseModel}, nil
|
||||
|
||||
case model.BackupDetailsSchema:
|
||||
d := details.Details{}
|
||||
d.DetailsModel = mms.details.DetailsModel
|
||||
|
||||
return []*model.BaseModel{&d.BaseModel}, nil
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("schema %s not supported by mock GetIDsForType", s)
|
||||
@ -110,10 +97,6 @@ func (mms *MockModelStore) GetWithModelStoreID(
|
||||
bm := data.(*backup.Backup)
|
||||
*bm = *mms.backup
|
||||
|
||||
case model.BackupDetailsSchema:
|
||||
dm := data.(*details.Details)
|
||||
dm.DetailsModel = mms.details.DetailsModel
|
||||
|
||||
default:
|
||||
return errors.Errorf("schema %s not supported by mock GetWithModelStoreID", s)
|
||||
}
|
||||
@ -131,10 +114,6 @@ func (mms *MockModelStore) Put(ctx context.Context, s model.Schema, m model.Mode
|
||||
bm := m.(*backup.Backup)
|
||||
mms.backup = bm
|
||||
|
||||
case model.BackupDetailsSchema:
|
||||
dm := m.(*details.Details)
|
||||
mms.details = dm
|
||||
|
||||
default:
|
||||
return errors.Errorf("schema %s not supported by mock Put", s)
|
||||
}
|
||||
@ -148,10 +127,6 @@ func (mms *MockModelStore) Update(ctx context.Context, s model.Schema, m model.M
|
||||
bm := m.(*backup.Backup)
|
||||
mms.backup = bm
|
||||
|
||||
case model.BackupDetailsSchema:
|
||||
dm := m.(*details.Details)
|
||||
mms.details = dm
|
||||
|
||||
default:
|
||||
return errors.Errorf("schema %s not supported by mock Update", s)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user